| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
import json
import os
import random
import threading
import time
from typing import Callable, Dict, List, Optional

import redis

from toolkit.backoff import ExponentialBackoff
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from vs_plg_factory import VSPlgFactory
from vs_types import GroupConfig, QueryWaitMode, Task, VSPlgConfig


class SentinelGCO:
    """Maintain a pool of sentinel plugin instances for one group and poll for slots.

    ``start()`` launches two daemon threads:

    * the monitor loop prunes instances whose health check fails and, every
      0.5 s, enqueues at most one query on the shared ``ThreadPool``, spacing
      queries across the pool;
    * the creator loop spawns new instances, staggered, until
      ``cfg.sentinel.target_instances`` is reached, and pauses while the Redis
      key ``vs:group:cooldown:<identifier>`` exists (that key is set when a
      spawn is rate limited).

    When a query reports success, a JSON signal is written to Redis under
    ``vs:signal:<routing_key>`` with a TTL of ``cfg.sentinel.signal_ttl`` seconds.
    """

    def __init__(self, cfg: GroupConfig, redis_conf: Dict,
                 logger: Optional[Callable[[str], None]] = None):
        """
        Args:
            cfg: group configuration (plugin, query-wait and sentinel settings).
            redis_conf: keyword arguments forwarded to ``redis.Redis``.
            logger: optional callable that receives formatted log lines.
        """
        self.m_cfg = cfg
        self.m_factory = VSPlgFactory()
        self.m_logger = logger
        self.m_tasks: List[Task] = []
        self.m_lock = threading.RLock()
        self.m_stop_event = threading.Event()

        self.redis_client = redis.Redis(**redis_conf)
        # Spawn jobs enqueued but not yet finished; counted toward the target
        # so the creator loop does not over-spawn.
        self.m_pending_builtin = 0

        # 1. Group-wide spawn backoff: starts at 1 minute, capped at 1 hour
        #    (protects the login endpoint).
        self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=3600.0, factor=2.0)
        self.m_last_spawn_time = 0.0
        self.m_last_group_query_time = 0.0

    def _log(self, message):
        """Forward ``message`` to the logger, prefixed with the sentinel tag and group id."""
        if self.m_logger:
            self.m_logger(f'[SENTINEL] [{self.m_cfg.identifier}] {message}')

    def _get_average_interval(self) -> float:
        """Return the group's average query interval in seconds (30.0 for unknown modes)."""
        mode = self.m_cfg.query_wait.mode
        if mode == QueryWaitMode.Loop:
            return 1.0
        elif mode == QueryWaitMode.Fixed:
            return float(self.m_cfg.query_wait.fixed_wait)
        elif mode == QueryWaitMode.Random:
            return (self.m_cfg.query_wait.random_min + self.m_cfg.query_wait.random_max) / 2.0
        return 30.0

    @staticmethod
    def _next_wait(qw_cfg):
        """Return the delay (seconds) before a task's next query, per its query-wait config.

        Random mode draws an integer in ``[random_min, random_max]``; unknown
        modes fall back to 30.
        """
        mode = qw_cfg.mode
        if mode == QueryWaitMode.Loop:
            return 1
        if mode == QueryWaitMode.Fixed:
            return qw_cfg.fixed_wait
        if mode == QueryWaitMode.Random:
            return random.randint(qw_cfg.random_min, qw_cfg.random_max)
        return 30

    def start(self):
        """Register the configured plugin and start the monitor and creator threads.

        Does nothing when the group is disabled.
        """
        if not self.m_cfg.enable:
            return
        self._log("Starting Sentinel...")
        plugin_name = self.m_cfg.plugin_config.plugin_name
        # e.g. "foo_bar" -> "FooBar"
        class_name = "".join(part.title() for part in plugin_name.split('_'))
        plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
        self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
        threading.Thread(target=self._monitor_loop, daemon=True, name="Sentinel-Monitor").start()
        threading.Thread(target=self._creator_loop, daemon=True, name="Sentinel-Creator").start()

    def stop(self):
        """Signal both loops to exit and clean up every registered instance.

        In-flight query and spawn jobs are not awaited; a spawn job that
        completes after this call discards its instance instead of registering it.
        """
        self._log("Stopping Sentinel...")
        self.m_stop_event.set()
        with self.m_lock:
            tasks_to_cleanup = list(self.m_tasks)
            self.m_tasks.clear()
        for task in tasks_to_cleanup:
            self._cleanup_task(task, "sentinel stopped")

    def _cleanup_task(self, task: Task, reason: str):
        """Call ``task.instance.cleanup()`` if available; errors are logged, not raised."""
        try:
            if task and task.instance and hasattr(task.instance, "cleanup"):
                self._log(f"Cleaning up sentinel instance. reason={reason}")
                task.instance.cleanup()
        except Exception as e:
            self._log(f"Cleanup failed. reason={reason}, error={e}")

    def _remove_task(self, task: Task, reason: str):
        """Unregister ``task`` and clean it up, only if it was still registered."""
        removed = False
        with self.m_lock:
            if task in self.m_tasks:
                self.m_tasks.remove(task)
                removed = True
        if removed:
            self._cleanup_task(task, reason)

    def _get_redis_key(self, routing_key: str) -> str:
        """Return the Redis key under which the slot signal for ``routing_key`` is stored."""
        return f"vs:signal:{routing_key}"

    def _monitor_loop(self):
        """Every 0.5 s: prune unhealthy tasks, then dispatch at most one query."""
        self._log("Monitor loop started.")

        self.m_last_group_query_time = 0.0
        # Event.wait returns True as soon as stop() sets the event, so shutdown
        # does not have to wait out a sleep.
        while not self.m_stop_event.wait(0.5):
            try:
                now = time.time()
                active_tasks = self._prune_dead_tasks()
                if active_tasks:
                    self._dispatch_next_query(active_tasks, now)
            except Exception as e:
                self._log(f"Monitor loop error: {e}")
                self.m_stop_event.wait(2)

    def _prune_dead_tasks(self) -> List[Task]:
        """Health-check registered tasks, unregister and clean up failures, return survivors.

        Only tasks whose check fails (or raises) are removed from ``m_tasks``.
        Tasks appended by spawn jobs while the checks ran are kept; they are not
        in the returned list until the next tick.
        """
        with self.m_lock:
            snapshot = list(self.m_tasks)

        active_tasks: List[Task] = []
        dead_tasks: List[Task] = []
        for t in snapshot:
            # NOTE(review): only tasks that are currently querying are
            # health-checked; idle tasks are kept unchecked, and a task found
            # unhealthy mid-query is cleaned up while its query job may still be
            # running. Confirm this condition is intended (it may be inverted).
            if not t.is_querying:
                active_tasks.append(t)
                continue
            try:
                healthy = t.instance.health_check()
            except Exception as e:
                healthy = False
                self._log(f"Health check failed: {e}")
            if healthy:
                active_tasks.append(t)
            else:
                dead_tasks.append(t)

        if dead_tasks:
            dead_ids = {id(t) for t in dead_tasks}
            with self.m_lock:
                removed = [t for t in self.m_tasks if id(t) in dead_ids]
                self.m_tasks = [t for t in self.m_tasks if id(t) not in dead_ids]
            # Clean up outside the lock; only tasks still registered (not
            # already taken by stop()/_remove_task) are cleaned up here.
            for t in removed:
                self._cleanup_task(t, "health check failed")
        return active_tasks

    def _dispatch_next_query(self, active_tasks: List[Task], now: float):
        """Enqueue a query for the earliest-due idle task, if group spacing allows."""
        avg_interval = self._get_average_interval()
        # Spread queries over the pool: the group queries about every
        # avg_interval / N seconds, but never more than once per second.
        global_gap = max(1.0, avg_interval / len(active_tasks))

        active_tasks.sort(key=lambda x: x.next_run)
        for task in active_tasks:
            if now < task.next_run or task.is_querying:
                continue
            if now - self.m_last_group_query_time < global_gap:
                return
            apt_types = self.m_cfg.appointment_types
            if not apt_types:
                return
            if self.m_stop_event.is_set():
                return

            weights = [float(item.weight) for item in apt_types]
            apt_type = random.choices(apt_types, weights=weights, k=1)[0]
            wait_gap = self._next_wait(task.qw_cfg)

            task.is_querying = True
            self.m_last_group_query_time = now

            # Default arguments bind the current values for the worker thread.
            def _query_job(current_task=task, a_type=apt_type, gap=wait_gap):
                self._run_query(current_task, a_type, gap)

            ThreadPool.getInstance().enqueue(_query_job)
            return

    def _run_query(self, task: Task, a_type, wait_gap):
        """Run one slot query on ``task``; publish a signal if it reports success.

        Reports refresh start/success/fail to the cloud API. Always reschedules
        the task ``wait_gap`` seconds from now and clears ``is_querying``.
        """
        try:
            VSCloudApi.Instance().slot_refresh_start(a_type.routing_key, country=a_type.country, city=a_type.city, visa_type=a_type.visa_type)
            result = task.instance.query(a_type)
            result.apt_type = a_type
            if result.success:
                self._publish_slot_signal(a_type, result)
            VSCloudApi.Instance().slot_refresh_success(a_type.routing_key)
        except Exception as e:
            self._log(f"Query exception: {e}")
            try:
                VSCloudApi.Instance().slot_refresh_fail(a_type.routing_key, error=str(e))
            except Exception as report_err:
                self._log(f"Failed to report refresh failure: {report_err}")
        finally:
            task.next_run = time.time() + wait_gap
            task.is_querying = False

    def _publish_slot_signal(self, a_type, result):
        """Write the slot signal to Redis (with TTL), then send the snapshot report."""
        ttl = self.m_cfg.sentinel.signal_ttl
        self._log(f"🔥 SLOT FOUND! Writing signal to Redis (TTL: {ttl}s)")
        payload = {
            "group_id": self.m_cfg.identifier,
            "apt_type": a_type.model_dump(),
            "query_result": result.to_snapshot_payload(),
            "timestamp": time.time()
        }
        redis_key = self._get_redis_key(a_type.routing_key)
        self.redis_client.setex(redis_key, ttl, json.dumps(payload))
        # "website" is added only to the report, not to the Redis signal.
        payload["query_result"]["website"] = self.m_cfg.website
        VSCloudApi.Instance().slot_snapshot_report(payload["query_result"])

    def _creator_loop(self):
        """Every 2 s: spawn one instance if below target, not cooling down, and the stagger elapsed."""
        self._log("Creator loop started.")
        group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"

        while not self.m_stop_event.wait(2):
            # Without this guard a single Redis error would end the thread and
            # the pool would never be refilled.
            try:
                # Redis round-trip outside the lock so other threads aren't blocked on it.
                if self.redis_client.exists(group_cd_key):
                    continue

                with self.m_lock:
                    current = len(self.m_tasks)
                    pending = self.m_pending_builtin
                target = self.m_cfg.sentinel.target_instances
                if (current + pending) >= target:
                    continue

                now = time.time()
                stagger_delay = max(10.0, self._get_average_interval() / max(1, target))
                if now - self.m_last_spawn_time < stagger_delay:
                    continue

                with self.m_lock:
                    self.m_last_spawn_time = now
                self._log(f"Staggered spawn triggered. Next spawn in {stagger_delay:.1f}s")
                self._spawn_sentinel_worker()
            except Exception as e:
                self._log(f"Creator loop error: {e}")

    def _build_plugin_config(self) -> VSPlgConfig:
        """Build a plugin config for a new instance, fetching an account/proxy when required."""
        plg_cfg = VSPlgConfig()
        plg_cfg.debug = self.m_cfg.debug
        plg_cfg.free_config = self.m_cfg.free_config
        plg_cfg.session_max_life = self.m_cfg.session_max_life

        if not self.m_cfg.need_account:
            plg_cfg.account.id = 0
            plg_cfg.account.username = "Guest"
        else:
            acc = VSCloudApi.Instance().get_next_account(self.m_cfg.sentinel.account_pool_id, self.m_cfg.sentinel.account_cd)
            plg_cfg.account.id = acc['id']
            plg_cfg.account.username = acc['username']
            plg_cfg.account.password = acc['password']

        if self.m_cfg.need_proxy:
            proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
            plg_cfg.proxy.id = proxy['id']
            plg_cfg.proxy.ip = proxy['ip']
            plg_cfg.proxy.port = proxy['port']
            plg_cfg.proxy.proto = proxy['proto']
            plg_cfg.proxy.username = proxy['username']
            plg_cfg.proxy.password = proxy['password']
        return plg_cfg

    def _reset_group_failures(self):
        """Best-effort reset of the group's spawn-failure counter in Redis."""
        try:
            self.redis_client.delete(f"vs:group:failures:{self.m_cfg.identifier}")
        except Exception as e:
            self._log(f"Failed to reset group failure counter: {e}")

    def _handle_spawn_failure(self, e: Exception):
        """Log a spawn failure; on rate limiting, set a group-wide cooldown in Redis."""
        err_str = str(e)
        # Resource-not-found errors are deliberately not logged; the creator
        # loop tries again on a later tick.
        if any(marker in err_str for marker in ("40401", "Account not found", "Proxy not found")):
            return

        self._log(f"Spawn failed: {e}")

        if not any(marker in err_str for marker in ("42901", "Rate limited")):
            return

        group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
        group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
        try:
            g_fails = self.redis_client.incr(group_fail_key)
            g_cd = self.group_backoff.calculate(g_fails)
            # Redis rejects a non-positive expiry, so never go below 1 second.
            self.redis_client.set(group_cd_key, "1", ex=max(1, int(g_cd)))
            self._log(f"📉 [Rate Limited] Sentinel Spawn failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
        except Exception as backoff_err:
            self._log(f"Failed to apply group backoff: {backoff_err}")

    def _spawn_sentinel_worker(self):
        """Enqueue a job that creates, configures and registers one sentinel instance.

        ``m_pending_builtin`` is incremented now and decremented when the job
        finishes. If the job fails, or ``stop()`` ran first, the instance is
        cleaned up instead of being registered.
        """
        with self.m_lock:
            self.m_pending_builtin += 1

        def _job():
            instance = None
            registered = False
            try:
                plg_cfg = self._build_plugin_config()
                instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
                instance.set_log(self.m_logger)
                instance.set_config(plg_cfg)
                instance.create_session()

                with self.m_lock:
                    # Checked under the lock so stop() either sees this task in
                    # m_tasks (and cleans it) or we see the stop flag here.
                    if not self.m_stop_event.is_set():
                        self.m_tasks.append(
                            Task(instance=instance, qw_cfg=self.m_cfg.query_wait, next_run=time.time(), book_allowed=False))
                        registered = True
                if not registered:
                    return

                self._reset_group_failures()
                self._log(f"+++ Sentinel spawned: {plg_cfg.account.username}")
            except Exception as e:
                self._handle_spawn_failure(e)
            finally:
                # Never clean up an instance that is registered in m_tasks.
                if not registered and instance is not None:
                    try:
                        if hasattr(instance, "cleanup"):
                            instance.cleanup()
                    except Exception as e:
                        self._log(f"Cleanup failed after spawn failure: {e}")
                with self.m_lock:
                    self.m_pending_builtin = max(0, self.m_pending_builtin - 1)

        ThreadPool.getInstance().enqueue(_job)
|