| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- # toolkit/proxy_manager.py
- import threading
- import time
- import random
- from typing import List, Optional, Dict, Any
- from vs_log_macros import VSC_DEBUG, VSC_WARN, VSC_INFO # type: ignore
- class ProxyManager:
- """模拟代理管理器,用于获取和锁定代理。"""
- _instance = None
- _lock = threading.Lock()
- def __new__(cls):
- with cls._lock:
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- cls._instance._proxies = {
- "global_proxy": [
- {
- "id": 100003,
- "ip": "82.152.15.37",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfs03sh920"
- },
- {
- "id": 100005,
- "ip": "82.152.19.246",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfmv4gp9c"
- },
- {
- "id": 100014,
- "ip": "82.152.15.33",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfpclja918"
- },
- {
- "id": 100018,
- "ip": "82.152.19.4",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfmv4gp914"
- },
- {
- "id": 100019,
- "ip": "82.152.15.8",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfmv4gp913"
- },
- {
- "id": 100020,
- "ip": "82.152.19.141",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfmv4gp912"
- },
- {
- "id": 100023,
- "ip": "82.152.19.2",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scepfmv4gp9e"
- },
- {
- "id": 100025,
- "ip": "163.5.40.236",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4scekff9u6j9a"
- },
- {
- "id": 100026,
- "ip": "45.196.65.44",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4sj3agscn8592cf"
- },
- {
- "id": 100027,
- "ip": "154.82.173.108",
- "lock_until": 0,
- "password": "M8v0m9b1p4b3",
- "port": 9856,
- "scheme": "http",
- "username": "l9w3z3c4B4O2"
- },
- {
- "id": 100028,
- "ip": "45.196.65.25",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4sio3rdic7m9184"
- },
- {
- "id": 100029,
- "ip": "45.196.65.109",
- "lock_until": 0,
- "password": "idzlar",
- "port": 7778,
- "scheme": "http",
- "username": "4sibp4qi26n93c"
- }
- ],
- "ireland_proxies": [
- {
- "id": 100029,
- "ip": "95.135.130.73",
- "lock_until": 0,
- "password": "RTBuPWx1CEr6DfD",
- "port": 48306,
- "scheme": "http",
- "username": "9zMOkhCng5HG8SZ"
- },
- {
- "id": 100029,
- "ip": "95.135.130.76",
- "lock_until": 0,
- "password": "5i4lV3VjNwE4bkL",
- "port": 41553,
- "scheme": "http",
- "username": "aNCaMFjblyODleO"
- }
- ]
- }
- cls._instance._proxy_lock = threading.Lock() # 用于保护代理状态
- return cls._instance
- @staticmethod
- def Instance():
- return ProxyManager()
- def get_next_proxy(self, pool_name: str) -> Optional[Dict[str, Any]]:
- """从指定池中获取下一个可用代理。"""
- with self._proxy_lock:
- proxies = self._proxies.get(pool_name, [])
- now = time.time()
-
- available_proxies = [p for p in proxies if p["lock_until"] <= now]
- if not available_proxies:
- VSC_WARN("proxy_manager", "No available proxies in pool '%s'", pool_name)
- return None
-
- proxy = random.choice(available_proxies)
- VSC_DEBUG("proxy_manager", "Selected proxy ID %d from pool '%s'", proxy["id"], pool_name)
- return proxy
- def get_unbind_proxy(self, pool_name: str, bounded_ids: List[int]) -> Optional[Dict[str, Any]]:
- """获取一个未绑定(且未锁定)的代理。"""
- with self._proxy_lock:
- proxies = self._proxies.get(pool_name, [])
- now = time.time()
-
- unbound_and_available_proxies = [
- p for p in proxies
- if p["id"] not in bounded_ids and p["lock_until"] <= now
- ]
- if not unbound_and_available_proxies:
- VSC_WARN("proxy_manager", "No unbind and available proxies in pool '%s'", pool_name)
- return None
-
- proxy = random.choice(unbound_and_available_proxies)
- VSC_DEBUG("proxy_manager", "Selected unbind proxy ID %d from pool '%s'", proxy["id"], pool_name)
- return proxy
- def lock_proxy(self, pool_name: str, proxy_id: int, duration_seconds: int):
- """锁定指定代理一段时间。"""
- with self._proxy_lock:
- proxies = self._proxies.get(pool_name, [])
- for p in proxies:
- if p["id"] == proxy_id:
- p["lock_until"] = time.time() + duration_seconds
- VSC_INFO("proxy_manager", "Locked proxy ID %d in pool '%s' until %s",
- proxy_id, pool_name, time.ctime(p["lock_until"]))
- return
- VSC_WARN("proxy_manager", "Proxy ID %d not found in pool '%s' for locking", proxy_id, pool_name)
|