# toolkit/binding_manager.py
import threading
from typing import List, Optional

from vs_log_macros import VSC_DEBUG, VSC_INFO  # type: ignore


class BindingManager:
    """Simulated account-to-proxy binding manager.

    A process-wide singleton that keeps an in-memory mapping from an account
    (identified by its pool name and ID) to the proxy it is bound to.  All
    access to the mapping goes through ``_binding_lock``.
    """

    # The single shared instance, created on first construction.
    _instance = None
    # Guards creation of the singleton instance.
    _lock = threading.Lock()

    def __new__(cls):
        """Return the shared instance, creating and initialising it once."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # { (account_pool, account_id): (proxy_pool, proxy_id, bind_type) }
                instance._bindings = {}
                instance._binding_lock = threading.Lock()
                cls._instance = instance
        return cls._instance

    @staticmethod
    def Instance() -> "BindingManager":
        """Return the shared ``BindingManager`` instance."""
        return BindingManager()

    def get_bounded_proxy_id(self, account_pool: str, account_id: int) -> Optional[int]:
        """Return the ID of the proxy bound to the given account.

        Args:
            account_pool: Name of the pool the account belongs to.
            account_id: ID of the account within that pool.

        Returns:
            The bound proxy ID, or ``None`` when the account has no binding.
        """
        with self._binding_lock:
            binding = self._bindings.get((account_pool, account_id))
            if binding is not None:
                # Stored value layout: (proxy_pool, proxy_id, bind_type).
                proxy_id = binding[1]
                VSC_DEBUG("binding_manager", "Found binding for account %d: proxy %d", account_id, proxy_id)
                return proxy_id
            VSC_DEBUG("binding_manager", "No binding found for account %d in pool %s", account_id, account_pool)
            return None

    def get_bounded_proxies_ids(self, account_pool: str, proxy_pool: str) -> List[int]:
        """Return the IDs of proxies in ``proxy_pool`` bound by accounts in ``account_pool``.

        Args:
            account_pool: Only bindings whose account is in this pool are considered.
            proxy_pool: Only bindings whose proxy is in this pool are considered.

        Returns:
            One proxy ID per matching binding, in the mapping's iteration
            order.  The list is empty when nothing matches.
        """
        with self._binding_lock:
            bounded_ids = [
                p_id
                for (acc_pool, _), (p_pool, p_id, _) in self._bindings.items()
                if acc_pool == account_pool and p_pool == proxy_pool
            ]
            VSC_DEBUG("binding_manager", "Bounded proxy IDs in pool %s for acc pool %s: %s", proxy_pool, account_pool, bounded_ids)
            return bounded_ids

    def create_binding(self, account_pool: str, account_id: int, proxy_pool: str, proxy_id: int, bind_type: str) -> None:
        """Bind an account to a proxy, replacing any existing binding for that account.

        Args:
            account_pool: Name of the pool the account belongs to.
            account_id: ID of the account within that pool.
            proxy_pool: Name of the pool the proxy belongs to.
            proxy_id: ID of the proxy within that pool.
            bind_type: Label stored alongside the binding.
        """
        with self._binding_lock:
            self._bindings[(account_pool, account_id)] = (proxy_pool, proxy_id, bind_type)
            VSC_INFO("binding_manager", "Created binding: account %d (pool %s) -> proxy %d (pool %s) type %s", account_id, account_pool, proxy_id, proxy_pool, bind_type)

    def remove_binding(self, account_pool: str, account_id: int) -> None:
        """Remove the binding for the given account, if one exists.

        Removing an account that has no binding is a silent no-op.

        Args:
            account_pool: Name of the pool the account belongs to.
            account_id: ID of the account within that pool.
        """
        with self._binding_lock:
            try:
                del self._bindings[(account_pool, account_id)]
            except KeyError:
                # Nothing was bound, so there is nothing to remove or report.
                return
            VSC_INFO("binding_manager", "Removed binding for account %d in pool %s", account_id, account_pool)