import socket import threading import select import base64 import time class ProxyTunnel: """ 【修复优化版】管理本地代理隧道 1. 启用 TCP_NODELAY 消除握手延迟 (关键修复) 2. 开启 KeepAlive 防止链路中断 3. 修复非阻塞模式下 sendall 导致的数据丢失问题 """ def __init__(self, upstream_ip, upstream_port, username, password): self.upstream_ip = upstream_ip self.upstream_port = int(upstream_port) self.username = username self.password = password # 预先计算 Proxy-Authorization 头 auth_str = f"{username}:{password}" b64_auth = base64.b64encode(auth_str.encode()).decode() self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n" self.server_socket = None self.local_port = 0 self.running = False self.listen_thread = None def start(self): try: self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(('127.0.0.1', 0)) self.local_port = self.server_socket.getsockname()[1] self.server_socket.listen(128) # 增加连接队列长度 self.running = True self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True) self.listen_thread.start() return f"127.0.0.1:{self.local_port}" except Exception as e: self.stop() raise RuntimeError(f"Failed to start tunnel: {e}") def stop(self): self.running = False if self.server_socket: try: self.server_socket.close() except Exception: pass self.server_socket = None def _accept_loop(self): while self.running: try: if self.server_socket: # 使用 select 替代 settimeout,减少 CPU 空转 r, _, _ = select.select([self.server_socket], [], [], 1.0) if r: try: client_sock, _ = self.server_socket.accept() t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True) t.start() except OSError: break except Exception: continue def _optimize_socket(self, sock): """核心优化:设置 Socket 选项""" try: # 1. 禁用 Nagle 算法:数据包立即发送,不等待填满缓冲区 # 这是解决 "HttpClient Timeout" 的关键 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 2. 开启 KeepAlive:防止防火墙切断空闲连接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 3. 增大缓冲区 (Linux/Mac 可选,Windows 一般自动管理) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 32*1024) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32*1024) except Exception: pass def _handle_client(self, client_sock): upstream_sock = None try: self._optimize_socket(client_sock) client_sock.settimeout(30) # 1. 读取首包 (32KB 缓冲区) try: first_packet = client_sock.recv(32768) except socket.timeout: return # 客户端连上但不发数据 if not first_packet: return # 2. 连接上游 upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._optimize_socket(upstream_sock) # 同样优化上游 Socket upstream_sock.settimeout(10) # 连接超时 10s upstream_sock.connect((self.upstream_ip, self.upstream_port)) # 连接建立后,将超时设为 None (阻塞模式),交由 select 控制 upstream_sock.settimeout(None) client_sock.settimeout(None) # 3. 注入 Header sep = b'\r\n' idx = first_packet.find(sep) if idx != -1: new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:] else: new_packet = first_packet # 4. 发送首包 (阻塞式发送,保证数据完整) upstream_sock.sendall(new_packet) # 5. 双向转发 self._pipe_sockets(client_sock, upstream_sock) except Exception: pass finally: self._close_socket(client_sock) self._close_socket(upstream_sock) def _pipe_sockets(self, sock1, sock2): """ 修复后的转发逻辑: 保持 Socket 为阻塞模式,利用 select 监听可读状态。 """ sockets = [sock1, sock2] last_activity = time.time() IDLE_TIMEOUT = 120 # 延长空闲超时 while self.running: try: # 监听可读事件 r, _, x = select.select(sockets, [], sockets, 1.0) if x: break # Socket 异常 if not r: if time.time() - last_activity > IDLE_TIMEOUT: break continue for s in r: try: # 尝试读取 data = s.recv(32768) except ConnectionResetError: data = None if not data: return # 连接关闭 # 确定发送目标 target = sock2 if s is sock1 else sock1 # 关键修改:使用阻塞式 sendall # 如果网络卡顿,线程会在这里暂停等待,而不是抛出错误或丢包 try: target.sendall(data) except BrokenPipeError: return last_activity = time.time() except Exception: break def _close_socket(self, sock): """优雅关闭 Socket""" if sock: try: # 发送 FIN 包,通知对端数据发送完毕 sock.shutdown(socket.SHUT_RDWR) except Exception: pass try: sock.close() except Exception: pass def __del__(self): self.stop()