import threading import json import os from typing import List, Optional, Tuple, Dict, Any from vs_log_macros import VSC_DEBUG, VSC_INFO, VSC_WARN, VSC_ERROR class BindingManager: """ 绑定管理器 (支持从配置文件加载静态绑定 + 运行时动态绑定) 读取 config/bindings.json """ _instance = None _lock = threading.RLock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_data() return cls._instance @staticmethod def Instance(): return BindingManager() def _init_data(self): # Key: (account_pool, account_id) # Value: (proxy_pool, proxy_id, bind_type) self._bindings: Dict[Tuple[str, int], Tuple[str, int, str]] = {} self._binding_lock = threading.RLock() self._config_path = "config/bindings.json" self.reload_config() def reload_config(self): """加载本地绑定配置 (通常用于静态绑定)""" if not os.path.exists(self._config_path): # 绑定文件不存在是正常的,可能全是动态绑定 VSC_DEBUG("binding_mgr", f"Config file not found: {self._config_path}. No static bindings loaded.") return try: with open(self._config_path, 'r', encoding='utf-8') as f: data = json.load(f) if not isinstance(data, list): VSC_ERROR("binding_mgr", "Invalid JSON format: bindings must be a list") return count = 0 with self._binding_lock: # 策略:不清除运行时生成的动态绑定,仅覆盖/添加配置文件中的静态绑定 # 如果需要重置,可以先 self._bindings.clear() for item in data: # 校验字段 if not all(k in item for k in ("account_pool", "account_id", "proxy_pool", "proxy_id")): continue key = (item["account_pool"], item["account_id"]) val = (item["proxy_pool"], item["proxy_id"], item.get("bind_type", "static")) self._bindings[key] = val count += 1 VSC_INFO("binding_mgr", f"Loaded {count} static bindings from {self._config_path}") except json.JSONDecodeError: VSC_ERROR("binding_mgr", f"Invalid JSON format in {self._config_path}") except Exception as e: VSC_ERROR("binding_mgr", f"Failed to load bindings: {e}") def get_bounded_proxy_id(self, account_pool: str, account_id: int) -> Optional[int]: """获取给定账户绑定的代理ID""" with self._binding_lock: key = (account_pool, account_id) binding = self._bindings.get(key) if binding: proxy_pool, proxy_id, b_type = binding VSC_DEBUG("binding_mgr", "Found %s binding: Acc(%s:%d) -> Proxy(%s:%d)", b_type, account_pool, account_id, proxy_pool, proxy_id) return proxy_id VSC_DEBUG("binding_mgr", "No binding found for Acc(%s:%d)", account_pool, account_id) return None def get_bounded_proxies_ids(self, account_pool: str, proxy_pool: str) -> List[int]: """ 获取所有在特定代理池中被账户绑定的代理ID列表。 用于 ProxyManager 过滤掉已被占用的代理。 """ with self._binding_lock: bounded_ids = [] for (acc_p, _), (px_p, px_id, _) in self._bindings.items(): if acc_p == account_pool and px_p == proxy_pool: bounded_ids.append(px_id) # VSC_DEBUG("binding_mgr", "Bounded IDs in %s (for %s): %s", proxy_pool, account_pool, bounded_ids) return bounded_ids def create_binding(self, account_pool: str, account_id: int, proxy_pool: str, proxy_id: int, bind_type: str): """ 创建绑定 (运行时调用,如动态绑定) 注意:运行时创建的绑定目前仅存在内存中,重启后丢失(除非写入文件,当前版本暂不实现回写) """ with self._binding_lock: key = (account_pool, account_id) self._bindings[key] = (proxy_pool, proxy_id, bind_type) VSC_INFO("binding_mgr", "Created binding: Acc(%s:%d) -> Proxy(%s:%d) [%s]", account_pool, account_id, proxy_pool, proxy_id, bind_type) def remove_binding(self, account_pool: str, account_id: int): """移除绑定""" with self._binding_lock: key = (account_pool, account_id) if key in self._bindings: del self._bindings[key] VSC_INFO("binding_mgr", "Removed binding for Acc(%s:%d)", account_pool, account_id)