import os
import shutil
import socket
import subprocess
import tempfile
import time
import urllib.request

import yaml


class MihomoTunnel:
    """
    Manage a local proxy tunnel backed by a Mihomo (Clash Meta) process.

    Each instance creates its own temporary directory holding the generated
    config and (optionally) a log file, so several instances can run side by
    side. When a relay node is given, the exit node is chained through it
    using Mihomo's 'dialer-proxy' option.

    Args:
        mihomo_bin_path: Path to the Mihomo executable (made absolute).
        exit_node: Mihomo proxy definition (dict with at least a "name" key)
            that all traffic is routed to.
        relay_node: Optional Mihomo proxy definition (dict with a "name" key)
            the exit node dials through.
        enable_log: When True, Mihomo runs at log-level "warning" and its
            output is written to ``run.log`` in the temp directory; when
            False, output is discarded and log-level is "silent".
    """

    def __init__(self, mihomo_bin_path, exit_node, relay_node=None, enable_log=False):
        # Initialise everything stop() reads *before* any call that can
        # raise, so cleanup of a half-constructed instance never fails.
        self.process = None
        self.log_file_obj = None
        self.temp_dir = None
        self.local_port = 0
        self.api_port = 0

        self.mihomo_bin_path = os.path.abspath(mihomo_bin_path)
        self.exit_node = exit_node
        self.relay_node = relay_node
        self.enable_log = enable_log

        self.temp_dir = tempfile.mkdtemp(prefix="mihomo_tunnel_")
        self.config_path = os.path.join(self.temp_dir, "config.yaml")
        self.log_path = os.path.join(self.temp_dir, "run.log")

    def _get_free_ports(self, count):
        """Return ``count`` distinct free TCP ports on 127.0.0.1.

        All probe sockets stay open until every port has been chosen, so the
        OS cannot hand out the same port twice. The ports are released before
        returning, so another process may still grab one before Mihomo binds.
        """
        probes = []
        try:
            for _ in range(count):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                probes.append(sock)
                sock.bind(('127.0.0.1', 0))
            return [sock.getsockname()[1] for sock in probes]
        finally:
            for sock in probes:
                sock.close()

    def _get_free_port(self):
        """Return one free TCP port on 127.0.0.1 (chosen by the OS)."""
        return self._get_free_ports(1)[0]

    def _generate_config(self):
        """Pick fresh ports and write the Mihomo YAML config to ``config_path``.

        Sets ``local_port`` (mixed HTTP/SOCKS listener) and ``api_port``
        (external controller), then writes a rule-mode config whose single
        rule sends all traffic to the exit node.
        """
        # Allocate both ports together so they are guaranteed to differ.
        self.local_port, self.api_port = self._get_free_ports(2)

        config = {
            "allow-lan": False,
            "bind-address": "127.0.0.1",
            "log-level": "warning" if self.enable_log else "silent",
            "mixed-port": self.local_port,
            "external-controller": f"127.0.0.1:{self.api_port}",
            "mode": "rule",
            "ipv6": False,
            "dns": {
                "enable": False
            },
            "proxies": [],
            "rules": []
        }

        # Shallow copy: only a top-level key is added below, so this is
        # enough to keep the caller's exit_node dict unmodified in case it
        # is reused by other instances.
        exit_node_config = dict(self.exit_node)

        # Handle proxy chaining using the 'dialer-proxy' feature
        if self.relay_node:
            # 1. Add relay node to proxies
            config["proxies"].append(self.relay_node)
            # 2. Tell the exit node to dial through the relay node
            exit_node_config["dialer-proxy"] = self.relay_node["name"]

        # 3. Add exit node to proxies
        config["proxies"].append(exit_node_config)

        # 4. Route all traffic to the exit node
        config["rules"].append(f"MATCH,{exit_node_config['name']}")

        with open(self.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(config, f, allow_unicode=False, sort_keys=False)

    def start(self):
        """Launch Mihomo and return the local proxy address.

        If this instance already has a running process, no second process is
        spawned and the existing address is returned.

        Returns:
            The listener address as ``"127.0.0.1:<port>"``.

        Raises:
            FileNotFoundError: The Mihomo binary does not exist.
            RuntimeError: Mihomo exited during the startup wait. When logging
                is enabled, the log content is appended to the message.
        """
        if not os.path.exists(self.mihomo_bin_path):
            raise FileNotFoundError(f"Mihomo binary not found: {self.mihomo_bin_path}")

        # Starting twice would orphan the first child process.
        if self.process is not None and self.process.poll() is None:
            return f"127.0.0.1:{self.local_port}"

        # stop() removes the temp dir; recreate it so start() works again.
        os.makedirs(self.temp_dir, exist_ok=True)

        self._generate_config()

        if self.enable_log:
            # Close a handle left over from a previous run, if any.
            if self.log_file_obj and not self.log_file_obj.closed:
                self.log_file_obj.close()
            self.log_file_obj = open(self.log_path, 'w', encoding='utf-8')
            stdout_target = self.log_file_obj
        else:
            stdout_target = subprocess.DEVNULL

        try:
            self.process = subprocess.Popen(
                [self.mihomo_bin_path, "-d", self.temp_dir, "-f", self.config_path],
                stdout=stdout_target,
                stderr=subprocess.STDOUT,
                cwd=self.temp_dir
            )

            # Give Mihomo a moment to either come up or fail fast.
            time.sleep(1.5)

            if self.process.poll() is not None:
                error_msg = "Mihomo failed to start and exited immediately."
                if self.enable_log:
                    self.log_file_obj.close()
                    # The child writes raw bytes; never let a decode error
                    # hide the real startup failure.
                    with open(self.log_path, 'r', encoding='utf-8', errors='replace') as f:
                        error_msg += f"\nLog:\n{f.read()}"
                raise RuntimeError(error_msg)

            return f"127.0.0.1:{self.local_port}"

        except Exception:
            self.stop()
            raise  # bare raise keeps the original traceback

    def cut_all_connections(self):
        """
        Force-close every active connection via the Mihomo API.

        Sends ``DELETE /connections`` to the external controller. Useful when
        an IP is detected as blocked or the proxy route is being switched, to
        immediately interrupt old requests that are stuck or hanging.

        Returns:
            True if the API answered with status 200 or 204; False if the
            process is not running or the request failed.
        """
        if not self.process or self.process.poll() is not None:
            return False

        try:
            url = f"http://127.0.0.1:{self.api_port}/connections"
            req = urllib.request.Request(url, method="DELETE")
            with urllib.request.urlopen(req, timeout=3) as response:
                return response.status in (200, 204)
        except Exception as e:
            # Best effort: report only when logging is enabled.
            if self.enable_log:
                print(f"[MihomoTunnel] Failed to cut connections: {e}")
            return False

    def stop(self):
        """Terminate Mihomo, close the log file and delete the temp directory.

        Safe to call repeatedly and on an instance that never started.
        """
        proc = self.process
        if proc is not None and proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie and
                # has released its files before the temp dir is removed.
                try:
                    proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    pass
        self.process = None

        if self.log_file_obj and not self.log_file_obj.closed:
            self.log_file_obj.close()

        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir, ignore_errors=True)

    def __del__(self):
        # Finalizers may run on a partially built object or during
        # interpreter shutdown; cleanup here is strictly best effort.
        try:
            self.stop()
        except Exception:
            pass
# ================= TEST SCRIPT =================
# Manual smoke test: start a tunnel (exit node chained through a relay node),
# keep it alive until Ctrl+C, then shut it down and clean up.
if __name__ == "__main__":
    # Windows uses a fixed install path; every other OS expects the binary
    # in the current working directory.
    if os.name == 'nt':
        MIHOMO_EXE = "E:/coordinator/mihomo-windows-amd64-alpha-98aa7e6/mihomo-windows-amd64.exe"
    else:
        MIHOMO_EXE = "./mihomo"

    # NOTE(review): proxy credentials are hard-coded below; consider loading
    # them from the environment or a local, untracked config file.
    exit_node = {
        "name": "ExitNode",
        "type": "http",
        "server": "46.203.77.84",
        "port": 41588,
        "username": "nKX6cH1jvBWJlFZ",
        "password": "vySUmO3VMCKkYDS",
    }

    relay_node = {
        "name": "RelayNode",
        "type": "ss",
        "server": "odko6qmr.mobilfunk.top",
        "port": 12012,
        "cipher": "aes-128-gcm",
        "password": "haMLMXirByn6rGVh",
        "plugin": "obfs",
        "plugin-opts": {"mode": "http", "host": "c0f69828a7c1.microsoft.com"},
        "udp": True,
    }

    print("Starting Mihomo Tunnel...")

    # Pass enable_log=False in production (no disk writes, no console output).
    tunnel = MihomoTunnel(
        MIHOMO_EXE,
        exit_node,
        relay_node,
        enable_log=True,
    )

    try:
        proxy_address = tunnel.start()
        print(f"Success! Proxy mapped to: {proxy_address}")
        # Idle until interrupted so the tunnel stays up.
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        print("\nStopping tunnel...")
    except Exception as exc:
        print(f"\nError: {exc}")
    finally:
        tunnel.stop()
        print("Tunnel closed and temp files cleaned.")