import os
import time
import json
import random
import threading

import redis
from typing import List, Dict, Callable, Optional

from vs_types import GroupConfig, VSPlgConfig, Task, QueryWaitMode
from vs_plg_factory import VSPlgFactory
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from toolkit.proxy_manager import ProxyManager
from toolkit.backoff import ExponentialBackoff


class SentinelGCO:
    """Keep a pool of plugin sessions ("sentinels") alive for one group.

    Two daemon threads are started by :meth:`start`:

    * the *monitor* loop drops tasks whose ``health_check()`` fails, runs a
      query on every task that is due, and writes a signal to Redis when a
      query result reports success;
    * the *creator* loop tops the pool up to
      ``cfg.sentinel.target_instances`` by enqueueing spawn jobs on the
      shared ``ThreadPool``, unless the group cooldown key exists in Redis.
    """

    def __init__(self, cfg: GroupConfig, redis_conf: Dict,
                 logger: Optional[Callable[[str], None]] = None):
        """Store the configuration and create the Redis client.

        Args:
            cfg: Group configuration driving both loops.
            redis_conf: Keyword arguments forwarded to ``redis.Redis``.
            logger: Optional callable receiving formatted log lines; when
                ``None`` nothing is logged.
        """
        self.m_cfg = cfg
        self.m_factory = VSPlgFactory()
        self.m_logger = logger
        self.m_tasks: List[Task] = []
        # Guards m_tasks and m_pending_builtin (shared by the monitor loop,
        # the creator loop and the thread-pool spawn jobs).
        self.m_lock = threading.RLock()
        self.m_stop_event = threading.Event()
        self.redis_client = redis.Redis(**redis_conf)
        # Number of spawn jobs enqueued but not yet finished.
        self.m_pending_builtin = 0
        # Group-wide connection backoff: starts at 1 minute, capped at
        # 1 hour (protects the login endpoint).
        self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=3600.0, factor=2.0)

    def _log(self, message):
        """Forward ``message`` to the logger with the sentinel/group prefix."""
        if self.m_logger:
            self.m_logger(f'[SENTINEL] [{self.m_cfg.identifier}] {message}')

    def start(self):
        """Register the plugin and start the monitor and creator threads.

        Does nothing when ``cfg.enable`` is falsy.
        """
        if not self.m_cfg.enable:
            return
        self._log("Starting Sentinel...")
        plugin_cfg = self.m_cfg.plugin_config
        plugin_name = plugin_cfg.plugin_name
        # "my_plugin" -> "MyPlugin"
        class_name = "".join(part.title() for part in plugin_name.split('_'))
        plugin_path = os.path.join(plugin_cfg.lib_path, plugin_cfg.plugin_bin)
        self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
        threading.Thread(target=self._monitor_loop, daemon=True, name="Sentinel-Monitor").start()
        threading.Thread(target=self._creator_loop, daemon=True, name="Sentinel-Creator").start()

    def stop(self):
        """Signal both loops to exit; does not wait for the threads."""
        self._log("Stopping Sentinel...")
        self.m_stop_event.set()

    def _get_redis_key(self, routing_key: str) -> str:
        """Return the Redis key under which a slot signal is stored."""
        return f"vs:signal:{routing_key}"

    def _monitor_loop(self):
        """Poll every due task until the stop event is set.

        Each pass prunes tasks whose ``health_check()`` is falsy, then for
        every task whose ``next_run`` has passed picks a weighted-random
        appointment type and queries it. Any exception escaping a pass is
        logged and followed by a 2-second pause.
        """
        self._log("Monitor loop started.")
        rng = random.Random()
        while not self.m_stop_event.is_set():
            try:
                # Event.wait instead of time.sleep so stop() takes effect
                # immediately rather than after the sleep expires.
                if self.m_stop_event.wait(0.5):
                    break
                now = time.time()
                with self.m_lock:
                    active_tasks = [t for t in self.m_tasks if t.instance.health_check()]
                    self.m_tasks = active_tasks
                for task in active_tasks:
                    if self.m_stop_event.is_set():
                        break
                    if now < task.next_run:
                        continue
                    apt_types = self.m_cfg.appointment_types
                    if not apt_types:
                        continue
                    weights = [float(item.weight) for item in apt_types]
                    apt_type = rng.choices(apt_types, weights=weights, k=1)[0]
                    self._query_task(task, apt_type, now, rng)
            except Exception as e:
                self._log(f"Monitor loop error: {e}")
                self.m_stop_event.wait(2)

    def _query_task(self, task: Task, apt_type, now: float, rng: random.Random):
        """Run one query on ``task`` and reschedule it.

        On success of the call path the task is rescheduled according to its
        query-wait configuration; on any exception the failure is reported
        to the cloud API and the task is retried in 10 seconds.
        """
        try:
            VSCloudApi.Instance().slot_refresh_start(
                apt_type.routing_key,
                country=apt_type.country,
                city=apt_type.city,
                visa_type=apt_type.visa_type,
            )
            result = task.instance.query(apt_type)
            result.apt_type = apt_type
            VSCloudApi.Instance().slot_refresh_success(apt_type.routing_key)
            if result.success:
                self._publish_signal(apt_type, result, now)
            task.next_run = time.time() + self._next_interval(task, rng)
        except Exception as e:
            self._log(f"Query exception: {e}")
            VSCloudApi.Instance().slot_refresh_fail(apt_type.routing_key, error=str(e))
            task.next_run = time.time() + 10

    def _publish_signal(self, apt_type, result, now: float):
        """Write the slot signal to Redis (with TTL) and report the snapshot."""
        ttl = self.m_cfg.sentinel.signal_ttl
        self._log(f"🔥 SLOT FOUND! Writing signal to Redis (TTL: {ttl}s)")
        payload = {
            "group_id": self.m_cfg.identifier,
            "apt_type": apt_type.model_dump(),
            "query_result": result.to_snapshot_payload(),
            "timestamp": now,
        }
        redis_key = self._get_redis_key(apt_type.routing_key)
        self.redis_client.setex(redis_key, ttl, json.dumps(payload))
        # "website" is added only after the Redis write, so it is part of
        # the snapshot report but not of the stored signal.
        payload["query_result"]["website"] = self.m_cfg.website
        VSCloudApi.Instance().slot_snapshot_report(payload["query_result"])

    def _next_interval(self, task: Task, rng: random.Random):
        """Return the delay before ``task`` is queried again.

        Loop -> 1, Fixed -> ``fixed_wait``, Random -> integer in
        ``[random_min, random_max]``; any other mode -> 30.
        """
        mode = task.qw_cfg.mode
        if mode == QueryWaitMode.Loop:
            return 1
        if mode == QueryWaitMode.Fixed:
            return task.qw_cfg.fixed_wait
        if mode == QueryWaitMode.Random:
            return rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
        return 30

    def _creator_loop(self):
        """Top the sentinel pool up to the target size until stopped.

        Every 2 seconds: skip while the group cooldown key exists in Redis,
        otherwise enqueue one spawn job if live + pending sentinels are
        below ``cfg.sentinel.target_instances``.
        """
        self._log("Creator loop started.")
        group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
        # wait() returns True once stop() was called, ending the loop
        # without running another pass.
        while not self.m_stop_event.wait(2):
            try:
                # Redis round-trip is done outside the lock so the monitor
                # and spawn jobs are not blocked on network I/O.
                if self.redis_client.exists(group_cd_key):
                    continue
                with self.m_lock:
                    current = len(self.m_tasks)
                    pending = self.m_pending_builtin
                if (current + pending) < self.m_cfg.sentinel.target_instances:
                    self._spawn_sentinel_worker()
            except Exception as e:
                # A transient failure (e.g. Redis unreachable) must not kill
                # this thread, otherwise the pool is never refilled.
                self._log(f"Creator loop error: {e}")

    def _spawn_sentinel_worker(self):
        """Reserve a pending slot and enqueue a job that creates one sentinel.

        The pending counter is released when the job finishes (whatever the
        outcome) or immediately if the job could not be enqueued.

        Raises:
            Exception: whatever ``ThreadPool.enqueue`` raises, re-raised
                after the pending slot has been released.
        """
        with self.m_lock:
            self.m_pending_builtin += 1

        def _job():
            try:
                plg_cfg = self._build_plugin_config()
                if plg_cfg is None:
                    return
                instance = self.m_factory.create(self.m_cfg.identifier,
                                                 self.m_cfg.plugin_config.plugin_name)
                instance.set_log(self.m_logger)
                instance.set_config(plg_cfg)
                instance.create_session()
                with self.m_lock:
                    self.m_tasks.append(Task(instance=instance,
                                             qw_cfg=self.m_cfg.query_wait,
                                             next_run=time.time()))
                # A successful spawn resets the group failure counter.
                group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
                self.redis_client.delete(group_fail_key)
                self._log(f"+++ Sentinel spawned: {plg_cfg.account.username}")
            except Exception as e:
                self._handle_spawn_failure(e)
            finally:
                self._release_pending_slot()

        try:
            ThreadPool.getInstance().enqueue(_job)
        except Exception:
            # The job will never run, so its ``finally`` cannot release the
            # slot; do it here or the creator loop would under-spawn forever.
            self._release_pending_slot()
            raise

    def _release_pending_slot(self):
        """Decrement the pending-spawn counter, never going below zero."""
        with self.m_lock:
            self.m_pending_builtin = max(0, self.m_pending_builtin - 1)

    def _build_plugin_config(self) -> Optional[VSPlgConfig]:
        """Assemble the plugin config for a new sentinel.

        Returns:
            The populated config, or ``None`` when a required account or
            proxy could not be obtained.
        """
        plg_cfg = VSPlgConfig()
        plg_cfg.debug = self.m_cfg.debug
        plg_cfg.free_config = self.m_cfg.free_config
        plg_cfg.session_max_life = self.m_cfg.session_max_life

        if not self.m_cfg.need_account:
            plg_cfg.account.id = 0
            plg_cfg.account.username = "Guest"
        else:
            # NOTE(review): account_cd is multiplied by 60, presumably
            # minutes -> seconds for the API; confirm against VSCloudApi.
            acc = VSCloudApi.Instance().get_next_account(
                self.m_cfg.sentinel.account_pool_id,
                self.m_cfg.sentinel.account_cd * 60,
            )
            if not acc:
                return None
            plg_cfg.account.id = acc['id']
            plg_cfg.account.username = acc['username']
            plg_cfg.account.password = acc['password']

        if self.m_cfg.need_proxy:
            proxy = ProxyManager.Instance().next(
                self.m_cfg.proxy_pool,
                lock_duration=self.m_cfg.proxy_lock_interval,
            )
            if not proxy:
                return None
            plg_cfg.proxy.id = proxy['id']
            plg_cfg.proxy.ip = proxy['ip']
            plg_cfg.proxy.port = proxy['port']
            plg_cfg.proxy.scheme = proxy['scheme']
            plg_cfg.proxy.username = proxy['username']
            plg_cfg.proxy.password = proxy['password']

        return plg_cfg

    def _handle_spawn_failure(self, e: Exception):
        """Classify a spawn error and arm the group cooldown if rate limited.

        "Account not found" errors (``40401``) are ignored silently. All
        other errors are logged; rate-limit errors (``42901``) additionally
        increment the group failure counter in Redis and set the cooldown
        key with an expiry computed by ``group_backoff``.
        """
        err_str = str(e)
        if "40401" in err_str or "Account not found" in err_str:
            return
        self._log(f"Spawn failed: {e}")
        if "42901" in err_str or "Rate limited" in err_str:
            group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
            group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
            g_fails = self.redis_client.incr(group_fail_key)
            g_cd = self.group_backoff.calculate(g_fails)
            # Redis rejects a zero expiry, so clamp sub-second delays to 1s.
            self.redis_client.set(group_cd_key, "1", ex=max(1, int(g_cd)))
            self._log(f"📉 [Rate Limited] Sentinel Spawn failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")