import threading import time import json import os import random from typing import List, Optional, Dict, Any from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR class ProxyManager: """ 代理管理器 (支持数据落盘持久化 + 代理池隔离) - 静态配置: config/proxies.json - 动态状态: data/proxy_states.json """ _instance = None _lock = threading.RLock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_data() return cls._instance @staticmethod def Instance(): return ProxyManager() def _init_data(self): # 静态数据: pool_name -> List[Dict] self._proxies: Dict[str, List[Dict]] = {} # 动态锁定状态: "pool_name::proxy_id" -> lock_until_timestamp self._lock_states: Dict[str, float] = {} self._proxy_lock = threading.RLock() # 路径配置 self._config_path = "config/proxies.json" self._state_path = "data/proxy_states.json" # 确保 data 目录存在 os.makedirs(os.path.dirname(self._state_path), exist_ok=True) self.reload_config() def _get_state_key(self, pool_name: str, proxy_id: Any) -> str: """ 生成唯一的锁定状态 Key (组合键) 格式: pool_name::proxy_id """ return f"{pool_name}::{proxy_id}" def reload_config(self): """(重新)加载本地配置文件,并恢复锁定状态""" if not os.path.exists(self._config_path): VSC_WARN("proxy_mgr", f"Config file not found: {self._config_path}. Proxy pools are empty.") return try: # 1. 先加载磁盘上的锁定状态 self._load_state_from_disk() # 2. 加载静态配置 with open(self._config_path, 'r', encoding='utf-8') as f: data = json.load(f) count = 0 with self._proxy_lock: self._proxies.clear() for pool_name, proxy_list in data.items(): processed_list = [] for p in proxy_list: # 校验必要字段 if "id" not in p or "ip" not in p or "port" not in p: continue # 初始化默认字段 p.setdefault('scheme', 'http') p.setdefault('username', '') p.setdefault('password', '') # 恢复锁定状态 proxy_id = p["id"] state_key = self._get_state_key(pool_name, proxy_id) # 从全局状态中获取该池子下该代理的锁定时间 saved_lock = self._lock_states.get(state_key, 0) p['lock_until'] = saved_lock processed_list.append(p) count += 1 self._proxies[pool_name] = processed_list VSC_INFO("proxy_mgr", f"Loaded {count} proxies. Lock states restored from {self._state_path}") except json.JSONDecodeError: VSC_ERROR("proxy_mgr", f"Invalid JSON format in {self._config_path}") except Exception as e: VSC_ERROR("proxy_mgr", f"Failed to load proxy config: {e}") def _load_state_from_disk(self): """从磁盘读取状态文件""" with self._proxy_lock: self._lock_states.clear() if not os.path.exists(self._state_path): return try: with open(self._state_path, 'r', encoding='utf-8') as f: data = json.load(f) for k, v in data.items(): self._lock_states[str(k)] = float(v) except Exception as e: VSC_WARN("proxy_mgr", f"Failed to load state file: {e}. Starting fresh.") def _save_state_to_disk(self): """ 将当前内存中的锁定状态写入磁盘 只保存未来还会被锁定的记录,减少文件大小 """ try: now = time.time() active_states = { k: v for k, v in self._lock_states.items() if v > now } # 更新内存缓存 self._lock_states = active_states with open(self._state_path, 'w', encoding='utf-8') as f: json.dump(active_states, f, indent=4) except Exception as e: VSC_ERROR("proxy_mgr", f"Failed to save state to disk: {e}") def next(self, pool_name: str, lock_duration: float = 0.0) -> Optional[Dict[str, Any]]: """ 从指定池中获取下一个可用代理,并锁定指定时长。 @param pool_name: 代理池名称 @param lock_duration: 锁定时间(秒)。 注意:如果不传或传0,则不锁定(仅获取),适合无状态请求。 如果为了限制频率,请传入大于0的值。 """ with self._proxy_lock: proxies = self._proxies.get(pool_name, []) if not proxies: # VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name) return None now = time.time() available = [] # 筛选可用代理 for p in proxies: proxy_id = p["id"] state_key = self._get_state_key(pool_name, proxy_id) # 检查隔离的锁定状态 current_lock = self._lock_states.get(state_key, 0) if current_lock <= now: available.append(p) if not available: # VSC_WARN("proxy_mgr", "Pool '%s' has proxies but all are locked/busy.", pool_name) return None # 随机选择 selected = random.choice(available) proxy_id = selected["id"] # === 如果需要锁定,更新全局状态并落盘 === if lock_duration > 0: new_lock_until = now + lock_duration state_key = self._get_state_key(pool_name, proxy_id) # 1. 更新状态 self._lock_states[state_key] = new_lock_until selected["lock_until"] = new_lock_until # 2. 立即落盘 self._save_state_to_disk() VSC_DEBUG("proxy_mgr", "Selected proxy ID %s (Pool: %s), locked for %.0fs", proxy_id, pool_name, lock_duration) else: VSC_DEBUG("proxy_mgr", "Selected proxy ID %s (Pool: %s) without lock", proxy_id, pool_name) return selected.copy() def lock(self, pool_name: str, proxy_id: int, duration_seconds: int): """ 手动锁定指定代理 (支持数据落盘) 通常用于在请求失败(如429)时紧急调用 """ with self._proxy_lock: proxies = self._proxies.get(pool_name, []) target_proxy = None for p in proxies: if p["id"] == proxy_id: target_proxy = p break if target_proxy: state_key = self._get_state_key(pool_name, proxy_id) new_lock_until = time.time() + duration_seconds # 更新状态 self._lock_states[state_key] = new_lock_until target_proxy["lock_until"] = new_lock_until # 立即落盘 self._save_state_to_disk() VSC_INFO("proxy_mgr", "Manually locked proxy ID %d in pool '%s' for %ds", proxy_id, pool_name, duration_seconds) else: VSC_WARN("proxy_mgr", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)