proxy_tunnel.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import socket
  2. import threading
  3. import select
  4. import base64
  5. import time
  6. class ProxyTunnel:
  7. """
  8. 【修复优化版】管理本地代理隧道
  9. 1. 启用 TCP_NODELAY 消除握手延迟 (关键修复)
  10. 2. 开启 KeepAlive 防止链路中断
  11. 3. 修复非阻塞模式下 sendall 导致的数据丢失问题
  12. """
  13. def __init__(self, upstream_ip, upstream_port, username, password):
  14. self.upstream_ip = upstream_ip
  15. self.upstream_port = int(upstream_port)
  16. self.username = username
  17. self.password = password
  18. # 预先计算 Proxy-Authorization 头
  19. auth_str = f"{username}:{password}"
  20. b64_auth = base64.b64encode(auth_str.encode()).decode()
  21. self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n"
  22. self.server_socket = None
  23. self.local_port = 0
  24. self.running = False
  25. self.listen_thread = None
  26. def start(self):
  27. try:
  28. self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  29. self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  30. self.server_socket.bind(('127.0.0.1', 0))
  31. self.local_port = self.server_socket.getsockname()[1]
  32. self.server_socket.listen(128) # 增加连接队列长度
  33. self.running = True
  34. self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True)
  35. self.listen_thread.start()
  36. return f"127.0.0.1:{self.local_port}"
  37. except Exception as e:
  38. self.stop()
  39. raise RuntimeError(f"Failed to start tunnel: {e}")
  40. def stop(self):
  41. self.running = False
  42. if self.server_socket:
  43. try:
  44. self.server_socket.close()
  45. except Exception:
  46. pass
  47. self.server_socket = None
  48. def _accept_loop(self):
  49. while self.running:
  50. try:
  51. if self.server_socket:
  52. # 使用 select 替代 settimeout,减少 CPU 空转
  53. r, _, _ = select.select([self.server_socket], [], [], 1.0)
  54. if r:
  55. try:
  56. client_sock, _ = self.server_socket.accept()
  57. t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True)
  58. t.start()
  59. except OSError:
  60. break
  61. except Exception:
  62. continue
  63. def _optimize_socket(self, sock):
  64. """核心优化:设置 Socket 选项"""
  65. try:
  66. # 1. 禁用 Nagle 算法:数据包立即发送,不等待填满缓冲区
  67. # 这是解决 "HttpClient Timeout" 的关键
  68. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  69. # 2. 开启 KeepAlive:防止防火墙切断空闲连接
  70. sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
  71. # 3. 增大缓冲区 (Linux/Mac 可选,Windows 一般自动管理)
  72. sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 32*1024)
  73. sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32*1024)
  74. except Exception:
  75. pass
  76. def _handle_client(self, client_sock):
  77. upstream_sock = None
  78. try:
  79. self._optimize_socket(client_sock)
  80. client_sock.settimeout(30)
  81. # 1. 读取首包 (32KB 缓冲区)
  82. try:
  83. first_packet = client_sock.recv(32768)
  84. except socket.timeout:
  85. return # 客户端连上但不发数据
  86. if not first_packet:
  87. return
  88. # 2. 连接上游
  89. upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  90. self._optimize_socket(upstream_sock) # 同样优化上游 Socket
  91. upstream_sock.settimeout(10) # 连接超时 10s
  92. upstream_sock.connect((self.upstream_ip, self.upstream_port))
  93. # 连接建立后,将超时设为 None (阻塞模式),交由 select 控制
  94. upstream_sock.settimeout(None)
  95. client_sock.settimeout(None)
  96. # 3. 注入 Header
  97. sep = b'\r\n'
  98. idx = first_packet.find(sep)
  99. if idx != -1:
  100. new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:]
  101. else:
  102. new_packet = first_packet
  103. # 4. 发送首包 (阻塞式发送,保证数据完整)
  104. upstream_sock.sendall(new_packet)
  105. # 5. 双向转发
  106. self._pipe_sockets(client_sock, upstream_sock)
  107. except Exception:
  108. pass
  109. finally:
  110. self._close_socket(client_sock)
  111. self._close_socket(upstream_sock)
  112. def _pipe_sockets(self, sock1, sock2):
  113. """
  114. 修复后的转发逻辑:
  115. 保持 Socket 为阻塞模式,利用 select 监听可读状态。
  116. """
  117. sockets = [sock1, sock2]
  118. last_activity = time.time()
  119. IDLE_TIMEOUT = 120 # 延长空闲超时
  120. while self.running:
  121. try:
  122. # 监听可读事件
  123. r, _, x = select.select(sockets, [], sockets, 1.0)
  124. if x: break # Socket 异常
  125. if not r:
  126. if time.time() - last_activity > IDLE_TIMEOUT:
  127. break
  128. continue
  129. for s in r:
  130. try:
  131. # 尝试读取
  132. data = s.recv(32768)
  133. except ConnectionResetError:
  134. data = None
  135. if not data:
  136. return # 连接关闭
  137. # 确定发送目标
  138. target = sock2 if s is sock1 else sock1
  139. # 关键修改:使用阻塞式 sendall
  140. # 如果网络卡顿,线程会在这里暂停等待,而不是抛出错误或丢包
  141. try:
  142. target.sendall(data)
  143. except BrokenPipeError:
  144. return
  145. last_activity = time.time()
  146. except Exception:
  147. break
  148. def _close_socket(self, sock):
  149. """优雅关闭 Socket"""
  150. if sock:
  151. try:
  152. # 发送 FIN 包,通知对端数据发送完毕
  153. sock.shutdown(socket.SHUT_RDWR)
  154. except Exception:
  155. pass
  156. try:
  157. sock.close()
  158. except Exception:
  159. pass
  160. def __del__(self):
  161. self.stop()