| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- import socket
- import threading
- import select
- import base64
- import time
- class ProxyTunnel:
- """
- 【修复优化版】管理本地代理隧道
- 1. 启用 TCP_NODELAY 消除握手延迟 (关键修复)
- 2. 开启 KeepAlive 防止链路中断
- 3. 修复非阻塞模式下 sendall 导致的数据丢失问题
- """
- def __init__(self, upstream_ip, upstream_port, username, password):
- self.upstream_ip = upstream_ip
- self.upstream_port = int(upstream_port)
- self.username = username
- self.password = password
-
- # 预先计算 Proxy-Authorization 头
- auth_str = f"{username}:{password}"
- b64_auth = base64.b64encode(auth_str.encode()).decode()
- self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n"
-
- self.server_socket = None
- self.local_port = 0
- self.running = False
- self.listen_thread = None
- def start(self):
- try:
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- self.server_socket.bind(('127.0.0.1', 0))
- self.local_port = self.server_socket.getsockname()[1]
- self.server_socket.listen(128) # 增加连接队列长度
-
- self.running = True
-
- self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True)
- self.listen_thread.start()
-
- return f"127.0.0.1:{self.local_port}"
- except Exception as e:
- self.stop()
- raise RuntimeError(f"Failed to start tunnel: {e}")
- def stop(self):
- self.running = False
- if self.server_socket:
- try:
- self.server_socket.close()
- except Exception:
- pass
- self.server_socket = None
- def _accept_loop(self):
- while self.running:
- try:
- if self.server_socket:
- # 使用 select 替代 settimeout,减少 CPU 空转
- r, _, _ = select.select([self.server_socket], [], [], 1.0)
- if r:
- try:
- client_sock, _ = self.server_socket.accept()
- t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True)
- t.start()
- except OSError:
- break
- except Exception:
- continue
- def _optimize_socket(self, sock):
- """核心优化:设置 Socket 选项"""
- try:
- # 1. 禁用 Nagle 算法:数据包立即发送,不等待填满缓冲区
- # 这是解决 "HttpClient Timeout" 的关键
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-
- # 2. 开启 KeepAlive:防止防火墙切断空闲连接
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-
- # 3. 增大缓冲区 (Linux/Mac 可选,Windows 一般自动管理)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 32*1024)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32*1024)
- except Exception:
- pass
- def _handle_client(self, client_sock):
- upstream_sock = None
- try:
- self._optimize_socket(client_sock)
- client_sock.settimeout(30)
-
- # 1. 读取首包 (32KB 缓冲区)
- try:
- first_packet = client_sock.recv(32768)
- except socket.timeout:
- return # 客户端连上但不发数据
-
- if not first_packet:
- return
- # 2. 连接上游
- upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._optimize_socket(upstream_sock) # 同样优化上游 Socket
-
- upstream_sock.settimeout(10) # 连接超时 10s
- upstream_sock.connect((self.upstream_ip, self.upstream_port))
-
- # 连接建立后,将超时设为 None (阻塞模式),交由 select 控制
- upstream_sock.settimeout(None)
- client_sock.settimeout(None)
-
- # 3. 注入 Header
- sep = b'\r\n'
- idx = first_packet.find(sep)
- if idx != -1:
- new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:]
- else:
- new_packet = first_packet
- # 4. 发送首包 (阻塞式发送,保证数据完整)
- upstream_sock.sendall(new_packet)
-
- # 5. 双向转发
- self._pipe_sockets(client_sock, upstream_sock)
- except Exception:
- pass
- finally:
- self._close_socket(client_sock)
- self._close_socket(upstream_sock)
- def _pipe_sockets(self, sock1, sock2):
- """
- 修复后的转发逻辑:
- 保持 Socket 为阻塞模式,利用 select 监听可读状态。
- """
- sockets = [sock1, sock2]
- last_activity = time.time()
- IDLE_TIMEOUT = 120 # 延长空闲超时
-
- while self.running:
- try:
- # 监听可读事件
- r, _, x = select.select(sockets, [], sockets, 1.0)
-
- if x: break # Socket 异常
-
- if not r:
- if time.time() - last_activity > IDLE_TIMEOUT:
- break
- continue
-
- for s in r:
- try:
- # 尝试读取
- data = s.recv(32768)
- except ConnectionResetError:
- data = None
-
- if not data:
- return # 连接关闭
-
- # 确定发送目标
- target = sock2 if s is sock1 else sock1
-
- # 关键修改:使用阻塞式 sendall
- # 如果网络卡顿,线程会在这里暂停等待,而不是抛出错误或丢包
- try:
- target.sendall(data)
- except BrokenPipeError:
- return
-
- last_activity = time.time()
-
- except Exception:
- break
- def _close_socket(self, sock):
- """优雅关闭 Socket"""
- if sock:
- try:
- # 发送 FIN 包,通知对端数据发送完毕
- sock.shutdown(socket.SHUT_RDWR)
- except Exception:
- pass
- try:
- sock.close()
- except Exception:
- pass
- def __del__(self):
- self.stop()
|