proxy_manager.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import threading
  2. import time
  3. import json
  4. import os
  5. import random
  6. from typing import List, Optional, Dict, Any
  7. from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR
  8. class ProxyManager:
  9. """
  10. 代理管理器 (支持数据落盘持久化 + 代理池隔离)
  11. - 静态配置: config/proxies.json
  12. - 动态状态: data/proxy_states.json
  13. """
  14. _instance = None
  15. _lock = threading.RLock()
  16. def __new__(cls):
  17. with cls._lock:
  18. if cls._instance is None:
  19. cls._instance = super().__new__(cls)
  20. cls._instance._init_data()
  21. return cls._instance
  22. @staticmethod
  23. def Instance():
  24. return ProxyManager()
  25. def _init_data(self):
  26. # 静态数据: pool_name -> List[Dict]
  27. self._proxies: Dict[str, List[Dict]] = {}
  28. # 动态锁定状态: "pool_name::proxy_id" -> lock_until_timestamp
  29. self._lock_states: Dict[str, float] = {}
  30. self._proxy_lock = threading.RLock()
  31. # 路径配置
  32. self._config_path = "config/proxies.json"
  33. self._state_path = "data/proxy_states.json"
  34. # 确保 data 目录存在
  35. os.makedirs(os.path.dirname(self._state_path), exist_ok=True)
  36. self.reload_config()
  37. def _get_state_key(self, pool_name: str, proxy_id: Any) -> str:
  38. """
  39. 生成唯一的锁定状态 Key (组合键)
  40. 格式: pool_name::proxy_id
  41. """
  42. return f"{pool_name}::{proxy_id}"
  43. def reload_config(self):
  44. """(重新)加载本地配置文件,并恢复锁定状态"""
  45. if not os.path.exists(self._config_path):
  46. VSC_WARN("proxy_mgr", f"Config file not found: {self._config_path}. Proxy pools are empty.")
  47. return
  48. try:
  49. # 1. 先加载磁盘上的锁定状态
  50. self._load_state_from_disk()
  51. # 2. 加载静态配置
  52. with open(self._config_path, 'r', encoding='utf-8') as f:
  53. data = json.load(f)
  54. count = 0
  55. with self._proxy_lock:
  56. self._proxies.clear()
  57. for pool_name, proxy_list in data.items():
  58. processed_list = []
  59. for p in proxy_list:
  60. # 校验必要字段
  61. if "id" not in p or "ip" not in p or "port" not in p:
  62. continue
  63. # 初始化默认字段
  64. p.setdefault('scheme', 'http')
  65. p.setdefault('username', '')
  66. p.setdefault('password', '')
  67. # 恢复锁定状态
  68. proxy_id = p["id"]
  69. state_key = self._get_state_key(pool_name, proxy_id)
  70. # 从全局状态中获取该池子下该代理的锁定时间
  71. saved_lock = self._lock_states.get(state_key, 0)
  72. p['lock_until'] = saved_lock
  73. processed_list.append(p)
  74. count += 1
  75. self._proxies[pool_name] = processed_list
  76. VSC_INFO("proxy_mgr", f"Loaded {count} proxies. Lock states restored from {self._state_path}")
  77. except json.JSONDecodeError:
  78. VSC_ERROR("proxy_mgr", f"Invalid JSON format in {self._config_path}")
  79. except Exception as e:
  80. VSC_ERROR("proxy_mgr", f"Failed to load proxy config: {e}")
  81. def _load_state_from_disk(self):
  82. """从磁盘读取状态文件"""
  83. with self._proxy_lock:
  84. self._lock_states.clear()
  85. if not os.path.exists(self._state_path):
  86. return
  87. try:
  88. with open(self._state_path, 'r', encoding='utf-8') as f:
  89. data = json.load(f)
  90. for k, v in data.items():
  91. self._lock_states[str(k)] = float(v)
  92. except Exception as e:
  93. VSC_WARN("proxy_mgr", f"Failed to load state file: {e}. Starting fresh.")
  94. def _save_state_to_disk(self):
  95. """
  96. 将当前内存中的锁定状态写入磁盘
  97. 只保存未来还会被锁定的记录,减少文件大小
  98. """
  99. try:
  100. now = time.time()
  101. active_states = {
  102. k: v for k, v in self._lock_states.items()
  103. if v > now
  104. }
  105. # 更新内存缓存
  106. self._lock_states = active_states
  107. with open(self._state_path, 'w', encoding='utf-8') as f:
  108. json.dump(active_states, f, indent=4)
  109. except Exception as e:
  110. VSC_ERROR("proxy_mgr", f"Failed to save state to disk: {e}")
  111. def next(self, pool_name: str, lock_duration: float = 0.0) -> Optional[Dict[str, Any]]:
  112. """
  113. 从指定池中获取下一个可用代理,并锁定指定时长。
  114. @param pool_name: 代理池名称
  115. @param lock_duration: 锁定时间(秒)。
  116. 注意:如果不传或传0,则不锁定(仅获取),适合无状态请求。
  117. 如果为了限制频率,请传入大于0的值。
  118. """
  119. with self._proxy_lock:
  120. proxies = self._proxies.get(pool_name, [])
  121. if not proxies:
  122. # VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name)
  123. return None
  124. now = time.time()
  125. available = []
  126. # 筛选可用代理
  127. for p in proxies:
  128. proxy_id = p["id"]
  129. state_key = self._get_state_key(pool_name, proxy_id)
  130. # 检查隔离的锁定状态
  131. current_lock = self._lock_states.get(state_key, 0)
  132. if current_lock <= now:
  133. available.append(p)
  134. if not available:
  135. # VSC_WARN("proxy_mgr", "Pool '%s' has proxies but all are locked/busy.", pool_name)
  136. return None
  137. # 随机选择
  138. selected = random.choice(available)
  139. proxy_id = selected["id"]
  140. # === 如果需要锁定,更新全局状态并落盘 ===
  141. if lock_duration > 0:
  142. new_lock_until = now + lock_duration
  143. state_key = self._get_state_key(pool_name, proxy_id)
  144. # 1. 更新状态
  145. self._lock_states[state_key] = new_lock_until
  146. selected["lock_until"] = new_lock_until
  147. # 2. 立即落盘
  148. self._save_state_to_disk()
  149. VSC_DEBUG("proxy_mgr", "Selected proxy ID %s (Pool: %s), locked for %.0fs",
  150. proxy_id, pool_name, lock_duration)
  151. else:
  152. VSC_DEBUG("proxy_mgr", "Selected proxy ID %s (Pool: %s) without lock",
  153. proxy_id, pool_name)
  154. return selected.copy()
  155. def lock(self, pool_name: str, proxy_id: int, duration_seconds: int):
  156. """
  157. 手动锁定指定代理 (支持数据落盘)
  158. 通常用于在请求失败(如429)时紧急调用
  159. """
  160. with self._proxy_lock:
  161. proxies = self._proxies.get(pool_name, [])
  162. target_proxy = None
  163. for p in proxies:
  164. if p["id"] == proxy_id:
  165. target_proxy = p
  166. break
  167. if target_proxy:
  168. state_key = self._get_state_key(pool_name, proxy_id)
  169. new_lock_until = time.time() + duration_seconds
  170. # 更新状态
  171. self._lock_states[state_key] = new_lock_until
  172. target_proxy["lock_until"] = new_lock_until
  173. # 立即落盘
  174. self._save_state_to_disk()
  175. VSC_INFO("proxy_mgr", "Manually locked proxy ID %d in pool '%s' for %ds",
  176. proxy_id, pool_name, duration_seconds)
  177. else:
  178. VSC_WARN("proxy_mgr", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)