# toolkit/proxy_manager.py import threading import time import random from typing import List, Optional, Dict, Any from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO # type: ignore class ProxyManager: """模拟代理管理器,用于获取和锁定代理。""" _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._proxies = { "global_proxy": [ { "id": 100003, "ip": "82.152.15.37", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfs03sh920" }, { "id": 100005, "ip": "82.152.19.246", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfmv4gp9c" }, { "id": 100014, "ip": "82.152.15.33", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfpclja918" }, { "id": 100018, "ip": "82.152.19.4", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfmv4gp914" }, { "id": 100019, "ip": "82.152.15.8", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfmv4gp913" }, { "id": 100020, "ip": "82.152.19.141", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfmv4gp912" }, { "id": 100023, "ip": "82.152.19.2", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scepfmv4gp9e" }, { "id": 100025, "ip": "163.5.40.236", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4scekff9u6j9a" }, { "id": 100026, "ip": "45.196.65.44", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4sj3agscn8592cf" }, { "id": 100027, "ip": "154.82.173.108", "lock_until": 0, "password": "M8v0m9b1p4b3", "port": 9856, "scheme": "http", "username": "l9w3z3c4B4O2" }, { "id": 100028, "ip": "45.196.65.25", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4sio3rdic7m9184" }, { "id": 100029, "ip": "45.196.65.109", "lock_until": 0, "password": "idzlar", "port": 7778, "scheme": "http", "username": "4sibp4qi26n93c" } ], "ireland_proxies": [ { "id": 100029, "ip": "95.135.130.73", "lock_until": 0, "password": "RTBuPWx1CEr6DfD", "port": 48306, "scheme": "http", "username": "9zMOkhCng5HG8SZ" }, { "id": 100029, "ip": "95.135.130.76", "lock_until": 0, "password": "5i4lV3VjNwE4bkL", "port": 41553, "scheme": "http", "username": "aNCaMFjblyODleO" } ] } cls._instance._proxy_lock = threading.Lock() # 用于保护代理状态 return cls._instance @staticmethod def Instance(): return ProxyManager() def get_next_proxy(self, pool_name: str) -> Optional[Dict[str, Any]]: """从指定池中获取下一个可用代理。""" with self._proxy_lock: proxies = self._proxies.get(pool_name, []) now = time.time() available_proxies = [p for p in proxies if p["lock_until"] <= now] if not available_proxies: VSC_WARN("proxy_manager", "No available proxies in pool '%s'", pool_name) return None proxy = random.choice(available_proxies) VSC_DEBUG("proxy_manager", "Selected proxy ID %d from pool '%s'", proxy["id"], pool_name) return proxy def get_unbind_proxy(self, pool_name: str, bounded_ids: List[int]) -> Optional[Dict[str, Any]]: """获取一个未绑定(且未锁定)的代理。""" with self._proxy_lock: proxies = self._proxies.get(pool_name, []) now = time.time() unbound_and_available_proxies = [ p for p in proxies if p["id"] not in bounded_ids and p["lock_until"] <= now ] if not unbound_and_available_proxies: VSC_WARN("proxy_manager", "No unbind and available proxies in pool '%s'", pool_name) return None proxy = random.choice(unbound_and_available_proxies) VSC_DEBUG("proxy_manager", "Selected unbind proxy ID %d from pool '%s'", proxy["id"], pool_name) return proxy def lock_proxy(self, pool_name: str, proxy_id: int, duration_seconds: int): """锁定指定代理一段时间。""" with self._proxy_lock: proxies = self._proxies.get(pool_name, []) for p in proxies: if p["id"] == proxy_id: p["lock_until"] = time.time() + duration_seconds VSC_INFO("proxy_manager", "Locked proxy ID %d in pool '%s' until %s", proxy_id, pool_name, time.ctime(p["lock_until"])) return VSC_WARN("proxy_manager", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)