group_coordinator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. # group_coordinator.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from typing import List, Optional
  8. from concurrent.futures import ThreadPoolExecutor, wait
  9. # 导入所有依赖
  10. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task # type: ignore
  11. from vs_plg import IVSPlg # type: ignore
  12. from vs_plg_factory import VSPlgFactory # type: ignore
  13. from toolkit.account_manager import AccountManager # type: ignore
  14. from toolkit.proxy_manager import ProxyManager # type: ignore
  15. from toolkit.binding_manager import BindingManager # type: ignore
  16. from toolkit.thread_pool import ThreadPool # type: ignore
  17. from toolkit.vs_cloud_api import VSCloudApi
  18. from vs_log_macros import VSC_INFO, VSC_DEBUG, VSC_WARN, VSC_ERROR # type: ignore
  19. class GroupCoordinator:
  20. """
  21. @brief GroupCoordinator 类
  22. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  23. 任务调度、查询和预订流程。
  24. """
  25. def __init__(self, cfg: GroupConfig):
  26. self.m_cfg = cfg
  27. self.m_factory = VSPlgFactory() # 插件工厂实例
  28. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  29. self.m_stop_event = threading.Event() # 用于停止线程
  30. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  31. self.m_monitor_thread: Optional[threading.Thread] = None
  32. self.m_creator_thread: Optional[threading.Thread] = None
  33. # 预订操作的线程池,独立于任务调度
  34. self.book_executor = ThreadPool(max_workers=5).getInstance() # 使用我们封装的ThreadPool
  35. VSC_INFO("coordinator", "GroupCoordinator for '%s' initialized.", self.m_cfg.identifier)
  36. def set_push_callback(self, cb):
  37. """
  38. @brief 设置推送回调函数 (C++中的PushCallback)
  39. Python中可以直接传递可调用对象。
  40. """
  41. self.push_callback_ = cb
  42. VSC_INFO("coordinator", "Push callback set for group '%s'.", self.m_cfg.identifier)
  43. def start(self):
  44. """
  45. @brief 启动协调器,包括插件注册和线程启动。
  46. """
  47. if not self.m_cfg.enable:
  48. VSC_WARN("coordinator", "Group '%s' is disabled, not starting.", self.m_cfg.identifier)
  49. return
  50. VSC_INFO("coordinator", "Starting coordinator for group '%s'...", self.m_cfg.identifier)
  51. self.m_stop_event.clear()
  52. # 注册插件
  53. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  54. # === 修复点:更智能的类名推导逻辑 ===
  55. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  56. plugin_name = self.m_cfg.plugin_config.plugin_name
  57. class_name = "".join(part.title() for part in plugin_name.split('_'))
  58. # 调试日志:确认推导出的类名
  59. VSC_DEBUG("coordinator", "Inferring class name for plugin '%s': '%s'", plugin_name, class_name)
  60. self.m_factory.register_plugin(plugin_name,
  61. plugin_module_path,
  62. class_name)
  63. self.m_monitor_thread = threading.Thread(target=self.monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  64. self.m_creator_thread = threading.Thread(target=self.creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  65. self.m_monitor_thread.start()
  66. self.m_creator_thread.start()
  67. VSC_INFO("coordinator", "Coordinator for group '%s' threads started.", self.m_cfg.identifier)
  68. def stop(self):
  69. """
  70. @brief 停止协调器,等待所有线程结束。
  71. """
  72. VSC_INFO("coordinator", "Stopping coordinator for group '%s'...", self.m_cfg.identifier)
  73. self.m_stop_event.set() # 发送停止信号
  74. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  75. self.m_monitor_thread.join()
  76. if self.m_creator_thread and self.m_creator_thread.is_alive():
  77. self.m_creator_thread.join()
  78. # 关闭预订线程池
  79. self.book_executor.shutdown(wait=True)
  80. VSC_INFO("coordinator", "Coordinator for group '%s' stopped.", self.m_cfg.identifier)
  81. def restart(self):
  82. """
  83. @brief 重启协调器。
  84. """
  85. VSC_INFO("coordinator", "Restarting coordinator for group '%s'...", self.m_cfg.identifier)
  86. self.stop()
  87. self.start()
  88. VSC_INFO("coordinator", "Coordinator for group '%s' restarted.", self.m_cfg.identifier)
  89. def group_id(self) -> str:
  90. """
  91. @brief 获取分组ID。
  92. """
  93. return self.m_cfg.identifier
  94. def monitor_loop(self):
  95. """
  96. @brief 监控循环:定期检查实例健康状况,执行查询任务,并根据结果触发预订。
  97. """
  98. VSC_INFO("coordinator", "[START] monitor loop starting for group %s", self.m_cfg.identifier)
  99. rng = random.Random()
  100. while not self.m_stop_event.is_set():
  101. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  102. time.sleep(sleep_ms)
  103. now = time.time()
  104. # 拷贝任务列表
  105. tasks_to_process = []
  106. with self.m_lock:
  107. tasks_to_process = list(self.m_tasks)
  108. for task in tasks_to_process:
  109. with self.m_lock:
  110. if task not in self.m_tasks:
  111. continue
  112. if not task.instance.health_check():
  113. continue
  114. if now < task.next_run:
  115. continue
  116. # 执行查询
  117. is_booking_triggered = False
  118. try:
  119. result = task.instance.query()
  120. if result.success:
  121. # === 关键修改:on_query_result 现在会阻塞直到抢票结束 ===
  122. self.on_query_result(task.instance, result)
  123. is_booking_triggered = True
  124. else:
  125. error = task.instance.get_last_error()
  126. if error.error_code != 2001: # 忽略 No availability found
  127. VSC_DEBUG("coordinator", "[%s] Query failed, code=%d, msg=%s",
  128. self.m_cfg.identifier, error.error_code, error.error_message)
  129. except Exception as e:
  130. VSC_ERROR("coordinator", "[%s] Exception during query: %s", self.m_cfg.identifier, str(e))
  131. # 计算下次运行时间
  132. # 如果刚刚触发了抢票(无论成功失败),建议强制加长一点冷却时间,防止反爬
  133. if is_booking_triggered:
  134. interval = rng.randint(30, 60) # 抢完票休息 30-60 秒
  135. VSC_INFO("coordinator", "[%s] Booking attempted, entering cooldown for %d sec.", self.m_cfg.identifier, interval)
  136. else:
  137. interval = 30
  138. mode = task.qw_cfg.mode
  139. if mode == QueryWaitMode.Loop:
  140. interval = 0
  141. elif mode == QueryWaitMode.Fixed:
  142. interval = task.qw_cfg.fixed_wait
  143. elif mode == QueryWaitMode.Random:
  144. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  145. task.next_run = time.time() + interval
  146. # 清理不健康实例
  147. with self.m_lock:
  148. initial_size = len(self.m_tasks)
  149. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  150. if len(self.m_tasks) < initial_size:
  151. VSC_WARN("coordinator", "[%s] Removed %d unhealthy instance(s). Remaining: %d",
  152. self.m_cfg.identifier, initial_size - len(self.m_tasks), len(self.m_tasks))
  153. VSC_INFO("coordinator", "[STOP] monitor loop exiting for group %s", self.m_cfg.identifier)
  154. def creator_loop(self):
  155. """
  156. @brief 创建者循环:根据目标实例数量,创建和补充新的插件实例。
  157. """
  158. VSC_INFO("coordinator", "[START] creator loop starting for group %s", self.m_cfg.identifier)
  159. while not self.m_stop_event.is_set():
  160. time.sleep(0.1) # 避免空转太快
  161. diff = 0
  162. with self.m_lock:
  163. current_instances_count = len(self.m_tasks)
  164. diff = self.m_cfg.target_instances - current_instances_count
  165. if diff > 0:
  166. VSC_INFO("coordinator", "[%s] Need to create %d new instance(s). Current: %d, Target: %d",
  167. self.m_cfg.identifier, diff, current_instances_count, self.m_cfg.target_instances)
  168. # 准备配置
  169. plg_cfg = self._make_plg_config()
  170. if not plg_cfg:
  171. VSC_WARN("coordinator", "[%s] Failed to prepare plugin configuration, sleeping 30s.", self.m_cfg.identifier)
  172. time.sleep(30) # 等待资源 (账户/代理) 恢复
  173. continue
  174. # 在线程池中创建实例,模拟C++的异步创建
  175. future = ThreadPool.getInstance().enqueue(self._create_instance, plg_cfg)
  176. inst = future.result() # 等待创建完成
  177. if inst:
  178. with self.m_lock:
  179. # 确保在添加到任务列表之前,实例数量仍然低于目标值
  180. if len(self.m_tasks) < self.m_cfg.target_instances:
  181. new_task = Task(
  182. instance=inst,
  183. qw_cfg=self.m_cfg.query_wait,
  184. next_run=time.time() # 立即执行第一次查询
  185. )
  186. self.m_tasks.append(new_task)
  187. VSC_INFO("coordinator", "[%s] New instance added. Total instances: %d",
  188. self.m_cfg.identifier, len(self.m_tasks))
  189. else:
  190. VSC_DEBUG("coordinator", "[%s] Target instances already met, discarding newly created instance.", self.m_cfg.identifier)
  191. else:
  192. VSC_WARN("coordinator", "[%s] Failed to create plugin instance.", self.m_cfg.identifier)
  193. # 可以在这里添加重试逻辑或错误处理
  194. # 模拟创建间隔,避免瞬间创建过多实例
  195. time.sleep(random.uniform(1.0, 5.0))
  196. VSC_INFO("coordinator", "[STOP] creator loop exiting for group %s", self.m_cfg.identifier)
  197. def _make_plg_config(self) -> Optional[VSPlgConfig]:
  198. """
  199. @brief 准备插件配置 (账号、代理等)。
  200. """
  201. VSC_DEBUG("coordinator", "[%s] Preparing plugin configuration...", self.m_cfg.identifier)
  202. plg_cfg = VSPlgConfig()
  203. # 账号配置
  204. if self.m_cfg.need_account:
  205. account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool)
  206. if not account:
  207. VSC_WARN("coordinator", "[%s] No available accounts for pool '%s'", self.m_cfg.identifier, self.m_cfg.account_pool)
  208. return None
  209. plg_cfg.account.id = account["id"]
  210. plg_cfg.account.username = account["username"]
  211. plg_cfg.account.password = account["password"]
  212. plg_cfg.account.lock_until = account.get("lock_until", "")
  213. VSC_DEBUG("coordinator", "[%s] Using account ID %d, username %s", self.m_cfg.identifier, plg_cfg.account.id, plg_cfg.account.username)
  214. # 代理配置
  215. if self.m_cfg.need_proxy:
  216. proxy = None
  217. if self.m_cfg.need_ip_bind:
  218. proxy_id = BindingManager.Instance().get_bounded_proxy_id(self.m_cfg.account_pool, plg_cfg.account.id)
  219. if proxy_id is None: # 没有绑定代理,需要获取一个新的并绑定
  220. bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(self.m_cfg.account_pool, self.m_cfg.proxy_pool)
  221. proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids)
  222. if not proxy:
  223. VSC_WARN("coordinator", "[%s] No available unbind proxy in pool '%s'", self.m_cfg.identifier, self.m_cfg.proxy_pool)
  224. return None
  225. BindingManager.Instance().create_binding(
  226. self.m_cfg.account_pool, plg_cfg.account.id,
  227. self.m_cfg.proxy_pool, proxy["id"], "dynamic")
  228. VSC_INFO("coordinator", "[%s] Created dynamic binding: account %d -> proxy %d",
  229. self.m_cfg.identifier, plg_cfg.account.id, proxy["id"])
  230. else: # 已经有绑定代理,直接获取
  231. all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, [])
  232. proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None)
  233. if not proxy:
  234. VSC_ERROR("coordinator", "[%s] Bounded proxy ID %d not found in pool %s", self.m_cfg.identifier, proxy_id, self.m_cfg.proxy_pool)
  235. return None
  236. else:
  237. proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool)
  238. if not proxy:
  239. VSC_WARN("coordinator", "[%s] No available proxy in pool '%s'", self.m_cfg.identifier, self.m_cfg.proxy_pool)
  240. return None
  241. plg_cfg.proxy.id = proxy["id"]
  242. plg_cfg.proxy.ip = proxy["ip"]
  243. plg_cfg.proxy.port = proxy["port"]
  244. plg_cfg.proxy.scheme = proxy["scheme"]
  245. plg_cfg.proxy.username = proxy.get("username", "")
  246. plg_cfg.proxy.password = proxy.get("password", "")
  247. plg_cfg.proxy.lock_until = proxy.get("lock_until", "")
  248. VSC_DEBUG("coordinator", "[%s] Using proxy ID %d, IP %s:%d",
  249. self.m_cfg.identifier, plg_cfg.proxy.id, plg_cfg.proxy.ip, plg_cfg.proxy.port)
  250. plg_cfg.free_config = self.m_cfg.free_config
  251. VSC_DEBUG("coordinator", "[%s] Plugin configuration prepared.", self.m_cfg.identifier)
  252. return plg_cfg
  253. def _create_instance(self, plg_cfg: VSPlgConfig) -> Optional[IVSPlg]:
  254. """
  255. @brief 创建并初始化单个插件实例。
  256. 这个方法在 creator_loop 的线程池中执行。
  257. """
  258. VSC_DEBUG("coordinator", "[%s] Creating plugin instance (plugin=%s)...", self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  259. try:
  260. inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  261. inst.set_config(plg_cfg)
  262. success = inst.create_session()
  263. # 无论成功失败都锁定账号
  264. if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0:
  265. AccountManager.Instance().lock_account(
  266. self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.account_login_interval * 60)
  267. if not success:
  268. error = inst.get_last_error()
  269. VSC_ERROR("coordinator", "[%s] Create session failed, code=%d, msg=%s",
  270. self.m_cfg.identifier, error.error_code, error.error_message)
  271. return None
  272. VSC_INFO("coordinator", "[%s] Plugin instance created and session established.", self.m_cfg.identifier)
  273. return inst
  274. except Exception as e:
  275. VSC_ERROR("coordinator", "[%s] Error creating plugin instance: %s", self.m_cfg.identifier, str(e))
  276. return None
  277. def on_query_result(self, sptr: IVSPlg, query_result: VSQueryResult):
  278. VSC_INFO("coordinator", "[%s] Query result received: %s. BLOCKING monitor loop for booking...",
  279. self.m_cfg.identifier, str(query_result))
  280. # 定义内部预订任务
  281. def book_task(inst: IVSPlg, result: VSQueryResult):
  282. task_id = None
  283. try:
  284. # 1. 获取对应的用户任务 (Pop Task)
  285. booking_routing_key = f'auto.{result.routing_key}' if result.routing_key else "default"
  286. # 尝试获取任务
  287. task = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  288. if not task:
  289. VSC_WARN("coordinator", "[%s] No pending user tasks found for key '%s'. Abandoning slot.",
  290. inst.get_group_id(), booking_routing_key)
  291. return
  292. task_id = task['id']
  293. user_input = task.get('user_inputs', {})
  294. VSC_INFO("coordinator", "[%s] Picked up Task ID %s for booking...", inst.get_group_id(), task_id)
  295. # 2. 执行预订
  296. # 注意:插件的 book 方法需要接收 user_input
  297. book_res = inst.book(result, user_input)
  298. # 3. 处理结果
  299. if book_res.success:
  300. VSC_INFO("coordinator", "[%s] Booking SUCCESS! Order: '%s'",
  301. inst.get_group_id(), book_res.order_id)
  302. # 推送通知
  303. if hasattr(self, 'push_callback_') and self.push_callback_:
  304. self.push_callback_(100, f"Booking Success: {book_res.order_id}".encode('utf-8'), 0)
  305. # 4. 成功逻辑:更新任务状态为 grabbed
  306. # 构造历史记录详情对象
  307. history_detail = {
  308. "slot_date": book_res.book_date,
  309. "slot_time": book_res.book_time,
  310. "account": inst.config.account.username if inst.config else "unknown",
  311. "timestamp": int(time.time()),
  312. "payment_link": book_res.payment_link,
  313. "order_id": book_res.order_id
  314. }
  315. update_data = {
  316. "status": "grabbed",
  317. # 修改点:grabbed_history 是一个字符串列表
  318. # 我们将详情对象序列化为 JSON 字符串放入列表中
  319. "grabbed_history": [json.dumps(history_detail)]
  320. }
  321. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  322. VSC_INFO("coordinator", "[%s] Task %s marked as GRABBED.", inst.get_group_id(), task_id)
  323. # 成功后 task_id 置空,防止 finally 块再次将其重置为 pending
  324. task_id = None
  325. else:
  326. # 失败逻辑
  327. error = inst.get_last_error()
  328. VSC_ERROR("coordinator", "[%s] Booking FAILED. Code=%d, Msg=%s",
  329. inst.get_group_id(), error.error_code, error.error_message)
  330. # task_id 仍然存在,将由 finally 块处理回滚
  331. except Exception as e:
  332. VSC_ERROR("coordinator", "[%s] Exception during booking: %s", inst.get_group_id(), str(e))
  333. finally:
  334. # 5. Return to Queue (回滚机制)
  335. if task_id is not None:
  336. VSC_WARN("coordinator", "[%s] Returning Task %s to queue (status=pending).", inst.get_group_id(), task_id)
  337. try:
  338. VSCloudApi.Instance().update_vas_task(task_id, {"status": "pending"})
  339. except Exception as ex:
  340. VSC_ERROR("coordinator", "[%s] Failed to return task to queue: %s", inst.get_group_id(), str(ex))
  341. futures = []
  342. f = self.book_executor.enqueue(book_task, sptr, query_result)
  343. futures.append(f)
  344. wait(futures)