gco.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. # gco.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from typing import List, Dict, Tuple, Any, Optional, Callable
  8. from concurrent.futures import wait
  9. # 导入所有依赖
  10. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
  11. from vs_plg import IVSPlg
  12. from vs_plg_factory import VSPlgFactory
  13. from toolkit.account_manager import AccountManager
  14. from toolkit.proxy_manager import ProxyManager
  15. from toolkit.thread_pool import ThreadPool
  16. from toolkit.vs_cloud_api import VSCloudApi
  17. class GCO:
  18. """
  19. @brief GCO 类
  20. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  21. 任务调度、查询和预订流程。
  22. """
  23. def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
  24. self.m_cfg = cfg
  25. self.m_factory = VSPlgFactory() # 插件工厂实例
  26. self.m_logger = logger
  27. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  28. self.m_stop_event = threading.Event() # 用于停止线程
  29. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  30. self.m_monitor_thread: Optional[threading.Thread] = None
  31. self.m_creator_thread: Optional[threading.Thread] = None
  32. self.m_pending_builtin = 0 # 正在创建中的内置账号数量
  33. self.m_pending_orders = 0 # 正在创建中的订单账号数量
  34. def start(self):
  35. """
  36. @brief 启动协调器,包括插件注册和线程启动。
  37. """
  38. if not self.m_cfg.enable:
  39. self._log("Group is disabled, not starting.")
  40. return
  41. self._log("Starting coordinator...")
  42. self.m_stop_event.clear()
  43. # 注册插件
  44. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  45. # === 修复点:更智能的类名推导逻辑 ===
  46. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  47. plugin_name = self.m_cfg.plugin_config.plugin_name
  48. class_name = "".join(part.title() for part in plugin_name.split('_'))
  49. # 调试日志:确认推导出的类名
  50. self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
  51. self.m_factory.register_plugin(plugin_name,
  52. plugin_module_path,
  53. class_name)
  54. self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  55. self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  56. self.m_monitor_thread.start()
  57. self.m_creator_thread.start()
  58. self._log("Coordinator threads started.")
  59. def stop(self):
  60. """
  61. @brief 停止协调器,等待所有线程结束。
  62. """
  63. self._log("Stopping coordinator...")
  64. self.m_stop_event.set() # 发送停止信号
  65. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  66. self.m_monitor_thread.join()
  67. if self.m_creator_thread and self.m_creator_thread.is_alive():
  68. self.m_creator_thread.join()
  69. self._log("Coordinator stopped.")
  70. def group_id(self) -> str:
  71. """
  72. @brief 获取分组ID。
  73. """
  74. return self.m_cfg.identifier
  75. def _log(self, message):
  76. if self.m_logger:
  77. self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
  78. def _monitor_loop(self):
  79. """
  80. @brief 监控循环:定期检查实例健康状况,执行查询任务。
  81. 一旦发现号源,立即触发所有实例进行批量预订。
  82. """
  83. self._log("[START] monitor loop starting...")
  84. rng = random.Random()
  85. # 创建一个临时线程池用于并发抢票,避免阻塞监控循环太久
  86. # max_workers 根据你的最大并发账号数调整,或者设为 None (默认 CPU核心数 * 5)
  87. while not self.m_stop_event.is_set():
  88. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  89. time.sleep(sleep_ms)
  90. now = time.time()
  91. # 1. 拷贝任务列表 (Snapshot)
  92. tasks_to_process = []
  93. with self.m_lock:
  94. tasks_to_process = list(self.m_tasks)
  95. # 标记本轮循环是否触发了批量抢票
  96. batch_booking_triggered = False
  97. for task in tasks_to_process:
  98. # 二次检查任务是否还在列表中(防止在遍历过程中被移除)
  99. with self.m_lock:
  100. if task not in self.m_tasks:
  101. continue
  102. # 健康检查
  103. if not task.instance.health_check():
  104. continue
  105. # 检查时间窗口
  106. if now < task.next_run:
  107. continue
  108. # === 执行查询 ===
  109. try:
  110. # 这里的 task 充当了“哨兵”的角色
  111. result = task.instance.query()
  112. if result.success:
  113. self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING for {len(tasks_to_process)} workers.")
  114. # === [核心修改]:一人发现,全员出击 ===
  115. # 1. 准备并发任务
  116. # 我们使用刚刚快照的 tasks_to_process,或者重新获取一次全量列表
  117. # 重点:所有实例 (w.instance) 都使用同一份查询结果 (result) 去抢
  118. futures = []
  119. for worker in tasks_to_process:
  120. # 提交到线程池并发执行,极大减少时间差
  121. f = ThreadPool.getInstance().enqueue(
  122. self._on_query_result,
  123. worker,
  124. result
  125. )
  126. futures.append(f)
  127. # 2. 等待所有抢票任务结束 (可选,根据业务需求决定是否阻塞监控)
  128. # 如果不wait,监控线程会立刻继续,可能导致重复触发
  129. # 建议 wait,确保这一波抢票彻底结束
  130. for f in futures:
  131. try:
  132. f.result() # 获取结果,捕获异常
  133. except Exception as e:
  134. self._log(f"Batch booking exception: {e}")
  135. # 3. 标记触发状态
  136. batch_booking_triggered = True
  137. break
  138. else:
  139. # 没查到,仅当前 task 记录日志
  140. self._log(f"Query done by {task.instance.get_group_id()}, No availability")
  141. except Exception as e:
  142. self._log(f"Exception during query: {e}")
  143. # === 计算下次运行时间 (仅针对当前 query 的 task,除非触发了 batch) ===
  144. if not batch_booking_triggered:
  145. interval = 30
  146. mode = task.qw_cfg.mode
  147. if mode == QueryWaitMode.Loop:
  148. interval = 0
  149. elif mode == QueryWaitMode.Fixed:
  150. interval = task.qw_cfg.fixed_wait
  151. elif mode == QueryWaitMode.Random:
  152. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  153. task.next_run = time.time() + interval
  154. # === [修正后的批量冷却逻辑] ===
  155. # 抢票结束后,必须严格按照每个账号配置的频率进入冷却,防止集体429
  156. if batch_booking_triggered:
  157. self._log(f"Batch booking finished. Resetting wait times based on configurations.")
  158. # 重新获取当前时间(因为抢票过程消耗了时间)
  159. now_ts = time.time()
  160. with self.m_lock:
  161. for t in self.m_tasks:
  162. # 重新读取该任务的配置
  163. interval = 30 # 默认兜底
  164. mode = t.qw_cfg.mode
  165. if mode == QueryWaitMode.Loop:
  166. # 即使是 Loop 模式,在大规模抢票后建议给一个微小的缓冲(如1秒),避免死循环导致 CPU 飙升
  167. # 如果你的逻辑允许立刻重试,这里可以是 0
  168. interval = 1
  169. elif mode == QueryWaitMode.Fixed:
  170. interval = t.qw_cfg.fixed_wait
  171. elif mode == QueryWaitMode.Random:
  172. interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
  173. # 设置下次运行时间
  174. t.next_run = now_ts + interval
  175. self._log(f"All workers cooldown reset. Resuming monitor loop.")
  176. # 清理不健康实例
  177. with self.m_lock:
  178. initial_size = len(self.m_tasks)
  179. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  180. if len(self.m_tasks) < initial_size:
  181. self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
  182. self._log("[STOP] monitor loop exiting...")
  183. def _creator_loop(self):
  184. """
  185. @brief 创建者循环:双轨制并发控制 + Pending 计数防超发
  186. """
  187. self._log("[START] creator loop starting...")
  188. while not self.m_stop_event.is_set():
  189. time.sleep(1.0) # 保持 1秒间隔,给资源管理器缓冲时间
  190. # -------------------------------------------------------------
  191. # 1. 统计当前状态 (Active + Pending)
  192. # -------------------------------------------------------------
  193. current_builtin = 0
  194. current_orders = 0
  195. with self.m_lock:
  196. for t in self.m_tasks:
  197. if t.task_ref:
  198. current_orders += 1
  199. else:
  200. current_builtin += 1
  201. # 读取正在创建中的计数 (快照)
  202. pending_builtin = self.m_pending_builtin
  203. pending_orders = self.m_pending_orders
  204. # -------------------------------------------------------------
  205. # 2. 计算缺口
  206. # -------------------------------------------------------------
  207. # === [内置账号缺口] ===
  208. needed_builtin = 0
  209. # 逻辑:只要目标数量 > 0,就尝试补充 (兼容需要账号和不需要账号的模式)
  210. if self.m_cfg.target_instances > 0:
  211. total_builtin_proj = current_builtin + pending_builtin
  212. needed_builtin = self.m_cfg.target_instances - total_builtin_proj
  213. # === [订单账号缺口] ===
  214. needed_order = 0
  215. # 逻辑:必须需要账号(need_account=True) 且 限制数量大于 0
  216. # (order_account_enable 已移除,直接看 limit)
  217. if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
  218. total_order_proj = current_orders + pending_orders
  219. needed_order = self.m_cfg.order_account_online_limit - total_order_proj
  220. # 如果两边都满了,跳过本轮
  221. if needed_builtin <= 0 and needed_order <= 0:
  222. continue
  223. # -------------------------------------------------------------
  224. # 3. 准备配置
  225. # -------------------------------------------------------------
  226. config_data = self._prepare_next_config(
  227. need_builtin=(needed_builtin > 0),
  228. need_order=(needed_order > 0),
  229. )
  230. if not config_data:
  231. time.sleep(10.0)
  232. continue
  233. plg_cfg, task_ref = config_data
  234. # -------------------------------------------------------------
  235. # 4. [绝对核心] 提交前立即增加 Pending 计数
  236. # -------------------------------------------------------------
  237. # 必须在这里加!防止下一秒循环重复创建!
  238. with self.m_lock:
  239. if task_ref:
  240. self.m_pending_orders += 1
  241. else:
  242. self.m_pending_builtin += 1
  243. p_type = "Order" if task_ref else "Built-in"
  244. self._log(f"+++ Spawning {p_type} (Pending: {self.m_pending_builtin}/{self.m_pending_orders})...")
  245. # 5. 异步提交
  246. try:
  247. ThreadPool.getInstance().enqueue(
  248. self._create_and_register_plg_worker,
  249. plg_cfg,
  250. task_ref
  251. )
  252. except Exception as e:
  253. # 提交失败回滚计数
  254. self._log(f"Failed to enqueue task: {e}")
  255. with self.m_lock:
  256. if task_ref: self.m_pending_orders -= 1
  257. else: self.m_pending_builtin -= 1
  258. # 错开并发
  259. time.sleep(random.uniform(0.5, 1.0))
  260. self._log("[STOP] creator loop exiting...")
  261. def _prepare_next_config(self, need_builtin: bool, need_order: bool) -> Optional[Tuple[VSPlgConfig, Optional[Dict[str, Any]]]]:
  262. """
  263. @brief 准备下一个插件实例的配置
  264. """
  265. plg_cfg = VSPlgConfig()
  266. plg_cfg.debug = self.m_cfg.debug
  267. plg_cfg.free_config = self.m_cfg.free_config
  268. plg_cfg.session_max_life = self.m_cfg.session_max_life
  269. task_ref = None
  270. config_ready = False
  271. pool_name = self.m_cfg.local_account_pool
  272. # =================================================================
  273. # 1. 账号获取
  274. # =================================================================
  275. if not self.m_cfg.need_account:
  276. # === 游客模式 (无需账号) ===
  277. if need_builtin:
  278. plg_cfg.account.id = 0
  279. plg_cfg.account.username = "Guest"
  280. config_ready = True
  281. task_ref = None
  282. else:
  283. # === 标准模式 (需要账号) ===
  284. # A. 优先补充内置账号 (只要 target_instances 还有缺口)
  285. if need_builtin:
  286. # 获取并锁定账号
  287. account = AccountManager.Instance().next(
  288. pool_name,
  289. lock_duration=self.m_cfg.account_login_interval * 60
  290. )
  291. if account:
  292. plg_cfg.account.id = account["id"]
  293. plg_cfg.account.username = account["username"]
  294. plg_cfg.account.password = account["password"]
  295. plg_cfg.account.lock_until = account.get("lock_until", 0)
  296. config_ready = True
  297. task_ref = None
  298. self._log(f"Selected Built-in: {plg_cfg.account.username}")
  299. # B. 次选补充订单账号 (如果内置不需要 或 池子空了)
  300. # 只有当 limit > 0 时才尝试
  301. if not config_ready and need_order and self.m_cfg.order_account_online_limit > 0:
  302. try:
  303. routing_key = self.m_cfg.order_account_routing
  304. task_ref = VSCloudApi.Instance().get_vas_task_pop(routing_key)
  305. if task_ref:
  306. user_inputs = task_ref.get('user_inputs', {})
  307. plg_cfg.account.id = 0 # 临时账号
  308. plg_cfg.account.username = user_inputs.get(self.m_cfg.input_map_username, "")
  309. plg_cfg.account.password = user_inputs.get(self.m_cfg.input_map_password, "")
  310. if plg_cfg.account.username:
  311. config_ready = True
  312. self._log(f"Selected Order Acc: {plg_cfg.account.username}")
  313. else:
  314. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  315. return None
  316. except Exception as e:
  317. pass
  318. #self._log(f"Get Order task exception, e={e}")
  319. if not config_ready:
  320. return None
  321. # =================================================================
  322. # 2. 代理配置
  323. # =================================================================
  324. if self.m_cfg.need_proxy:
  325. # 轮询代理
  326. proxy_lock_time = self.m_cfg.proxy_lock_interval
  327. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=proxy_lock_time)
  328. if not proxy:
  329. try:
  330. if task_ref:
  331. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  332. return None
  333. except Exception as e:
  334. self._log(f"Return Order task to queue exception, e={e}")
  335. plg_cfg.proxy.id = proxy["id"]
  336. plg_cfg.proxy.ip = proxy["ip"]
  337. plg_cfg.proxy.port = proxy["port"]
  338. plg_cfg.proxy.scheme = proxy["scheme"]
  339. plg_cfg.proxy.username = proxy.get("username", "")
  340. plg_cfg.proxy.password = proxy.get("password", "")
  341. plg_cfg.proxy.lock_until = proxy.get("lock_until", 0)
  342. return plg_cfg, task_ref
  343. def _create_and_register_plg_worker(self, plg_cfg: VSPlgConfig, task_ref: Optional[Dict[str, Any]] = None):
  344. """
  345. @brief 异步创建工作线程
  346. """
  347. instance = None
  348. creation_success = False
  349. try:
  350. # 1. 耗时操作:实例化 & 登录
  351. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  352. instance.set_log(self.m_logger)
  353. instance.set_config(plg_cfg)
  354. instance.create_session() # 可能耗时很久
  355. # 2. 注册到任务列表
  356. with self.m_lock:
  357. # 既然允许短暂超发,这里直接添加,不做严格的拒绝并返回逻辑
  358. # 计算预定权限
  359. book_allowed = False
  360. if not self.m_cfg.account_bind_applicant:
  361. book_allowed = True
  362. elif self.m_cfg.account_bind_applicant and task_ref:
  363. book_allowed = True
  364. new_task = Task(
  365. instance=instance,
  366. qw_cfg=self.m_cfg.query_wait,
  367. next_run=time.time(),
  368. task_ref=task_ref,
  369. book_allowed=book_allowed
  370. )
  371. self.m_tasks.append(new_task)
  372. creation_success = True
  373. p_type = "Order" if task_ref else "Built-in"
  374. self._log(f"=== Instance Registered [{p_type}]. Total Active: {len(self.m_tasks)} ===")
  375. except Exception as e:
  376. self._log(f"Creation failed for {plg_cfg.account.username}: {e}")
  377. finally:
  378. # -------------------------------------------------------------
  379. # 3. [绝对核心] 必须扣减 Pending
  380. # -------------------------------------------------------------
  381. # 无论上面是否抛出异常,或者是否注册成功,Pending 必须释放
  382. with self.m_lock:
  383. if task_ref:
  384. self.m_pending_orders -= 1
  385. if self.m_pending_orders < 0: self.m_pending_orders = 0
  386. else:
  387. self.m_pending_builtin -= 1
  388. if self.m_pending_builtin < 0: self.m_pending_builtin = 0
  389. # 4. 失败回滚
  390. if not creation_success:
  391. # 如果是云端订单,必须归还,否则丢单
  392. if task_ref:
  393. self._log(f"Rolling back task {task_ref['id']}")
  394. try:
  395. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  396. except:
  397. pass
  398. def _on_query_result(self, t: Task, query_result: VSQueryResult):
  399. self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
  400. if not t.book_allowed:
  401. return
  402. # -------------------------------------------------------
  403. # 1. 准备任务数据 (Data Preparation)
  404. # -------------------------------------------------------
  405. task_data = t.task_ref
  406. is_cloud_task = False # 标记:是否为云端临时取出的任务(需要回滚)
  407. # 如果没有绑定本地任务,尝试从云端 Pop
  408. if not task_data:
  409. booking_routing_key = f'auto.{query_result.routing_key}' if query_result.routing_key else "default"
  410. task_data = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  411. if not task_data:
  412. # 这种情况属于:内置账号查到了号,但云端没有待处理订单,直接放弃
  413. self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
  414. return
  415. is_cloud_task = True
  416. self._log(f"Picked up Cloud Task ID {task_data['id']} for booking...")
  417. # 统一提取核心参数
  418. task_id = task_data['id']
  419. order_id = task_data.get('order_id')
  420. user_input = task_data.get('user_inputs', {})
  421. # -------------------------------------------------------
  422. # 2. 执行预订 (Execution)
  423. # -------------------------------------------------------
  424. booking_success = False
  425. try:
  426. # 统一调用,无需区分来源
  427. book_res = t.instance.book(query_result, user_input)
  428. # -------------------------------------------------------
  429. # 3. 成功处理 (Success Handling)
  430. # -------------------------------------------------------
  431. if book_res.success:
  432. booking_success = True
  433. self._log(f"✅ Booking SUCCESS! Order: {order_id}")
  434. current_grab_info = {
  435. "account": book_res.account,
  436. "session_id": book_res.session_id,
  437. "urn": book_res.urn,
  438. "slot_date": book_res.book_date,
  439. "slot_time": book_res.book_time,
  440. "timestamp": int(time.time()),
  441. "payment_link": book_res.payment_link,
  442. }
  443. update_data = {
  444. "status": "grabbed",
  445. "grabbed_history": current_grab_info
  446. }
  447. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  448. self._log(f"Task {task_id} marked as GRABBED.")
  449. else:
  450. self._log(f"❌ Booking Failed for Order {order_id}: {book_res.message}")
  451. except Exception as e:
  452. self._log(f"Exception during booking for Order {order_id}: {e}")
  453. finally:
  454. # -------------------------------------------------------
  455. # 4. 回滚机制 (Rollback / Return to Queue)
  456. # -------------------------------------------------------
  457. # 只有两个条件同时满足才回滚:
  458. # 1. 任务来自云端队列 (is_cloud_task is True)
  459. # 2. 预定没有成功 (booking_success is False) - 包括预定失败或发生异常
  460. if is_cloud_task and not booking_success:
  461. self._log(f"Returning Task {task_id} to queue (status=pending).")
  462. try:
  463. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  464. except Exception as ex:
  465. self._log(f"Failed to return task to queue: {ex}")