# sentinel.py (12 KB)
import json
import os
import random
import threading
import time
from typing import List, Dict, Callable, Optional

import redis

from vs_types import GroupConfig, VSPlgConfig, Task, QueryWaitMode
from vs_plg_factory import VSPlgFactory
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from toolkit.backoff import ExponentialBackoff
  13. class SentinelGCO:
  14. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  15. self.m_cfg = cfg
  16. self.m_factory = VSPlgFactory()
  17. self.m_logger = logger
  18. self.m_tasks: List[Task] = []
  19. self.m_lock = threading.RLock()
  20. self.m_stop_event = threading.Event()
  21. self.redis_com = redis.Redis(**redis_conf)
  22. self.redis_pub = redis.Redis(**redis_conf)
  23. self.m_pending_builtin = 0
  24. # 1. 全局建连退避:起步 1 分钟,封顶 1 小时 (保护登录接口)
  25. self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=3600.0, factor=2.0)
  26. self.m_last_spawn_time = 0.0
  27. def _log(self, message):
  28. if self.m_logger:
  29. self.m_logger(f'[SENTINEL] [{self.m_cfg.identifier}] {message}')
  30. def _get_average_interval(self) -> float:
  31. """计算当前组平均的查询间隔(秒)"""
  32. mode = self.m_cfg.query_wait.mode
  33. if mode == QueryWaitMode.Loop:
  34. return 1.0
  35. elif mode == QueryWaitMode.Fixed:
  36. return float(self.m_cfg.query_wait.fixed_wait)
  37. elif mode == QueryWaitMode.Random:
  38. return (self.m_cfg.query_wait.random_min + self.m_cfg.query_wait.random_max) / 2.0
  39. return 30.0
  40. def start(self):
  41. if not self.m_cfg.enable:
  42. return
  43. self._log("Starting Sentinel...")
  44. plugin_name = self.m_cfg.plugin_config.plugin_name
  45. class_name = "".join(part.title() for part in plugin_name.split('_'))
  46. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  47. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  48. threading.Thread(target=self._monitor_loop, daemon=True, name="Sentinel-Monitor").start()
  49. threading.Thread(target=self._creator_loop, daemon=True, name="Sentinel-Creator").start()
  50. def stop(self):
  51. self._log("Stopping Sentinel...")
  52. self.m_stop_event.set()
  53. with self.m_lock:
  54. tasks_to_cleanup = list(self.m_tasks)
  55. self.m_tasks.clear()
  56. for task in tasks_to_cleanup:
  57. self._cleanup_task(task, "sentinel stopped")
  58. def _cleanup_task(self, task: Task, reason: str):
  59. try:
  60. if task and task.instance and hasattr(task.instance, "cleanup"):
  61. self._log(f"Cleaning up sentinel instance. reason={reason}")
  62. task.instance.cleanup()
  63. except Exception as e:
  64. self._log(f"Cleanup failed. reason={reason}, error={e}")
  65. def _remove_task(self, task: Task, reason: str):
  66. removed = False
  67. with self.m_lock:
  68. if task in self.m_tasks:
  69. self.m_tasks.remove(task)
  70. removed = True
  71. if removed:
  72. self._cleanup_task(task, reason)
  73. def _get_redis_key(self, routing_key: str) -> str:
  74. return f"vs:signal:{routing_key}"
  75. def _monitor_loop(self):
  76. self._log("Monitor loop started.")
  77. rng = random.Random()
  78. while not self.m_stop_event.is_set():
  79. try:
  80. time.sleep(0.5)
  81. now = time.time()
  82. with self.m_lock:
  83. tasks_to_check = list(self.m_tasks)
  84. active_tasks = []
  85. dead_tasks = []
  86. for t in tasks_to_check:
  87. try:
  88. if t.instance.health_check():
  89. active_tasks.append(t)
  90. else:
  91. dead_tasks.append(t)
  92. except Exception as e:
  93. dead_tasks.append(t)
  94. self._log(f"Health check failed: {e}")
  95. if dead_tasks:
  96. with self.m_lock:
  97. current_tasks = list(self.m_tasks)
  98. self.m_tasks = [t for t in self.m_tasks if t in active_tasks]
  99. for t in dead_tasks:
  100. if t in current_tasks:
  101. self._cleanup_task(t, "health check failed")
  102. else:
  103. with self.m_lock:
  104. self.m_tasks = [t for t in self.m_tasks if t in active_tasks]
  105. for task in active_tasks:
  106. if now < task.next_run:
  107. continue
  108. apt_types = self.m_cfg.appointment_types
  109. if not apt_types:
  110. continue
  111. weights = [float(item.weight) for item in apt_types]
  112. apt_type = random.choices(apt_types, weights=weights, k=1)[0]
  113. interval = 30
  114. mode = task.qw_cfg.mode
  115. if mode == QueryWaitMode.Loop:
  116. interval = 1
  117. elif mode == QueryWaitMode.Fixed:
  118. interval = task.qw_cfg.fixed_wait
  119. elif mode == QueryWaitMode.Random:
  120. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  121. task.next_run = time.time() + interval
  122. try:
  123. VSCloudApi.Instance().slot_refresh_start(apt_type.routing_key, country=apt_type.country, city=apt_type.city, visa_type=apt_type.visa_type)
  124. result = task.instance.query(apt_type)
  125. result.apt_type = apt_type
  126. if result.success:
  127. ttl = self.m_cfg.sentinel.signal_ttl
  128. self._log(f"🔥 SLOT FOUND! Writing signal to Redis (TTL: {ttl}s)")
  129. payload = {
  130. "group_id": self.m_cfg.identifier,
  131. "apt_type": apt_type.model_dump(),
  132. "query_result": result.to_snapshot_payload(),
  133. "timestamp": now
  134. }
  135. redis_key = self._get_redis_key(apt_type.routing_key)
  136. self.redis_pub.publish(redis_key, json.dumps(payload))
  137. payload["query_result"]["website"] = self.m_cfg.website
  138. VSCloudApi.Instance().slot_snapshot_report(payload["query_result"])
  139. VSCloudApi.Instance().slot_refresh_success(apt_type.routing_key)
  140. except Exception as e:
  141. self._log(f"Query exception: {e}")
  142. VSCloudApi.Instance().slot_refresh_fail(apt_type.routing_key, error=str(e))
  143. except Exception as e:
  144. self._log(f"Monitor loop error: {e}")
  145. time.sleep(2)
  146. def _creator_loop(self):
  147. self._log("Creator loop started.")
  148. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  149. while not self.m_stop_event.is_set():
  150. time.sleep(2)
  151. with self.m_lock:
  152. if self.redis_com.exists(group_cd_key):
  153. continue
  154. current = len(self.m_tasks)
  155. pending = self.m_pending_builtin
  156. target = self.m_cfg.sentinel.target_instances
  157. if (current + pending) < target:
  158. now = time.time()
  159. avg_interval = self._get_average_interval()
  160. stagger_delay = avg_interval / max(1, target)
  161. stagger_delay = max(10.0, stagger_delay)
  162. if now - self.m_last_spawn_time >= stagger_delay:
  163. with self.m_lock:
  164. self.m_last_spawn_time = now
  165. self._log(f"Staggered spawn triggered. Next spawn in {stagger_delay:.1f}s")
  166. self._spawn_sentinel_worker()
  167. def _spawn_sentinel_worker(self):
  168. with self.m_lock:
  169. self.m_pending_builtin += 1
  170. def _job():
  171. instance = None
  172. success = False
  173. try:
  174. plg_cfg = VSPlgConfig()
  175. plg_cfg.debug = self.m_cfg.debug
  176. plg_cfg.free_config = self.m_cfg.free_config
  177. plg_cfg.session_max_life = self.m_cfg.session_max_life
  178. if not self.m_cfg.need_account:
  179. plg_cfg.account.id = 0
  180. plg_cfg.account.username = "Guest"
  181. else:
  182. acc = VSCloudApi.Instance().get_next_account(self.m_cfg.sentinel.account_pool_id, self.m_cfg.sentinel.account_cd)
  183. plg_cfg.account.id = acc['id']
  184. plg_cfg.account.username = acc['username']
  185. plg_cfg.account.password = acc['password']
  186. if self.m_cfg.need_proxy:
  187. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
  188. plg_cfg.proxy.id = proxy['id']
  189. plg_cfg.proxy.ip = proxy['ip']
  190. plg_cfg.proxy.port = proxy['port']
  191. plg_cfg.proxy.proto = proxy['proto']
  192. plg_cfg.proxy.username = proxy['username']
  193. plg_cfg.proxy.password = proxy['password']
  194. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  195. instance.set_log(self.m_logger)
  196. instance.set_config(plg_cfg)
  197. instance.create_session()
  198. with self.m_lock:
  199. self.m_tasks.append(
  200. Task(instance=instance,qw_cfg=self.m_cfg.query_wait,next_run=time.time(), book_allowed=False))
  201. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  202. self.redis_com.delete(group_fail_key)
  203. success = True
  204. self._log(f"+++ Sentinel spawned: {plg_cfg.account.username}")
  205. except Exception as e:
  206. err_str = str(e)
  207. resource_not_found_indicators = [
  208. "40401" in err_str,
  209. "Account not found" in err_str,
  210. "Proxy not found" in err_str,
  211. ]
  212. if any(resource_not_found_indicators):
  213. return
  214. self._log(f"Spawn failed: {e}")
  215. rate_limited_indicators = [
  216. "42901" in err_str,
  217. "Rate limited" in err_str
  218. ]
  219. if any(rate_limited_indicators):
  220. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  221. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  222. g_fails = self.redis_com.incr(group_fail_key)
  223. g_cd = self.group_backoff.calculate(g_fails)
  224. self.redis_com.set(group_cd_key, "1", ex=int(g_cd))
  225. self._log(f"📉 [Rate Limited] Sentinel Spawn failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
  226. finally:
  227. if not success and instance is not None:
  228. try:
  229. if hasattr(instance, "cleanup"):
  230. instance.cleanup()
  231. except Exception as e:
  232. self._log(f"Cleanup failed after spawn failure: {e}")
  233. with self.m_lock:
  234. self.m_pending_builtin = max(0, self.m_pending_builtin - 1)
  235. ThreadPool.getInstance().enqueue(_job)