import threading import time import json import os import random from typing import List, Optional, Dict, Any from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR class ProxyManager: """ 代理管理器 (仅本地配置文件模式) 读取 config/proxies.json """ _instance = None _lock = threading.RLock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_data() return cls._instance @staticmethod def Instance(): return ProxyManager() def _init_data(self): self._proxies: Dict[str, List[Dict]] = {} # pool_name -> [proxy_dict] self._proxy_lock = threading.RLock() self._config_path = "config/proxies.json" self.reload_config() def reload_config(self): """(重新)加载本地配置文件""" if not os.path.exists(self._config_path): VSC_WARN("proxy_mgr", f"Config file not found: {self._config_path}. Proxy pools are empty.") return try: with open(self._config_path, 'r', encoding='utf-8') as f: data = json.load(f) count = 0 with self._proxy_lock: self._proxies.clear() for pool_name, proxy_list in data.items(): processed_list = [] for p in proxy_list: # 校验必要字段 if "id" not in p or "ip" not in p or "port" not in p: continue # 初始化状态 p.setdefault('lock_until', 0) p.setdefault('scheme', 'http') p.setdefault('username', '') p.setdefault('password', '') processed_list.append(p) count += 1 self._proxies[pool_name] = processed_list VSC_INFO("proxy_mgr", f"Loaded {count} proxies from {self._config_path}") except json.JSONDecodeError: VSC_ERROR("proxy_mgr", f"Invalid JSON format in {self._config_path}") except Exception as e: VSC_ERROR("proxy_mgr", f"Failed to load proxy config: {e}") def get_next_proxy(self, pool_name: str) -> Optional[Dict[str, Any]]: """ 从指定池中获取下一个可用代理 (随机) """ with self._proxy_lock: proxies = self._proxies.get(pool_name, []) if not proxies: VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name) return None now = time.time() available_proxies = [p for p in proxies if p["lock_until"] <= now] if not available_proxies: VSC_WARN("proxy_mgr", "Pool '%s' has proxies but all are locked/busy.", pool_name) return None proxy = random.choice(available_proxies) VSC_DEBUG("proxy_mgr", "Selected proxy ID %d (%s) from pool '%s'", proxy["id"], proxy["ip"], pool_name) return proxy def get_unbind_proxy(self, pool_name: str, bounded_ids: List[int]) -> Optional[Dict[str, Any]]: """ 获取一个未绑定(且未锁定)的代理。 用于 GCO 的 IP 绑定逻辑。 """ with self._proxy_lock: proxies = self._proxies.get(pool_name, []) if not proxies: VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name) return None now = time.time() # 筛选条件:1. 未锁定 2. ID 不在已绑定列表中 unbound_and_available = [ p for p in proxies if p["id"] not in bounded_ids and p["lock_until"] <= now ] if not unbound_and_available: VSC_WARN("proxy_mgr", "No unbound and available proxies in pool '%s'", pool_name) return None proxy = random.choice(unbound_and_available) VSC_DEBUG("proxy_mgr", "Selected unbound proxy ID %d from pool '%s'", proxy["id"], pool_name) return proxy def lock_proxy(self, pool_name: str, proxy_id: int, duration_seconds: int): """ 锁定指定代理一段时间 (例如请求过于频繁被 429) """ with self._proxy_lock: proxies = self._proxies.get(pool_name, []) for p in proxies: if p["id"] == proxy_id: p["lock_until"] = time.time() + duration_seconds VSC_INFO("proxy_mgr", "Locked proxy ID %d in pool '%s' for %ds", proxy_id, pool_name, duration_seconds) return VSC_WARN("proxy_mgr", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)