import threading import time import json import os import random from typing import Optional, Dict, Any, List # 假设这些日志宏在你的项目中可用,如果不可用请替换为 print 或 logging from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR class AccountManager: """ 账户管理器 (支持数据落盘持久化 + 账号池隔离) - 静态配置: config/accounts.json - 动态状态: data/account_states.json 关键特性:同一个邮箱如果存在于不同的 pool 中,其冷却时间是隔离的。 """ _instance = None _lock = threading.RLock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_data() return cls._instance @staticmethod def Instance(): return AccountManager() def _init_data(self): # 静态账号数据: pool_name -> List[Dict] self._accounts: Dict[str, List[Dict]] = {} # 动态锁定状态: "pool_name::username" -> lock_until_timestamp # 使用组合键来保证不同池子的同名账号隔离 self._lock_states: Dict[str, float] = {} self._account_lock = threading.RLock() # 路径配置 self._config_path = "config/accounts.json" self._state_path = "data/account_states.json" # 确保 data 目录存在 os.makedirs(os.path.dirname(self._state_path), exist_ok=True) # 初始化时加载 self.reload_config() def _get_state_key(self, pool_name: str, username: str) -> str: """ 生成唯一的锁定状态 Key。 格式: pool_name::username 这保证了不同池子里的同名账号互不干扰。 """ return f"{pool_name}::{username}" def reload_config(self): """(重新)加载本地配置文件,并恢复锁定状态""" if not os.path.exists(self._config_path): VSC_WARN("acc_mgr", f"Config file not found: {self._config_path}. Account pools are empty.") return try: # 1. 先加载磁盘上的锁定状态 self._load_state_from_disk() # 2. 加载静态配置 with open(self._config_path, 'r', encoding='utf-8') as f: data = json.load(f) count = 0 with self._account_lock: self._accounts.clear() for pool_name, acc_list in data.items(): processed_list = [] for acc in acc_list: if "id" not in acc or "username" not in acc: continue # 初始化 bound_data acc.setdefault('bound_data', None) # 生成组合键,去查找之前的锁定记录 username = acc["username"] state_key = self._get_state_key(pool_name, username) # 恢复锁定时间 (如果文件中没有记录,默认为0) saved_lock = self._lock_states.get(state_key, 0) acc['lock_until'] = saved_lock processed_list.append(acc) count += 1 self._accounts[pool_name] = processed_list VSC_INFO("acc_mgr", f"Loaded {count} accounts. Lock states restored from {self._state_path}") except json.JSONDecodeError: VSC_ERROR("acc_mgr", f"Invalid JSON format in {self._config_path}") except Exception as e: VSC_ERROR("acc_mgr", f"Failed to load config: {e}") def _load_state_from_disk(self): """从磁盘读取状态文件""" with self._account_lock: self._lock_states.clear() if not os.path.exists(self._state_path): return try: with open(self._state_path, 'r', encoding='utf-8') as f: data = json.load(f) # 加载数据到内存 for k, v in data.items(): self._lock_states[str(k)] = float(v) except Exception as e: VSC_WARN("acc_mgr", f"Failed to load state file: {e}. Starting fresh.") def _save_state_to_disk(self): """ 将当前内存中的锁定状态写入磁盘 注意:此方法必须在 self._account_lock 保护下调用 """ try: now = time.time() # 过滤:只保存未来还会被锁定的账号 # Key 是 "pool_name::username" active_states = { k: v for k, v in self._lock_states.items() if v > now } # 更新内存缓存(剔除已过期的) self._lock_states = active_states with open(self._state_path, 'w', encoding='utf-8') as f: json.dump(active_states, f, indent=4) except Exception as e: VSC_ERROR("acc_mgr", f"Failed to save state to disk: {e}") def next(self, pool_name: str, lock_duration: float = 60.0) -> Optional[Dict[str, Any]]: """ 获取下一个可用账号,并自动锁定指定时长。 @param pool_name: 账号池名称 @param lock_duration: 锁定时间(秒) @return: 账号信息字典 或 None """ with self._account_lock: # 全局锁 accounts = self._accounts.get(pool_name, []) if not accounts: # VSC_WARN("acc_mgr", "No accounts found for pool '%s'", pool_name) return None now = time.time() available = [] # 筛选逻辑 for acc in accounts: username = acc.get("username") # 生成组合键 state_key = self._get_state_key(pool_name, username) # 检查隔离的锁定状态 current_lock = self._lock_states.get(state_key, 0) if current_lock <= now: available.append(acc) if not available: # 当前池子所有账号都在冷却中 return None # 随机选择 selected = random.choice(available) username = selected.get("username") # === 更新状态 === new_lock_until = now + lock_duration state_key = self._get_state_key(pool_name, username) # 1. 更新全局隔离状态字典 self._lock_states[state_key] = new_lock_until # 2. 更新内存对象 (供 remove 等其他逻辑参考) selected["lock_until"] = new_lock_until # 3. 立即落盘 self._save_state_to_disk() VSC_INFO("acc_mgr", "Selected %s (Pool: %s), locked for %.0fs (Until: %s)", username, pool_name, lock_duration, time.strftime("%H:%M:%S", time.localtime(new_lock_until))) # 返回深拷贝或浅拷贝,防止外部污染 return selected.copy() def lock(self, pool_name: str, account_id: int, duration_seconds: int): """ 手动锁定账号 (支持隔离) """ with self._account_lock: accounts = self._accounts.get(pool_name, []) target_acc = None # 找到账号对象 for acc in accounts: if acc["id"] == account_id: target_acc = acc break if target_acc: username = target_acc.get("username") new_lock_until = time.time() + duration_seconds state_key = self._get_state_key(pool_name, username) # 更新状态 self._lock_states[state_key] = new_lock_until target_acc["lock_until"] = new_lock_until # 落盘 self._save_state_to_disk() VSC_INFO("acc_mgr", "Manually locked %s (Pool: %s) for %ds", username, pool_name, duration_seconds) else: VSC_WARN("acc_mgr", "Account ID %d not found in pool %s to lock", account_id, pool_name) def remove(self, pool_name: str, account_id: int, reason: str = "success", extra_data: dict = None): """ 从内存池中移除账号 (仅影响本次运行内存,不影响文件配置) """ with self._account_lock: accounts = self._accounts.get(pool_name, []) target_acc = None for acc in accounts: if acc["id"] == account_id: target_acc = acc break if target_acc: accounts.remove(target_acc) VSC_INFO("acc_mgr", "Removed %s from memory pool %s. Reason: %s", target_acc.get("username"), pool_name, reason) else: pass