import threading import time import json import os import random from typing import Optional, Dict, Any, List from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR class AccountManager: """ 账户管理器 (仅本地配置文件模式) 读取 config/accounts.json """ _instance = None _lock = threading.RLock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_data() return cls._instance @staticmethod def Instance(): return AccountManager() def _init_data(self): self._accounts: Dict[str, List[Dict]] = {} # pool_name -> [account_dict] self._account_lock = threading.RLock() self._config_path = "config/accounts.json" # 初始化时加载 self.reload_config() def reload_config(self): """(重新)加载本地配置文件""" if not os.path.exists(self._config_path): VSC_WARN("acc_mgr", f"Config file not found: {self._config_path}. Account pools are empty.") return try: with open(self._config_path, 'r', encoding='utf-8') as f: data = json.load(f) count = 0 with self._account_lock: self._accounts.clear() for pool_name, acc_list in data.items(): processed_list = [] for acc in acc_list: # 确保必要字段存在,并初始化状态 if "id" not in acc or "username" not in acc: continue acc.setdefault('lock_until', 0) # bound_data 用于存储绑定的预约人信息,配置文件里可以是 null acc.setdefault('bound_data', None) processed_list.append(acc) count += 1 self._accounts[pool_name] = processed_list VSC_INFO("acc_mgr", f"Loaded {count} accounts from {self._config_path}") except json.JSONDecodeError: VSC_ERROR("acc_mgr", f"Invalid JSON format in {self._config_path}") except Exception as e: VSC_ERROR("acc_mgr", f"Failed to load config: {e}") def next(self, pool_name: str, lock_duration: float = 60.0) -> Optional[Dict[str, Any]]: """ 获取下一个可用账号 (随机策略),并自动锁定指定时长。 @param pool_name: 账号池名称 @param lock_duration: 锁定时间(秒),默认60秒 @return: 账号信息字典 或 None """ with self._account_lock: accounts = self._accounts.get(pool_name, []) if not accounts: VSC_WARN("acc_mgr", "No accounts found for pool '%s'", pool_name) return None now = time.time() # 筛选未锁定的账号 (lock_until 必须小于等于当前时间) available = [acc for acc in accounts if acc.get("lock_until", 0) <= now] if not available: VSC_DEBUG("acc_mgr", "Pool '%s' has %d accounts but all are locked.", pool_name, len(accounts)) return None # 随机选择 selected = random.choice(available) # === 关键修改:立即更新锁定时间 === # 直接修改引用的对象,确保其他线程或其他逻辑能感知到该账号已被占用 selected["lock_until"] = now + lock_duration VSC_DEBUG("acc_mgr", "Selected account %s (id=%s) from pool %s, locked for %.1fs", selected.get("username"), selected.get("id"), pool_name, lock_duration) # 返回账号数据的浅拷贝,防止调用者无意修改 Manager 内部的其他字段 # (虽然 Python dict 是引用传递,但 copy 一下是个好习惯,除非你需要引用修改) return selected.copy() def lock(self, pool_name: str, account_id: int, duration_seconds: int): """ 锁定账号一段时间 """ with self._account_lock: accounts = self._accounts.get(pool_name, []) for acc in accounts: if acc["id"] == account_id: acc["lock_until"] = time.time() + duration_seconds VSC_INFO("acc_mgr", "Locked account %s (id=%d) for %ds", acc.get("username"), account_id, duration_seconds) return VSC_WARN("acc_mgr", "Account %d not found in pool %s to lock", account_id, pool_name) def remove(self, pool_name: str, account_id: int, reason: str = "success", extra_data: dict = None): """ 从内存中移除账号 (预订成功后不再使用) 注意:这不会修改磁盘上的 accounts.json 文件,重启程序后账号会恢复 """ with self._account_lock: accounts = self._accounts.get(pool_name, []) target_acc = None for acc in accounts: if acc["id"] == account_id: target_acc = acc break if target_acc: accounts.remove(target_acc) VSC_INFO("acc_mgr", "Removed account %s (id=%d) from memory pool %s. Reason: %s", target_acc.get("username"), account_id, pool_name, reason) else: VSC_WARN("acc_mgr", "Attempt to remove non-existent account %d from pool %s", account_id, pool_name)