| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import threading
- import time
- import json
- import os
- import random
- from typing import List, Optional, Dict, Any
- from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO, VSC_ERROR
- class ProxyManager:
- """
- 代理管理器 (仅本地配置文件模式)
- 读取 config/proxies.json
- """
- _instance = None
- _lock = threading.RLock()
- def __new__(cls):
- with cls._lock:
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance._init_data()
- return cls._instance
- @staticmethod
- def Instance():
- return ProxyManager()
- def _init_data(self):
- self._proxies: Dict[str, List[Dict]] = {} # pool_name -> [proxy_dict]
- self._proxy_lock = threading.RLock()
- self._config_path = "config/proxies.json"
-
- self.reload_config()
- def reload_config(self):
- """(重新)加载本地配置文件"""
- if not os.path.exists(self._config_path):
- VSC_WARN("proxy_mgr", f"Config file not found: {self._config_path}. Proxy pools are empty.")
- return
- try:
- with open(self._config_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
-
- count = 0
- with self._proxy_lock:
- self._proxies.clear()
- for pool_name, proxy_list in data.items():
- processed_list = []
- for p in proxy_list:
- # 校验必要字段
- if "id" not in p or "ip" not in p or "port" not in p:
- continue
-
- # 初始化状态
- p.setdefault('lock_until', 0)
- p.setdefault('scheme', 'http')
- p.setdefault('username', '')
- p.setdefault('password', '')
-
- processed_list.append(p)
- count += 1
-
- self._proxies[pool_name] = processed_list
-
- VSC_INFO("proxy_mgr", f"Loaded {count} proxies from {self._config_path}")
-
- except json.JSONDecodeError:
- VSC_ERROR("proxy_mgr", f"Invalid JSON format in {self._config_path}")
- except Exception as e:
- VSC_ERROR("proxy_mgr", f"Failed to load proxy config: {e}")
- def get_next_proxy(self, pool_name: str) -> Optional[Dict[str, Any]]:
- """
- 从指定池中获取下一个可用代理 (随机)
- """
- with self._proxy_lock:
- proxies = self._proxies.get(pool_name, [])
- if not proxies:
- VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name)
- return None
-
- now = time.time()
- available_proxies = [p for p in proxies if p["lock_until"] <= now]
-
- if not available_proxies:
- VSC_WARN("proxy_mgr", "Pool '%s' has proxies but all are locked/busy.", pool_name)
- return None
-
- proxy = random.choice(available_proxies)
- VSC_DEBUG("proxy_mgr", "Selected proxy ID %d (%s) from pool '%s'",
- proxy["id"], proxy["ip"], pool_name)
- return proxy
- def get_unbind_proxy(self, pool_name: str, bounded_ids: List[int]) -> Optional[Dict[str, Any]]:
- """
- 获取一个未绑定(且未锁定)的代理。
- 用于 GCO 的 IP 绑定逻辑。
- """
- with self._proxy_lock:
- proxies = self._proxies.get(pool_name, [])
- if not proxies:
- VSC_WARN("proxy_mgr", "No proxies found in pool '%s'", pool_name)
- return None
- now = time.time()
-
- # 筛选条件:1. 未锁定 2. ID 不在已绑定列表中
- unbound_and_available = [
- p for p in proxies
- if p["id"] not in bounded_ids and p["lock_until"] <= now
- ]
-
- if not unbound_and_available:
- VSC_WARN("proxy_mgr", "No unbound and available proxies in pool '%s'", pool_name)
- return None
-
- proxy = random.choice(unbound_and_available)
- VSC_DEBUG("proxy_mgr", "Selected unbound proxy ID %d from pool '%s'", proxy["id"], pool_name)
- return proxy
- def lock_proxy(self, pool_name: str, proxy_id: int, duration_seconds: int):
- """
- 锁定指定代理一段时间 (例如请求过于频繁被 429)
- """
- with self._proxy_lock:
- proxies = self._proxies.get(pool_name, [])
- for p in proxies:
- if p["id"] == proxy_id:
- p["lock_until"] = time.time() + duration_seconds
- VSC_INFO("proxy_mgr", "Locked proxy ID %d in pool '%s' for %ds",
- proxy_id, pool_name, duration_seconds)
- return
- VSC_WARN("proxy_mgr", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)
|