import socket import threading import select import base64 import time class ProxyTunnel: """ 【轻量级版】管理本地代理隧道 不依赖 mitmproxy,使用纯 Socket 实现 TCP 盲转发和 Header 注入。 资源占用极低,启动速度快,无子进程僵死风险。 """ def __init__(self, upstream_ip, upstream_port, username, password): self.upstream_ip = upstream_ip self.upstream_port = int(upstream_port) self.username = username self.password = password # 预先计算 Proxy-Authorization 头,避免运行时计算 auth_str = f"{username}:{password}" b64_auth = base64.b64encode(auth_str.encode()).decode() self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n" self.server_socket = None self.local_port = 0 self.running = False self.listen_thread = None def start(self): """启动本地监听,返回 '127.0.0.1:port'""" try: self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定到随机空闲端口 self.server_socket.bind(('127.0.0.1', 0)) self.local_port = self.server_socket.getsockname()[1] self.server_socket.listen(100) # 允许一定的并发连接 self.running = True # 启动后台线程处理连接请求 self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True) self.listen_thread.start() # Socket 绑定成功即代表启动成功,无需等待 return f"127.0.0.1:{self.local_port}" except Exception as e: self.stop() raise RuntimeError(f"Failed to start lightweight tunnel: {e}") def stop(self): """停止服务""" self.running = False if self.server_socket: try: # 关闭 Socket 会触发 accept 抛出 OSError,从而结束 _accept_loop self.server_socket.close() except Exception: pass self.server_socket = None def _accept_loop(self): """循环接收浏览器的连接""" while self.running: try: # 设置超时以便能响应 stop 信号 if self.server_socket: self.server_socket.settimeout(1.0) try: client_sock, _ = self.server_socket.accept() except socket.timeout: continue except OSError: # Socket 关闭时触发 break # 为每个连接启动一个线程进行转发 t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True) t.start() else: break except Exception: break def _handle_client(self, client_sock): """处理单个连接:注入 Header -> 双向转发""" upstream_sock = None try: client_sock.settimeout(30) # 防止半开连接 # 1. 读取浏览器发来的第一个包 (通常是 CONNECT 或 GET) first_packet = client_sock.recv(16384) if not first_packet: client_sock.close() return # 2. 连接远程代理 upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) upstream_sock.settimeout(10) # 连接超时 upstream_sock.connect((self.upstream_ip, self.upstream_port)) # 3. [关键步骤] 在第一个包中注入 Proxy-Authorization sep = b'\r\n' idx = first_packet.find(sep) if idx != -1: # 插入 Auth 头 new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:] else: new_packet = first_packet # 异常情况直接透传 # 4. 发送修改后的包给远程代理 upstream_sock.sendall(new_packet) # 5. 进入双向盲转发模式 (Tunneling) self._pipe_sockets(client_sock, upstream_sock) except Exception: pass finally: # 显式拆分 try-except 以避免语法解析错误 if client_sock: try: client_sock.close() except Exception: pass if upstream_sock: try: upstream_sock.close() except Exception: pass def _pipe_sockets(self, sock1, sock2): """高效的双向数据转发""" sockets = [sock1, sock2] try: sock1.setblocking(0) sock2.setblocking(0) except Exception: return last_activity = time.time() IDLE_TIMEOUT = 60 # 60秒无数据传输则断开 while self.running: try: # 使用 select 监听可读状态 r, _, x = select.select(sockets, [], sockets, 1.0) if x: break # 发生错误 if not r: # 空闲检查 if time.time() - last_activity > IDLE_TIMEOUT: break continue for s in r: try: data = s.recv(65536) # 64KB buffer if not data: return # 连接关闭 # 转发给对方 target = sock2 if s is sock1 else sock1 target.sendall(data) last_activity = time.time() except Exception: return except Exception: break def __del__(self): self.stop()