| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- import socket
- import threading
- import select
- import base64
- import time
- class ProxyTunnel:
- """
- 【轻量级版】管理本地代理隧道
- 不依赖 mitmproxy,使用纯 Socket 实现 TCP 盲转发和 Header 注入。
- 资源占用极低,启动速度快,无子进程僵死风险。
- """
- def __init__(self, upstream_ip, upstream_port, username, password):
- self.upstream_ip = upstream_ip
- self.upstream_port = int(upstream_port)
- self.username = username
- self.password = password
-
- # 预先计算 Proxy-Authorization 头,避免运行时计算
- auth_str = f"{username}:{password}"
- b64_auth = base64.b64encode(auth_str.encode()).decode()
- self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n"
-
- self.server_socket = None
- self.local_port = 0
- self.running = False
- self.listen_thread = None
- def start(self):
- """启动本地监听,返回 '127.0.0.1:port'"""
- try:
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- # 绑定到随机空闲端口
- self.server_socket.bind(('127.0.0.1', 0))
- self.local_port = self.server_socket.getsockname()[1]
- self.server_socket.listen(100) # 允许一定的并发连接
-
- self.running = True
-
- # 启动后台线程处理连接请求
- self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True)
- self.listen_thread.start()
-
- # Socket 绑定成功即代表启动成功,无需等待
- return f"127.0.0.1:{self.local_port}"
- except Exception as e:
- self.stop()
- raise RuntimeError(f"Failed to start lightweight tunnel: {e}")
- def stop(self):
- """停止服务"""
- self.running = False
- if self.server_socket:
- try:
- # 关闭 Socket 会触发 accept 抛出 OSError,从而结束 _accept_loop
- self.server_socket.close()
- except Exception:
- pass
- self.server_socket = None
- def _accept_loop(self):
- """循环接收浏览器的连接"""
- while self.running:
- try:
- # 设置超时以便能响应 stop 信号
- if self.server_socket:
- self.server_socket.settimeout(1.0)
- try:
- client_sock, _ = self.server_socket.accept()
- except socket.timeout:
- continue
- except OSError:
- # Socket 关闭时触发
- break
-
- # 为每个连接启动一个线程进行转发
- t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True)
- t.start()
- else:
- break
- except Exception:
- break
- def _handle_client(self, client_sock):
- """处理单个连接:注入 Header -> 双向转发"""
- upstream_sock = None
- try:
- client_sock.settimeout(30) # 防止半开连接
-
- # 1. 读取浏览器发来的第一个包 (通常是 CONNECT 或 GET)
- first_packet = client_sock.recv(16384)
- if not first_packet:
- client_sock.close()
- return
- # 2. 连接远程代理
- upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- upstream_sock.settimeout(10) # 连接超时
- upstream_sock.connect((self.upstream_ip, self.upstream_port))
-
- # 3. [关键步骤] 在第一个包中注入 Proxy-Authorization
- sep = b'\r\n'
- idx = first_packet.find(sep)
- if idx != -1:
- # 插入 Auth 头
- new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:]
- else:
- new_packet = first_packet # 异常情况直接透传
- # 4. 发送修改后的包给远程代理
- upstream_sock.sendall(new_packet)
-
- # 5. 进入双向盲转发模式 (Tunneling)
- self._pipe_sockets(client_sock, upstream_sock)
- except Exception:
- pass
- finally:
- # 显式拆分 try-except 以避免语法解析错误
- if client_sock:
- try:
- client_sock.close()
- except Exception:
- pass
- if upstream_sock:
- try:
- upstream_sock.close()
- except Exception:
- pass
- def _pipe_sockets(self, sock1, sock2):
- """高效的双向数据转发"""
- sockets = [sock1, sock2]
- try:
- sock1.setblocking(0)
- sock2.setblocking(0)
- except Exception:
- return
-
- last_activity = time.time()
- IDLE_TIMEOUT = 60 # 60秒无数据传输则断开
-
- while self.running:
- try:
- # 使用 select 监听可读状态
- r, _, x = select.select(sockets, [], sockets, 1.0)
-
- if x:
- break # 发生错误
-
- if not r:
- # 空闲检查
- if time.time() - last_activity > IDLE_TIMEOUT:
- break
- continue
-
- for s in r:
- try:
- data = s.recv(65536) # 64KB buffer
- if not data:
- return # 连接关闭
-
- # 转发给对方
- target = sock2 if s is sock1 else sock1
- target.sendall(data)
- last_activity = time.time()
- except Exception:
- return
-
- except Exception:
- break
- def __del__(self):
- self.stop()
|