# sentinel.py
import json
import os
import random
import threading
import time
from typing import Callable, Dict, List, Optional

import redis

from vs_types import GroupConfig, VSPlgConfig, Task, QueryWaitMode
from vs_plg_factory import VSPlgFactory
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from toolkit.proxy_manager import ProxyManager


  13. class SentinelGCO:
  14. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  15. self.m_cfg = cfg
  16. self.m_factory = VSPlgFactory()
  17. self.m_logger = logger
  18. self.m_tasks: List[Task] = []
  19. self.m_lock = threading.RLock()
  20. self.m_stop_event = threading.Event()
  21. self.redis_client = redis.Redis(**redis_conf)
  22. self.m_pending_builtin = 0
  23. def _log(self, message):
  24. if self.m_logger:
  25. self.m_logger(f'[SENTINEL] [{self.m_cfg.identifier}] {message}')
  26. def start(self):
  27. if not self.m_cfg.enable:
  28. return
  29. self._log("Starting Sentinel...")
  30. plugin_name = self.m_cfg.plugin_config.plugin_name
  31. class_name = "".join(part.title() for part in plugin_name.split('_'))
  32. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  33. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  34. threading.Thread(target=self._monitor_loop, daemon=True, name="Sentinel-Monitor").start()
  35. threading.Thread(target=self._creator_loop, daemon=True, name="Sentinel-Creator").start()
  36. def stop(self):
  37. self._log("Stopping Sentinel...")
  38. self.m_stop_event.set()
  39. def _get_redis_key(self, routing_key: str) -> str:
  40. return f"vs:signal:{routing_key}"
  41. def _monitor_loop(self):
  42. self._log("Monitor loop started.")
  43. rng = random.Random()
  44. while not self.m_stop_event.is_set():
  45. try:
  46. time.sleep(0.5)
  47. now = time.time()
  48. with self.m_lock:
  49. active_tasks = [t for t in self.m_tasks if t.instance.health_check()]
  50. self.m_tasks = active_tasks
  51. for task in active_tasks:
  52. if now < task.next_run:
  53. continue
  54. apt_types = self.m_cfg.appointment_types
  55. if not apt_types:
  56. continue
  57. weights = [float(item.weight) for item in apt_types]
  58. apt_type = random.choices(apt_types, weights=weights, k=1)[0]
  59. try:
  60. VSCloudApi.Instance().slot_refresh_start(apt_type.routing_key, country=apt_type.country, city=apt_type.city, visa_type=apt_type.visa_type)
  61. result = task.instance.query(apt_type)
  62. result.apt_type = apt_type
  63. VSCloudApi.Instance().slot_refresh_success(apt_type.routing_key)
  64. if result.success:
  65. ttl = self.m_cfg.sentinel.signal_ttl
  66. self._log(f"🔥 SLOT FOUND! Writing signal to Redis (TTL: {ttl}s)")
  67. payload = {
  68. "group_id": self.m_cfg.identifier,
  69. "apt_type": apt_type.model_dump(),
  70. "query_result": result.to_snapshot_payload(),
  71. "timestamp": now
  72. }
  73. redis_key = self._get_redis_key(apt_type.routing_key)
  74. self.redis_client.setex(redis_key, ttl, json.dumps(payload))
  75. payload["query_result"]["website"] = self.m_cfg.website
  76. VSCloudApi.Instance().slot_snapshot_report(payload["query_result"])
  77. interval = 30
  78. mode = task.qw_cfg.mode
  79. if mode == QueryWaitMode.Loop:
  80. interval = 1
  81. elif mode == QueryWaitMode.Fixed:
  82. interval = task.qw_cfg.fixed_wait
  83. elif mode == QueryWaitMode.Random:
  84. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  85. task.next_run = time.time() + interval
  86. except Exception as e:
  87. self._log(f"Query exception: {e}")
  88. VSCloudApi.Instance().slot_refresh_fail(apt_type.routing_key, error=str(e))
  89. task.next_run = time.time() + 10
  90. except Exception as e:
  91. self._log(f"Monitor loop error: {e}")
  92. time.sleep(2)
  93. def _creator_loop(self):
  94. self._log("Creator loop started.")
  95. while not self.m_stop_event.is_set():
  96. time.sleep(2)
  97. with self.m_lock:
  98. current = len(self.m_tasks)
  99. pending = self.m_pending_builtin
  100. if (current + pending) < self.m_cfg.sentinel.target_instances:
  101. self._spawn_sentinel_worker()
  102. def _spawn_sentinel_worker(self):
  103. with self.m_lock: self.m_pending_builtin += 1
  104. def _job():
  105. try:
  106. plg_cfg = VSPlgConfig()
  107. plg_cfg.debug = self.m_cfg.debug
  108. plg_cfg.free_config = self.m_cfg.free_config
  109. plg_cfg.session_max_life = self.m_cfg.session_max_life
  110. if not self.m_cfg.need_account:
  111. plg_cfg.account.id = 0
  112. plg_cfg.account.username = "Guest"
  113. else:
  114. acc = VSCloudApi.Instance().get_next_account(self.m_cfg.sentinel.account_pool_id, self.m_cfg.sentinel.account_cd * 60)
  115. if not acc: return
  116. plg_cfg.account.id = acc['id']
  117. plg_cfg.account.username = acc['username']
  118. plg_cfg.account.password = acc['password']
  119. if self.m_cfg.need_proxy:
  120. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=self.m_cfg.proxy_lock_interval)
  121. if not proxy:
  122. return
  123. plg_cfg.proxy.id = proxy['id']
  124. plg_cfg.proxy.ip = proxy['ip']
  125. plg_cfg.proxy.port = proxy['port']
  126. plg_cfg.proxy.scheme = proxy['scheme']
  127. plg_cfg.proxy.username = proxy['username']
  128. plg_cfg.proxy.password = proxy['password']
  129. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  130. instance.set_log(self.m_logger)
  131. instance.set_config(plg_cfg)
  132. instance.create_session()
  133. with self.m_lock:
  134. self.m_tasks.append(Task(instance=instance, qw_cfg=self.m_cfg.query_wait, next_run=time.time()))
  135. self._log(f"+++ Sentinel spawned: {plg_cfg.account.username}")
  136. except Exception as e:
  137. err_str = str(e)
  138. if "40401" in err_str or "Account not found" in err_str:
  139. return
  140. self._log(f"Sentinel spawn failed: {e}")
  141. finally:
  142. with self.m_lock:
  143. self.m_pending_builtin = max(0, self.m_pending_builtin - 1)
  144. ThreadPool.getInstance().enqueue(_job)