proxy_tunnel.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import socket
  2. import threading
  3. import select
  4. import base64
  5. import time
  6. class ProxyTunnel:
  7. """
  8. 【轻量级版】管理本地代理隧道
  9. 不依赖 mitmproxy,使用纯 Socket 实现 TCP 盲转发和 Header 注入。
  10. 资源占用极低,启动速度快,无子进程僵死风险。
  11. """
  12. def __init__(self, upstream_ip, upstream_port, username, password):
  13. self.upstream_ip = upstream_ip
  14. self.upstream_port = int(upstream_port)
  15. self.username = username
  16. self.password = password
  17. # 预先计算 Proxy-Authorization 头,避免运行时计算
  18. auth_str = f"{username}:{password}"
  19. b64_auth = base64.b64encode(auth_str.encode()).decode()
  20. self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n"
  21. self.server_socket = None
  22. self.local_port = 0
  23. self.running = False
  24. self.listen_thread = None
  25. def start(self):
  26. """启动本地监听,返回 '127.0.0.1:port'"""
  27. try:
  28. self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  29. self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  30. # 绑定到随机空闲端口
  31. self.server_socket.bind(('127.0.0.1', 0))
  32. self.local_port = self.server_socket.getsockname()[1]
  33. self.server_socket.listen(100) # 允许一定的并发连接
  34. self.running = True
  35. # 启动后台线程处理连接请求
  36. self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True)
  37. self.listen_thread.start()
  38. # Socket 绑定成功即代表启动成功,无需等待
  39. return f"127.0.0.1:{self.local_port}"
  40. except Exception as e:
  41. self.stop()
  42. raise RuntimeError(f"Failed to start lightweight tunnel: {e}")
  43. def stop(self):
  44. """停止服务"""
  45. self.running = False
  46. if self.server_socket:
  47. try:
  48. # 关闭 Socket 会触发 accept 抛出 OSError,从而结束 _accept_loop
  49. self.server_socket.close()
  50. except Exception:
  51. pass
  52. self.server_socket = None
  53. def _accept_loop(self):
  54. """循环接收浏览器的连接"""
  55. while self.running:
  56. try:
  57. # 设置超时以便能响应 stop 信号
  58. if self.server_socket:
  59. self.server_socket.settimeout(1.0)
  60. try:
  61. client_sock, _ = self.server_socket.accept()
  62. except socket.timeout:
  63. continue
  64. except OSError:
  65. # Socket 关闭时触发
  66. break
  67. # 为每个连接启动一个线程进行转发
  68. t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True)
  69. t.start()
  70. else:
  71. break
  72. except Exception:
  73. break
  74. def _handle_client(self, client_sock):
  75. """处理单个连接:注入 Header -> 双向转发"""
  76. upstream_sock = None
  77. try:
  78. client_sock.settimeout(30) # 防止半开连接
  79. # 1. 读取浏览器发来的第一个包 (通常是 CONNECT 或 GET)
  80. first_packet = client_sock.recv(16384)
  81. if not first_packet:
  82. client_sock.close()
  83. return
  84. # 2. 连接远程代理
  85. upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  86. upstream_sock.settimeout(10) # 连接超时
  87. upstream_sock.connect((self.upstream_ip, self.upstream_port))
  88. # 3. [关键步骤] 在第一个包中注入 Proxy-Authorization
  89. sep = b'\r\n'
  90. idx = first_packet.find(sep)
  91. if idx != -1:
  92. # 插入 Auth 头
  93. new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:]
  94. else:
  95. new_packet = first_packet # 异常情况直接透传
  96. # 4. 发送修改后的包给远程代理
  97. upstream_sock.sendall(new_packet)
  98. # 5. 进入双向盲转发模式 (Tunneling)
  99. self._pipe_sockets(client_sock, upstream_sock)
  100. except Exception:
  101. pass
  102. finally:
  103. # 显式拆分 try-except 以避免语法解析错误
  104. if client_sock:
  105. try:
  106. client_sock.close()
  107. except Exception:
  108. pass
  109. if upstream_sock:
  110. try:
  111. upstream_sock.close()
  112. except Exception:
  113. pass
  114. def _pipe_sockets(self, sock1, sock2):
  115. """高效的双向数据转发"""
  116. sockets = [sock1, sock2]
  117. try:
  118. sock1.setblocking(0)
  119. sock2.setblocking(0)
  120. except Exception:
  121. return
  122. last_activity = time.time()
  123. IDLE_TIMEOUT = 60 # 60秒无数据传输则断开
  124. while self.running:
  125. try:
  126. # 使用 select 监听可读状态
  127. r, _, x = select.select(sockets, [], sockets, 1.0)
  128. if x:
  129. break # 发生错误
  130. if not r:
  131. # 空闲检查
  132. if time.time() - last_activity > IDLE_TIMEOUT:
  133. break
  134. continue
  135. for s in r:
  136. try:
  137. data = s.recv(65536) # 64KB buffer
  138. if not data:
  139. return # 连接关闭
  140. # 转发给对方
  141. target = sock2 if s is sock1 else sock1
  142. target.sendall(data)
  143. last_activity = time.time()
  144. except Exception:
  145. return
  146. except Exception:
  147. break
  148. def __del__(self):
  149. self.stop()