account_manager.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import threading
  2. import time
  3. import json
  4. import os
  5. import random
  6. from typing import Optional, Dict, Any, List
  7. # 假设这些日志宏在你的项目中可用,如果不可用请替换为 print 或 logging
  8. from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR
  9. class AccountManager:
  10. """
  11. 账户管理器 (支持数据落盘持久化 + 账号池隔离)
  12. - 静态配置: config/accounts.json
  13. - 动态状态: data/account_states.json
  14. 关键特性:同一个邮箱如果存在于不同的 pool 中,其冷却时间是隔离的。
  15. """
  16. _instance = None
  17. _lock = threading.RLock()
  18. def __new__(cls):
  19. with cls._lock:
  20. if cls._instance is None:
  21. cls._instance = super().__new__(cls)
  22. cls._instance._init_data()
  23. return cls._instance
  24. @staticmethod
  25. def Instance():
  26. return AccountManager()
  27. def _init_data(self):
  28. # 静态账号数据: pool_name -> List[Dict]
  29. self._accounts: Dict[str, List[Dict]] = {}
  30. # 动态锁定状态: "pool_name::username" -> lock_until_timestamp
  31. # 使用组合键来保证不同池子的同名账号隔离
  32. self._lock_states: Dict[str, float] = {}
  33. self._account_lock = threading.RLock()
  34. # 路径配置
  35. self._config_path = "config/accounts.json"
  36. self._state_path = "data/account_states.json"
  37. # 确保 data 目录存在
  38. os.makedirs(os.path.dirname(self._state_path), exist_ok=True)
  39. # 初始化时加载
  40. self.reload_config()
  41. def _get_state_key(self, pool_name: str, username: str) -> str:
  42. """
  43. 生成唯一的锁定状态 Key。
  44. 格式: pool_name::username
  45. 这保证了不同池子里的同名账号互不干扰。
  46. """
  47. return f"{pool_name}::{username}"
  48. def reload_config(self):
  49. """(重新)加载本地配置文件,并恢复锁定状态"""
  50. if not os.path.exists(self._config_path):
  51. VSC_WARN("acc_mgr", f"Config file not found: {self._config_path}. Account pools are empty.")
  52. return
  53. try:
  54. # 1. 先加载磁盘上的锁定状态
  55. self._load_state_from_disk()
  56. # 2. 加载静态配置
  57. with open(self._config_path, 'r', encoding='utf-8') as f:
  58. data = json.load(f)
  59. count = 0
  60. with self._account_lock:
  61. self._accounts.clear()
  62. for pool_name, acc_list in data.items():
  63. processed_list = []
  64. for acc in acc_list:
  65. if "id" not in acc or "username" not in acc:
  66. continue
  67. # 初始化 bound_data
  68. acc.setdefault('bound_data', None)
  69. # 生成组合键,去查找之前的锁定记录
  70. username = acc["username"]
  71. state_key = self._get_state_key(pool_name, username)
  72. # 恢复锁定时间 (如果文件中没有记录,默认为0)
  73. saved_lock = self._lock_states.get(state_key, 0)
  74. acc['lock_until'] = saved_lock
  75. processed_list.append(acc)
  76. count += 1
  77. self._accounts[pool_name] = processed_list
  78. VSC_INFO("acc_mgr", f"Loaded {count} accounts. Lock states restored from {self._state_path}")
  79. except json.JSONDecodeError:
  80. VSC_ERROR("acc_mgr", f"Invalid JSON format in {self._config_path}")
  81. except Exception as e:
  82. VSC_ERROR("acc_mgr", f"Failed to load config: {e}")
  83. def _load_state_from_disk(self):
  84. """从磁盘读取状态文件"""
  85. with self._account_lock:
  86. self._lock_states.clear()
  87. if not os.path.exists(self._state_path):
  88. return
  89. try:
  90. with open(self._state_path, 'r', encoding='utf-8') as f:
  91. data = json.load(f)
  92. # 加载数据到内存
  93. for k, v in data.items():
  94. self._lock_states[str(k)] = float(v)
  95. except Exception as e:
  96. VSC_WARN("acc_mgr", f"Failed to load state file: {e}. Starting fresh.")
  97. def _save_state_to_disk(self):
  98. """
  99. 将当前内存中的锁定状态写入磁盘
  100. 注意:此方法必须在 self._account_lock 保护下调用
  101. """
  102. try:
  103. now = time.time()
  104. # 过滤:只保存未来还会被锁定的账号
  105. # Key 是 "pool_name::username"
  106. active_states = {
  107. k: v for k, v in self._lock_states.items()
  108. if v > now
  109. }
  110. # 更新内存缓存(剔除已过期的)
  111. self._lock_states = active_states
  112. with open(self._state_path, 'w', encoding='utf-8') as f:
  113. json.dump(active_states, f, indent=4)
  114. except Exception as e:
  115. VSC_ERROR("acc_mgr", f"Failed to save state to disk: {e}")
  116. def next(self, pool_name: str, lock_duration: float = 60.0) -> Optional[Dict[str, Any]]:
  117. """
  118. 获取下一个可用账号,并自动锁定指定时长。
  119. @param pool_name: 账号池名称
  120. @param lock_duration: 锁定时间(秒)
  121. @return: 账号信息字典 或 None
  122. """
  123. with self._account_lock: # 全局锁
  124. accounts = self._accounts.get(pool_name, [])
  125. if not accounts:
  126. # VSC_WARN("acc_mgr", "No accounts found for pool '%s'", pool_name)
  127. return None
  128. now = time.time()
  129. available = []
  130. # 筛选逻辑
  131. for acc in accounts:
  132. username = acc.get("username")
  133. # 生成组合键
  134. state_key = self._get_state_key(pool_name, username)
  135. # 检查隔离的锁定状态
  136. current_lock = self._lock_states.get(state_key, 0)
  137. if current_lock <= now:
  138. available.append(acc)
  139. if not available:
  140. # 当前池子所有账号都在冷却中
  141. return None
  142. # 随机选择
  143. selected = random.choice(available)
  144. username = selected.get("username")
  145. # === 更新状态 ===
  146. new_lock_until = now + lock_duration
  147. state_key = self._get_state_key(pool_name, username)
  148. # 1. 更新全局隔离状态字典
  149. self._lock_states[state_key] = new_lock_until
  150. # 2. 更新内存对象 (供 remove 等其他逻辑参考)
  151. selected["lock_until"] = new_lock_until
  152. # 3. 立即落盘
  153. self._save_state_to_disk()
  154. VSC_INFO("acc_mgr", "Selected %s (Pool: %s), locked for %.0fs (Until: %s)",
  155. username, pool_name, lock_duration,
  156. time.strftime("%H:%M:%S", time.localtime(new_lock_until)))
  157. # 返回深拷贝或浅拷贝,防止外部污染
  158. return selected.copy()
  159. def lock(self, pool_name: str, account_id: int, duration_seconds: int):
  160. """
  161. 手动锁定账号 (支持隔离)
  162. """
  163. with self._account_lock:
  164. accounts = self._accounts.get(pool_name, [])
  165. target_acc = None
  166. # 找到账号对象
  167. for acc in accounts:
  168. if acc["id"] == account_id:
  169. target_acc = acc
  170. break
  171. if target_acc:
  172. username = target_acc.get("username")
  173. new_lock_until = time.time() + duration_seconds
  174. state_key = self._get_state_key(pool_name, username)
  175. # 更新状态
  176. self._lock_states[state_key] = new_lock_until
  177. target_acc["lock_until"] = new_lock_until
  178. # 落盘
  179. self._save_state_to_disk()
  180. VSC_INFO("acc_mgr", "Manually locked %s (Pool: %s) for %ds",
  181. username, pool_name, duration_seconds)
  182. else:
  183. VSC_WARN("acc_mgr", "Account ID %d not found in pool %s to lock", account_id, pool_name)
  184. def remove(self, pool_name: str, account_id: int, reason: str = "success", extra_data: dict = None):
  185. """
  186. 从内存池中移除账号 (仅影响本次运行内存,不影响文件配置)
  187. """
  188. with self._account_lock:
  189. accounts = self._accounts.get(pool_name, [])
  190. target_acc = None
  191. for acc in accounts:
  192. if acc["id"] == account_id:
  193. target_acc = acc
  194. break
  195. if target_acc:
  196. accounts.remove(target_acc)
  197. VSC_INFO("acc_mgr", "Removed %s from memory pool %s. Reason: %s",
  198. target_acc.get("username"), pool_name, reason)
  199. else:
  200. pass