gco.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # gco.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from typing import List, Optional, Callable
  8. from concurrent.futures import wait
  9. # 导入所有依赖
  10. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
  11. from vs_plg import IVSPlg
  12. from vs_plg_factory import VSPlgFactory
  13. from toolkit.account_manager import AccountManager
  14. from toolkit.proxy_manager import ProxyManager
  15. from toolkit.binding_manager import BindingManager
  16. from toolkit.thread_pool import ThreadPool
  17. from toolkit.vs_cloud_api import VSCloudApi
  18. class GCO:
  19. """
  20. @brief GCO 类
  21. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  22. 任务调度、查询和预订流程。
  23. """
  24. def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
  25. self.m_cfg = cfg
  26. self.m_factory = VSPlgFactory() # 插件工厂实例
  27. self.m_logger = logger
  28. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  29. self.m_stop_event = threading.Event() # 用于停止线程
  30. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  31. self.m_monitor_thread: Optional[threading.Thread] = None
  32. self.m_creator_thread: Optional[threading.Thread] = None
  33. def start(self):
  34. """
  35. @brief 启动协调器,包括插件注册和线程启动。
  36. """
  37. if not self.m_cfg.enable:
  38. self._log("Group is disabled, not starting.")
  39. return
  40. self._log("Starting coordinator...")
  41. self.m_stop_event.clear()
  42. # 注册插件
  43. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  44. # === 修复点:更智能的类名推导逻辑 ===
  45. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  46. plugin_name = self.m_cfg.plugin_config.plugin_name
  47. class_name = "".join(part.title() for part in plugin_name.split('_'))
  48. # 调试日志:确认推导出的类名
  49. self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
  50. self.m_factory.register_plugin(plugin_name,
  51. plugin_module_path,
  52. class_name)
  53. self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  54. self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  55. self.m_monitor_thread.start()
  56. self.m_creator_thread.start()
  57. self._log("Coordinator threads started.")
  58. def stop(self):
  59. """
  60. @brief 停止协调器,等待所有线程结束。
  61. """
  62. self._log("Stopping coordinator...")
  63. self.m_stop_event.set() # 发送停止信号
  64. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  65. self.m_monitor_thread.join()
  66. if self.m_creator_thread and self.m_creator_thread.is_alive():
  67. self.m_creator_thread.join()
  68. self._log("Coordinator stopped.")
  69. def group_id(self) -> str:
  70. """
  71. @brief 获取分组ID。
  72. """
  73. return self.m_cfg.identifier
  74. def _log(self, message):
  75. if self.m_logger:
  76. self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
  77. def _monitor_loop(self):
  78. """
  79. @brief 监控循环:定期检查实例健康状况,执行查询任务,并根据结果触发预订。
  80. """
  81. self._log("[START] monitor loop starting...")
  82. rng = random.Random()
  83. while not self.m_stop_event.is_set():
  84. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  85. time.sleep(sleep_ms)
  86. now = time.time()
  87. # 拷贝任务列表
  88. tasks_to_process = []
  89. with self.m_lock:
  90. tasks_to_process = list(self.m_tasks)
  91. for task in tasks_to_process:
  92. with self.m_lock:
  93. if task not in self.m_tasks:
  94. continue
  95. if not task.instance.health_check():
  96. continue
  97. if now < task.next_run:
  98. continue
  99. # 执行查询
  100. is_booking_triggered = False
  101. try:
  102. result = task.instance.query()
  103. if result.success:
  104. # === 关键修改:_on_query_result 现在会阻塞直到抢票结束 ===
  105. self._on_query_result(task.instance, result)
  106. is_booking_triggered = True
  107. else:
  108. self._log("Query done, No availability found")
  109. except Exception as e:
  110. self._log(f"Exception during query: {e}")
  111. # 计算下次运行时间
  112. # 如果刚刚触发了抢票(无论成功失败),建议强制加长一点冷却时间,防止反爬
  113. if is_booking_triggered:
  114. interval = rng.randint(30, 60) # 抢完票休息 30-60 秒
  115. self._log(f"Booking attempted, entering cooldown for {interval} sec.")
  116. else:
  117. interval = 30
  118. mode = task.qw_cfg.mode
  119. if mode == QueryWaitMode.Loop:
  120. interval = 0
  121. elif mode == QueryWaitMode.Fixed:
  122. interval = task.qw_cfg.fixed_wait
  123. elif mode == QueryWaitMode.Random:
  124. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  125. task.next_run = time.time() + interval
  126. # 清理不健康实例
  127. with self.m_lock:
  128. initial_size = len(self.m_tasks)
  129. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  130. if len(self.m_tasks) < initial_size:
  131. self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
  132. self._log("[STOP] monitor loop exiting...")
  133. def _creator_loop(self):
  134. """
  135. @brief 创建者循环:根据目标实例数量,创建和补充新的插件实例。
  136. """
  137. self._log("[START] creator loop starting...")
  138. while not self.m_stop_event.is_set():
  139. time.sleep(0.1) # 避免空转太快
  140. diff = 0
  141. with self.m_lock:
  142. current_instances_count = len(self.m_tasks)
  143. diff = self.m_cfg.target_instances - current_instances_count
  144. if diff > 0:
  145. self._log(f"Need to create {diff} new instance(s). Current: {current_instances_count}")
  146. # 准备配置
  147. plg_cfg = self._make_plg_config()
  148. if not plg_cfg:
  149. self._log("Failed to prepare plugin configuration, sleeping 30s.")
  150. time.sleep(30) # 等待资源 (账户/代理) 恢复
  151. continue
  152. # 在线程池中创建实例,模拟C++的异步创建
  153. future = ThreadPool.getInstance().enqueue(self._create_instance, plg_cfg)
  154. inst = future.result() # 等待创建完成
  155. if inst:
  156. with self.m_lock:
  157. # 确保在添加到任务列表之前,实例数量仍然低于目标值
  158. if len(self.m_tasks) < self.m_cfg.target_instances:
  159. new_task = Task(
  160. instance=inst,
  161. qw_cfg=self.m_cfg.query_wait,
  162. next_run=time.time() # 立即执行第一次查询
  163. )
  164. self.m_tasks.append(new_task)
  165. self._log(f"New instance added. Total instances: {len(self.m_tasks)}")
  166. else:
  167. self._log("Target instances already met, discarding newly created instance.")
  168. else:
  169. self._log("Failed to create plugin instance.")
  170. # 可以在这里添加重试逻辑或错误处理
  171. # 模拟创建间隔,避免瞬间创建过多实例
  172. time.sleep(random.uniform(1.0, 5.0))
  173. self._log("[STOP] creator loop exiting...")
  174. def _make_plg_config(self) -> Optional[VSPlgConfig]:
  175. """
  176. @brief 准备插件配置 (账号、代理等)。
  177. """
  178. self._log("Preparing plugin configuration...")
  179. plg_cfg = VSPlgConfig()
  180. plg_cfg.debug = self.m_cfg.debug
  181. # 账号配置
  182. if self.m_cfg.need_account:
  183. account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool)
  184. if not account:
  185. self._log(f"No available accounts for pool {self.m_cfg.account_pool}")
  186. return None
  187. plg_cfg.account.id = account["id"]
  188. plg_cfg.account.username = account["username"]
  189. plg_cfg.account.password = account["password"]
  190. plg_cfg.account.lock_until = account.get("lock_until", "")
  191. self._log(f"Using account ID {plg_cfg.account.id}, username {plg_cfg.account.username}")
  192. # 代理配置
  193. if self.m_cfg.need_proxy:
  194. proxy = None
  195. if self.m_cfg.need_ip_bind:
  196. proxy_id = BindingManager.Instance().get_bounded_proxy_id(self.m_cfg.account_pool, plg_cfg.account.id)
  197. if proxy_id is None: # 没有绑定代理,需要获取一个新的并绑定
  198. bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(self.m_cfg.account_pool, self.m_cfg.proxy_pool)
  199. proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids)
  200. if not proxy:
  201. self._log(f"No available unbind proxy in pool {self.m_cfg.proxy_pool}")
  202. return None
  203. BindingManager.Instance().create_binding(
  204. self.m_cfg.account_pool, plg_cfg.account.id,
  205. self.m_cfg.proxy_pool, proxy["id"], "dynamic")
  206. self._log(f"Created dynamic binding: account {plg_cfg.account.id} -> proxy {proxy['id']}")
  207. else:
  208. all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, [])
  209. proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None)
  210. if not proxy:
  211. self._log(f"Bounded proxy ID {proxy_id} not found in pool {self.m_cfg.proxy_pool}")
  212. return None
  213. else:
  214. proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool)
  215. if not proxy:
  216. self._log(f"No available proxy in pool {self.m_cfg.proxy_pool}")
  217. return None
  218. plg_cfg.proxy.id = proxy["id"]
  219. plg_cfg.proxy.ip = proxy["ip"]
  220. plg_cfg.proxy.port = proxy["port"]
  221. plg_cfg.proxy.scheme = proxy["scheme"]
  222. plg_cfg.proxy.username = proxy.get("username", "")
  223. plg_cfg.proxy.password = proxy.get("password", "")
  224. plg_cfg.proxy.lock_until = proxy.get("lock_until", "")
  225. self._log(f"Using proxy ID {plg_cfg.proxy.id}, IP {plg_cfg.proxy.ip}:{plg_cfg.proxy.port}")
  226. plg_cfg.free_config = self.m_cfg.free_config
  227. self._log("Plugin configuration prepared.")
  228. return plg_cfg
  229. def _create_instance(self, plg_cfg: VSPlgConfig) -> Optional[IVSPlg]:
  230. # """
  231. # @brief 创建并初始化单个插件实例。
  232. # 这个方法在 creator_loop 的线程池中执行。
  233. # """
  234. self._log(f"Creating plugin instance (plugin={self.m_cfg.plugin_config.plugin_name})...")
  235. try:
  236. inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  237. inst.set_log(self.m_logger)
  238. inst.set_config(plg_cfg)
  239. inst.create_session()
  240. if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0:
  241. AccountManager.Instance().lock_account(
  242. self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.account_login_interval * 60)
  243. self._log("Plugin instance created and session established.")
  244. return inst
  245. except Exception as e:
  246. self._log(f"Error creating plugin instance: {e}")
  247. return None
  248. def _on_query_result(self, sptr: IVSPlg, query_result: VSQueryResult):
  249. self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
  250. # 定义内部预订任务
  251. def book_task(inst: IVSPlg, result: VSQueryResult):
  252. task_id = None
  253. try:
  254. # 1. 获取对应的用户任务 (Pop Task)
  255. booking_routing_key = f'auto.{result.routing_key}' if result.routing_key else "default"
  256. # 尝试获取任务
  257. task = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  258. if not task:
  259. self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
  260. return
  261. task_id = task['id']
  262. order_id = task['order_id']
  263. user_input = task.get('user_inputs', {})
  264. self._log(f"Picked up Task ID {task_id} for booking...")
  265. # 2. 执行预订
  266. # 注意:插件的 book 方法需要接收 user_input
  267. book_res = inst.book(result, user_input)
  268. # 3. 处理结果
  269. if book_res.success:
  270. self._log(f" Booking SUCCESS! Order: {order_id}")
  271. # 4. 成功逻辑:更新任务状态为 grabbed
  272. # 包含后端需要的关键信息
  273. current_grab_info = {
  274. "account": book_res.account,
  275. "session_id": book_res.session_id,
  276. "slot_date": book_res.book_date,
  277. "slot_time": book_res.book_time,
  278. "timestamp": int(time.time()),
  279. "payment_link": book_res.payment_link,
  280. }
  281. update_data = {
  282. "status": "grabbed",
  283. # === 修改点:直接覆盖 ===
  284. # 直接发送字典对象,requests 会自动序列化为 JSON Object
  285. # 满足后端 "type":"dict_type" 的校验
  286. "grabbed_history": current_grab_info
  287. }
  288. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  289. self._log(f"Task {task_id} marked as GRABBED.")
  290. # 成功后 task_id 置空,防止 finally 块再次将其重置为 pending
  291. task_id = None
  292. except Exception as e:
  293. self._log(f"Exception during booking: {e}")
  294. finally:
  295. # 5. Return to Queue (回滚机制)
  296. if task_id is not None:
  297. self._log(f"Returning Task {task_id} to queue (status=pending).")
  298. try:
  299. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  300. except Exception as ex:
  301. self.log(f"Failed to return task to queue: {ex}")
  302. futures = []
  303. f = ThreadPool.getInstance().enqueue(book_task, sptr, query_result)
  304. futures.append(f)
  305. wait(futures)