| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- import threading
- import time
- import json
- import os
- import random
- from typing import Optional, Dict, Any, List
- # 假设这些日志宏在你的项目中可用,如果不可用请替换为 print 或 logging
- from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR
- class AccountManager:
- """
- 账户管理器 (支持数据落盘持久化 + 账号池隔离)
- - 静态配置: config/accounts.json
- - 动态状态: data/account_states.json
-
- 关键特性:同一个邮箱如果存在于不同的 pool 中,其冷却时间是隔离的。
- """
- _instance = None
- _lock = threading.RLock()
- def __new__(cls):
- with cls._lock:
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance._init_data()
- return cls._instance
- @staticmethod
- def Instance():
- return AccountManager()
- def _init_data(self):
- # 静态账号数据: pool_name -> List[Dict]
- self._accounts: Dict[str, List[Dict]] = {}
-
- # 动态锁定状态: "pool_name::username" -> lock_until_timestamp
- # 使用组合键来保证不同池子的同名账号隔离
- self._lock_states: Dict[str, float] = {}
-
- self._account_lock = threading.RLock()
-
- # 路径配置
- self._config_path = "config/accounts.json"
- self._state_path = "data/account_states.json"
-
- # 确保 data 目录存在
- os.makedirs(os.path.dirname(self._state_path), exist_ok=True)
-
- # 初始化时加载
- self.reload_config()
- def _get_state_key(self, pool_name: str, username: str) -> str:
- """
- 生成唯一的锁定状态 Key。
- 格式: pool_name::username
- 这保证了不同池子里的同名账号互不干扰。
- """
- return f"{pool_name}::{username}"
- def reload_config(self):
- """(重新)加载本地配置文件,并恢复锁定状态"""
- if not os.path.exists(self._config_path):
- VSC_WARN("acc_mgr", f"Config file not found: {self._config_path}. Account pools are empty.")
- return
- try:
- # 1. 先加载磁盘上的锁定状态
- self._load_state_from_disk()
- # 2. 加载静态配置
- with open(self._config_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
-
- count = 0
- with self._account_lock:
- self._accounts.clear()
- for pool_name, acc_list in data.items():
- processed_list = []
- for acc in acc_list:
- if "id" not in acc or "username" not in acc:
- continue
-
- # 初始化 bound_data
- acc.setdefault('bound_data', None)
-
- # 生成组合键,去查找之前的锁定记录
- username = acc["username"]
- state_key = self._get_state_key(pool_name, username)
-
- # 恢复锁定时间 (如果文件中没有记录,默认为0)
- saved_lock = self._lock_states.get(state_key, 0)
- acc['lock_until'] = saved_lock
-
- processed_list.append(acc)
- count += 1
-
- self._accounts[pool_name] = processed_list
-
- VSC_INFO("acc_mgr", f"Loaded {count} accounts. Lock states restored from {self._state_path}")
-
- except json.JSONDecodeError:
- VSC_ERROR("acc_mgr", f"Invalid JSON format in {self._config_path}")
- except Exception as e:
- VSC_ERROR("acc_mgr", f"Failed to load config: {e}")
- def _load_state_from_disk(self):
- """从磁盘读取状态文件"""
- with self._account_lock:
- self._lock_states.clear()
- if not os.path.exists(self._state_path):
- return
- try:
- with open(self._state_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- # 加载数据到内存
- for k, v in data.items():
- self._lock_states[str(k)] = float(v)
- except Exception as e:
- VSC_WARN("acc_mgr", f"Failed to load state file: {e}. Starting fresh.")
- def _save_state_to_disk(self):
- """
- 将当前内存中的锁定状态写入磁盘
- 注意:此方法必须在 self._account_lock 保护下调用
- """
- try:
- now = time.time()
- # 过滤:只保存未来还会被锁定的账号
- # Key 是 "pool_name::username"
- active_states = {
- k: v for k, v in self._lock_states.items()
- if v > now
- }
-
- # 更新内存缓存(剔除已过期的)
- self._lock_states = active_states
- with open(self._state_path, 'w', encoding='utf-8') as f:
- json.dump(active_states, f, indent=4)
-
- except Exception as e:
- VSC_ERROR("acc_mgr", f"Failed to save state to disk: {e}")
- def next(self, pool_name: str, lock_duration: float = 60.0) -> Optional[Dict[str, Any]]:
- """
- 获取下一个可用账号,并自动锁定指定时长。
-
- @param pool_name: 账号池名称
- @param lock_duration: 锁定时间(秒)
- @return: 账号信息字典 或 None
- """
- with self._account_lock: # 全局锁
- accounts = self._accounts.get(pool_name, [])
- if not accounts:
- # VSC_WARN("acc_mgr", "No accounts found for pool '%s'", pool_name)
- return None
- now = time.time()
- available = []
-
- # 筛选逻辑
- for acc in accounts:
- username = acc.get("username")
- # 生成组合键
- state_key = self._get_state_key(pool_name, username)
-
- # 检查隔离的锁定状态
- current_lock = self._lock_states.get(state_key, 0)
-
- if current_lock <= now:
- available.append(acc)
-
- if not available:
- # 当前池子所有账号都在冷却中
- return None
- # 随机选择
- selected = random.choice(available)
- username = selected.get("username")
-
- # === 更新状态 ===
- new_lock_until = now + lock_duration
- state_key = self._get_state_key(pool_name, username)
-
- # 1. 更新全局隔离状态字典
- self._lock_states[state_key] = new_lock_until
-
- # 2. 更新内存对象 (供 remove 等其他逻辑参考)
- selected["lock_until"] = new_lock_until
-
- # 3. 立即落盘
- self._save_state_to_disk()
-
- VSC_INFO("acc_mgr", "Selected %s (Pool: %s), locked for %.0fs (Until: %s)",
- username, pool_name, lock_duration,
- time.strftime("%H:%M:%S", time.localtime(new_lock_until)))
-
- # 返回深拷贝或浅拷贝,防止外部污染
- return selected.copy()
- def lock(self, pool_name: str, account_id: int, duration_seconds: int):
- """
- 手动锁定账号 (支持隔离)
- """
- with self._account_lock:
- accounts = self._accounts.get(pool_name, [])
- target_acc = None
-
- # 找到账号对象
- for acc in accounts:
- if acc["id"] == account_id:
- target_acc = acc
- break
-
- if target_acc:
- username = target_acc.get("username")
- new_lock_until = time.time() + duration_seconds
- state_key = self._get_state_key(pool_name, username)
-
- # 更新状态
- self._lock_states[state_key] = new_lock_until
- target_acc["lock_until"] = new_lock_until
-
- # 落盘
- self._save_state_to_disk()
-
- VSC_INFO("acc_mgr", "Manually locked %s (Pool: %s) for %ds",
- username, pool_name, duration_seconds)
- else:
- VSC_WARN("acc_mgr", "Account ID %d not found in pool %s to lock", account_id, pool_name)
- def remove(self, pool_name: str, account_id: int, reason: str = "success", extra_data: dict = None):
- """
- 从内存池中移除账号 (仅影响本次运行内存,不影响文件配置)
- """
- with self._account_lock:
- accounts = self._accounts.get(pool_name, [])
- target_acc = None
- for acc in accounts:
- if acc["id"] == account_id:
- target_acc = acc
- break
-
- if target_acc:
- accounts.remove(target_acc)
- VSC_INFO("acc_mgr", "Removed %s from memory pool %s. Reason: %s",
- target_acc.get("username"), pool_name, reason)
- else:
- pass
|