proxy_manager.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import threading
  2. import time
  3. import json
  4. import os
  5. import random
  6. from typing import List, Optional, Dict, Any
  7. from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR
  8. class ProxyManager:
  9. """
  10. 代理管理器 (仅本地配置文件模式)
  11. 读取 config/proxies.json
  12. """
  13. _instance = None
  14. _lock = threading.RLock()
  15. def __new__(cls):
  16. with cls._lock:
  17. if cls._instance is None:
  18. cls._instance = super().__new__(cls)
  19. cls._instance._init_data()
  20. return cls._instance
  21. @staticmethod
  22. def Instance():
  23. return ProxyManager()
  24. def _init_data(self):
  25. self._proxies: Dict[str, List[Dict]] = {} # pool_name -> [proxy_dict]
  26. self._proxy_lock = threading.RLock()
  27. self._config_path = "config/proxies.json"
  28. self.reload_config()
  29. def reload_config(self):
  30. """(重新)加载本地配置文件"""
  31. if not os.path.exists(self._config_path):
  32. VSC_WARN("proxy_mgr", f"Config file not found: {self._config_path}. Proxy pools are empty.")
  33. return
  34. try:
  35. with open(self._config_path, 'r', encoding='utf-8') as f:
  36. data = json.load(f)
  37. count = 0
  38. with self._proxy_lock:
  39. self._proxies.clear()
  40. for pool_name, proxy_list in data.items():
  41. processed_list = []
  42. for p in proxy_list:
  43. # 校验必要字段
  44. if "id" not in p or "ip" not in p or "port" not in p:
  45. continue
  46. # 初始化状态
  47. p.setdefault('lock_until', 0)
  48. p.setdefault('scheme', 'http')
  49. p.setdefault('username', '')
  50. p.setdefault('password', '')
  51. processed_list.append(p)
  52. count += 1
  53. self._proxies[pool_name] = processed_list
  54. VSC_INFO("proxy_mgr", f"Loaded {count} proxies from {self._config_path}")
  55. except json.JSONDecodeError:
  56. VSC_ERROR("proxy_mgr", f"Invalid JSON format in {self._config_path}")
  57. except Exception as e:
  58. VSC_ERROR("proxy_mgr", f"Failed to load proxy config: {e}")
  59. def get_next_proxy(self, pool_name: str) -> Optional[Dict[str, Any]]:
  60. """
  61. 从指定池中获取下一个可用代理 (随机)
  62. """
  63. with self._proxy_lock:
  64. proxies = self._proxies.get(pool_name, [])
  65. if not proxies:
  66. VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name)
  67. return None
  68. now = time.time()
  69. available_proxies = [p for p in proxies if p["lock_until"] <= now]
  70. if not available_proxies:
  71. VSC_WARN("proxy_mgr", "Pool '%s' has proxies but all are locked/busy.", pool_name)
  72. return None
  73. proxy = random.choice(available_proxies)
  74. VSC_DEBUG("proxy_mgr", "Selected proxy ID %d (%s) from pool '%s'",
  75. proxy["id"], proxy["ip"], pool_name)
  76. return proxy
  77. def get_unbind_proxy(self, pool_name: str, bounded_ids: List[int]) -> Optional[Dict[str, Any]]:
  78. """
  79. 获取一个未绑定(且未锁定)的代理。
  80. 用于 GroupCoordinator 的 IP 绑定逻辑。
  81. """
  82. with self._proxy_lock:
  83. proxies = self._proxies.get(pool_name, [])
  84. if not proxies:
  85. VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name)
  86. return None
  87. now = time.time()
  88. # 筛选条件:1. 未锁定 2. ID 不在已绑定列表中
  89. unbound_and_available = [
  90. p for p in proxies
  91. if p["id"] not in bounded_ids and p["lock_until"] <= now
  92. ]
  93. if not unbound_and_available:
  94. VSC_WARN("proxy_mgr", "No unbound and available proxies in pool '%s'", pool_name)
  95. return None
  96. proxy = random.choice(unbound_and_available)
  97. VSC_DEBUG("proxy_mgr", "Selected unbound proxy ID %d from pool '%s'", proxy["id"], pool_name)
  98. return proxy
  99. def lock_proxy(self, pool_name: str, proxy_id: int, duration_seconds: int):
  100. """
  101. 锁定指定代理一段时间 (例如请求过于频繁被 429)
  102. """
  103. with self._proxy_lock:
  104. proxies = self._proxies.get(pool_name, [])
  105. for p in proxies:
  106. if p["id"] == proxy_id:
  107. p["lock_until"] = time.time() + duration_seconds
  108. VSC_INFO("proxy_mgr", "Locked proxy ID %d in pool '%s' for %ds",
  109. proxy_id, pool_name, duration_seconds)
  110. return
  111. VSC_WARN("proxy_mgr", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)