# toolkit/account_manager.py import threading import time import random from typing import Optional, Dict, Any from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO # type: ignore class AccountManager: """模拟账户管理器,用于获取和锁定账户。""" _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._accounts = { "ie_nl": [ { "id": 580, "lock_until": 0, "password": "y1wm@hf0Bn", "username": "sg_fr_onoy9i@gmail-app.com" }, { "id": 581, "lock_until": 0, "password": "p7@@0NnOR8", "username": "sg_fr_k6vuhb@gmail-app.com" }, { "id": 582, "lock_until": 0, "password": "D@4OCh2j3I", "username": "sg_fr_cvq08b@gmail-app.com" }, { "id": 583, "lock_until": 0, "password": "xT01gsHw@j", "username": "sg_fr_4cn9xj@gmail-app.com" }, { "id": 584, "lock_until": 0, "password": "@aprZh7NbA", "username": "sg_fr_m32a2e@gmail-app.com" }, { "id": 585, "lock_until": 0, "password": "SR2hnX@ho5", "username": "sg_fr_p0nz5l@gmail-app.com" } ], "gb_fr": [ { "id": 0, "username":"arket_zz@163.com", "password": "Visafly@111", "lock_until": 0 }, ], "ie_es": [ { "id": 0, "username":"arket_zz@163.com", "password": "dx4ua@!.X.i8Xn8", "lock_until": 0 }, ] } cls._instance._account_lock = threading.Lock() # 用于保护账户状态 return cls._instance @staticmethod def Instance(): return AccountManager() def get_next_account(self, pool_name: str) -> Optional[Dict[str, Any]]: """从指定池中获取下一个可用账户。""" with self._account_lock: accounts = self._accounts.get(pool_name, []) now = time.time() available_accounts = [acc for acc in accounts if acc["lock_until"] <= now] if not available_accounts: VSC_WARN("account_manager", "No available accounts in pool '%s'", pool_name) return None # 简单地随机选择一个可用账户 account = random.choice(available_accounts) VSC_DEBUG("account_manager", "Selected account ID %d from pool '%s'", account["id"], pool_name) return account def lock_account(self, pool_name: str, account_id: int, duration_seconds: int): """锁定指定账户一段时间。""" with self._account_lock: accounts = self._accounts.get(pool_name, []) for acc in accounts: if acc["id"] == account_id: acc["lock_until"] = time.time() + duration_seconds VSC_INFO("account_manager", "Locked account ID %d in pool '%s' until %s", account_id, pool_name, time.ctime(acc["lock_until"])) return VSC_WARN("account_manager", "Account ID %d not found in pool '%s' for locking", account_id, pool_name)