gco.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. # gco.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from datetime import datetime, timezone
  8. from typing import List, Dict, Tuple, Any, Optional, Callable
  9. from concurrent.futures import wait
  10. # 导入所有依赖
  11. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
  12. from vs_plg_factory import VSPlgFactory
  13. from toolkit.proxy_manager import ProxyManager
  14. from toolkit.thread_pool import ThreadPool
  15. from toolkit.vs_cloud_api import VSCloudApi
  16. import traceback
  17. class GCO:
  18. """
  19. @brief GCO 类
  20. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  21. 任务调度、查询和预订流程。
  22. """
  23. def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
  24. self.m_cfg = cfg
  25. self.m_factory = VSPlgFactory() # 插件工厂实例
  26. self.m_logger = logger
  27. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  28. self.m_stop_event = threading.Event() # 用于停止线程
  29. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  30. self.m_monitor_thread: Optional[threading.Thread] = None
  31. self.m_creator_thread: Optional[threading.Thread] = None
  32. self.m_pending_builtin = 0 # 正在创建中的内置账号数量
  33. self.m_pending_orders = 0 # 正在创建中的订单账号数量
  34. def start(self):
  35. """
  36. @brief 启动协调器,包括插件注册和线程启动。
  37. """
  38. if not self.m_cfg.enable:
  39. self._log("Group is disabled, not starting.")
  40. return
  41. self._log("Starting coordinator...")
  42. self.m_stop_event.clear()
  43. # 注册插件
  44. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  45. # === 修复点:更智能的类名推导逻辑 ===
  46. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  47. plugin_name = self.m_cfg.plugin_config.plugin_name
  48. class_name = "".join(part.title() for part in plugin_name.split('_'))
  49. # 调试日志:确认推导出的类名
  50. self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
  51. self.m_factory.register_plugin(plugin_name,
  52. plugin_module_path,
  53. class_name)
  54. self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  55. self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  56. self.m_monitor_thread.start()
  57. self.m_creator_thread.start()
  58. self._log("Coordinator threads started.")
  59. def stop(self):
  60. """
  61. @brief 停止协调器,等待所有线程结束。
  62. """
  63. self._log("Stopping coordinator...")
  64. self.m_stop_event.set() # 发送停止信号
  65. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  66. self.m_monitor_thread.join()
  67. if self.m_creator_thread and self.m_creator_thread.is_alive():
  68. self.m_creator_thread.join()
  69. self._log("Coordinator stopped.")
  70. def group_id(self) -> str:
  71. """
  72. @brief 获取分组ID。
  73. """
  74. return self.m_cfg.identifier
  75. def _log(self, message):
  76. if self.m_logger:
  77. self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
  78. def _monitor_loop(self):
  79. """
  80. @brief 监控循环:定期检查实例健康状况,执行查询任务。
  81. 一旦发现号源,立即触发所有实例进行批量预订。
  82. """
  83. self._log("[START] monitor loop starting...")
  84. rng = random.Random()
  85. # 创建一个临时线程池用于并发抢票,避免阻塞监控循环太久
  86. # max_workers 根据你的最大并发账号数调整,或者设为 None (默认 CPU核心数 * 5)
  87. while not self.m_stop_event.is_set():
  88. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  89. time.sleep(sleep_ms)
  90. now = time.time()
  91. # 1. 拷贝任务列表 (Snapshot)
  92. tasks_to_process = []
  93. with self.m_lock:
  94. tasks_to_process = list(self.m_tasks)
  95. # 标记本轮循环是否触发了批量抢票
  96. batch_booking_triggered = False
  97. for task in tasks_to_process:
  98. # 二次检查任务是否还在列表中(防止在遍历过程中被移除)
  99. with self.m_lock:
  100. if task not in self.m_tasks:
  101. continue
  102. # 健康检查
  103. if not task.instance.health_check():
  104. continue
  105. # 检查时间窗口
  106. if now < task.next_run:
  107. continue
  108. # === 执行查询 ===
  109. apt_type = None
  110. try:
  111. apt_types = self.m_cfg.appointment_types
  112. if not apt_types:
  113. self._log(f"No matching appointment configuration found.")
  114. continue
  115. weights = [float(item.weight) for item in apt_types]
  116. apt_type = random.choices(apt_types, weights=weights, k=1)[0]
  117. VSCloudApi.Instance().slot_refresh_start(
  118. apt_type.routing_key,
  119. country=apt_type.country,
  120. city=apt_type.city,
  121. visa_type=apt_type.visa_type
  122. )
  123. # 这里的 task 充当了“哨兵”的角色
  124. result = task.instance.query(apt_type)
  125. result.apt_type = apt_type
  126. VSCloudApi.Instance().slot_refresh_success(
  127. apt_type.routing_key
  128. )
  129. if result.success:
  130. self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING for {len(tasks_to_process)} workers.")
  131. # 上报Slot Snapshot
  132. query_payload = result.to_snapshot_payload()
  133. query_payload["website"] = self.m_cfg.website
  134. query_payload["snapshot_source"] = 'worker'
  135. query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat()
  136. VSCloudApi.Instance().slot_snapshot_report(query_payload)
  137. # === [核心修改]:一人发现,全员出击 ===
  138. # 1. 准备并发任务
  139. # 我们使用刚刚快照的 tasks_to_process,或者重新获取一次全量列表
  140. # 重点:所有实例 (w.instance) 都使用同一份查询结果 (result) 去抢
  141. futures = []
  142. for worker in tasks_to_process:
  143. # 提交到线程池并发执行,极大减少时间差
  144. f = ThreadPool.getInstance().enqueue(
  145. self._on_query_result,
  146. worker,
  147. result
  148. )
  149. futures.append(f)
  150. # 2. 等待所有抢票任务结束 (可选,根据业务需求决定是否阻塞监控)
  151. # 如果不wait,监控线程会立刻继续,可能导致重复触发
  152. # 建议 wait,确保这一波抢票彻底结束
  153. for f in futures:
  154. try:
  155. f.result() # 获取结果,捕获异常
  156. except Exception as e:
  157. self._log(f"Batch booking exception: {e}")
  158. # 3. 标记触发状态
  159. batch_booking_triggered = True
  160. break
  161. else:
  162. # 没查到,仅当前 task 记录日志
  163. self._log(f"Query done by {task.instance.get_group_id()}, No availability")
  164. except Exception as e:
  165. traceback.print_exc()
  166. self._log(f"Exception during query: {e}")
  167. if apt_type:
  168. VSCloudApi.Instance().slot_refresh_fail(
  169. apt_type.routing_key,
  170. error=str(e)
  171. )
  172. # === 计算下次运行时间 (仅针对当前 query 的 task,除非触发了 batch) ===
  173. if not batch_booking_triggered:
  174. interval = 30
  175. mode = task.qw_cfg.mode
  176. if mode == QueryWaitMode.Loop:
  177. interval = 0
  178. elif mode == QueryWaitMode.Fixed:
  179. interval = task.qw_cfg.fixed_wait
  180. elif mode == QueryWaitMode.Random:
  181. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  182. task.next_run = time.time() + interval
  183. # === [修正后的批量冷却逻辑] ===
  184. # 抢票结束后,必须严格按照每个账号配置的频率进入冷却,防止集体429
  185. if batch_booking_triggered:
  186. self._log(f"Batch booking finished. Resetting wait times based on configurations.")
  187. # 重新获取当前时间(因为抢票过程消耗了时间)
  188. now_ts = time.time()
  189. with self.m_lock:
  190. for t in self.m_tasks:
  191. # 重新读取该任务的配置
  192. interval = 30 # 默认兜底
  193. mode = t.qw_cfg.mode
  194. if mode == QueryWaitMode.Loop:
  195. # 即使是 Loop 模式,在大规模抢票后建议给一个微小的缓冲(如1秒),避免死循环导致 CPU 飙升
  196. # 如果你的逻辑允许立刻重试,这里可以是 0
  197. interval = 1
  198. elif mode == QueryWaitMode.Fixed:
  199. interval = t.qw_cfg.fixed_wait
  200. elif mode == QueryWaitMode.Random:
  201. interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
  202. # 设置下次运行时间
  203. t.next_run = now_ts + interval
  204. self._log(f"All workers cooldown reset. Resuming monitor loop.")
  205. # 清理不健康实例
  206. with self.m_lock:
  207. initial_size = len(self.m_tasks)
  208. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  209. if len(self.m_tasks) < initial_size:
  210. self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
  211. self._log("[STOP] monitor loop exiting...")
  212. def _creator_loop(self):
  213. """
  214. @brief 创建者循环:双轨制并发控制 + Pending 计数防超发
  215. """
  216. self._log("[START] creator loop starting...")
  217. while not self.m_stop_event.is_set():
  218. time.sleep(1.0) # 保持 1秒间隔,给资源管理器缓冲时间
  219. # -------------------------------------------------------------
  220. # 1. 统计当前状态 (Active + Pending)
  221. # -------------------------------------------------------------
  222. current_builtin = 0
  223. current_orders = 0
  224. with self.m_lock:
  225. for t in self.m_tasks:
  226. if t.task_ref:
  227. current_orders += 1
  228. else:
  229. current_builtin += 1
  230. # 读取正在创建中的计数 (快照)
  231. pending_builtin = self.m_pending_builtin
  232. pending_orders = self.m_pending_orders
  233. # -------------------------------------------------------------
  234. # 2. 计算缺口
  235. # -------------------------------------------------------------
  236. # === [内置账号缺口] ===
  237. needed_builtin = 0
  238. # 逻辑:只要目标数量 > 0,就尝试补充 (兼容需要账号和不需要账号的模式)
  239. if self.m_cfg.target_instances > 0:
  240. total_builtin_proj = current_builtin + pending_builtin
  241. needed_builtin = self.m_cfg.target_instances - total_builtin_proj
  242. # === [订单账号缺口] ===
  243. needed_order = 0
  244. # 逻辑:必须需要账号(need_account=True) 且 限制数量大于 0
  245. # (order_account_enable 已移除,直接看 limit)
  246. if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
  247. total_order_proj = current_orders + pending_orders
  248. needed_order = self.m_cfg.order_account_online_limit - total_order_proj
  249. # 如果两边都满了,跳过本轮
  250. if needed_builtin <= 0 and needed_order <= 0:
  251. continue
  252. # -------------------------------------------------------------
  253. # 3. 准备配置
  254. # -------------------------------------------------------------
  255. config_data = self._prepare_next_config(
  256. need_builtin=(needed_builtin > 0),
  257. need_order=(needed_order > 0),
  258. )
  259. if not config_data:
  260. time.sleep(10.0)
  261. continue
  262. plg_cfg, task_ref = config_data
  263. # -------------------------------------------------------------
  264. # 4. [绝对核心] 提交前立即增加 Pending 计数
  265. # -------------------------------------------------------------
  266. # 必须在这里加!防止下一秒循环重复创建!
  267. with self.m_lock:
  268. if task_ref:
  269. self.m_pending_orders += 1
  270. else:
  271. self.m_pending_builtin += 1
  272. p_type = "Order" if task_ref else "Built-in"
  273. self._log(f"+++ Spawning {p_type} (Pending: {self.m_pending_builtin}/{self.m_pending_orders})...")
  274. # 5. 异步提交
  275. try:
  276. ThreadPool.getInstance().enqueue(
  277. self._create_and_register_plg_worker,
  278. plg_cfg,
  279. task_ref
  280. )
  281. except Exception as e:
  282. # 提交失败回滚计数
  283. self._log(f"Failed to enqueue task: {e}")
  284. with self.m_lock:
  285. if task_ref: self.m_pending_orders -= 1
  286. else: self.m_pending_builtin -= 1
  287. # 错开并发
  288. time.sleep(random.uniform(0.5, 1.0))
  289. self._log("[STOP] creator loop exiting...")
  290. def _prepare_next_config(self, need_builtin: bool, need_order: bool) -> Optional[Tuple[VSPlgConfig, Optional[Dict[str, Any]]]]:
  291. """
  292. @brief 准备下一个插件实例的配置
  293. """
  294. plg_cfg = VSPlgConfig()
  295. plg_cfg.debug = self.m_cfg.debug
  296. plg_cfg.free_config = self.m_cfg.free_config
  297. plg_cfg.session_max_life = self.m_cfg.session_max_life
  298. task_ref = None
  299. config_ready = False
  300. pool_name = self.m_cfg.local_account_pool
  301. # =================================================================
  302. # 1. 账号获取
  303. # =================================================================
  304. if not self.m_cfg.need_account:
  305. # === 游客模式 (无需账号) ===
  306. if need_builtin:
  307. plg_cfg.account.id = 0
  308. plg_cfg.account.username = "Guest"
  309. config_ready = True
  310. task_ref = None
  311. else:
  312. # === 标准模式 (需要账号) ===
  313. # A. 优先补充内置账号 (只要 target_instances 还有缺口)
  314. if need_builtin:
  315. # 获取并锁定账号
  316. account = VSCloudApi.Instance().get_next_account(
  317. pool_name,
  318. lock_duration=self.m_cfg.account_login_interval * 60
  319. )
  320. if account:
  321. plg_cfg.account.id = account["id"]
  322. plg_cfg.account.username = account["username"]
  323. plg_cfg.account.password = account["password"]
  324. plg_cfg.account.lock_until = account.get("lock_until", 0)
  325. config_ready = True
  326. task_ref = None
  327. self._log(f"Selected Built-in: {plg_cfg.account.username}")
  328. # B. 次选补充订单账号 (如果内置不需要 或 池子空了)
  329. # 只有当 limit > 0 时才尝试
  330. if not config_ready and need_order and self.m_cfg.order_account_online_limit > 0:
  331. try:
  332. routing_key = self.m_cfg.order_account_routing
  333. task_ref = VSCloudApi.Instance().get_vas_task_pop(routing_key)
  334. if task_ref:
  335. user_inputs = task_ref.get('user_inputs', {})
  336. plg_cfg.account.id = 0 # 临时账号
  337. plg_cfg.account.username = user_inputs.get(self.m_cfg.input_map_username, "")
  338. plg_cfg.account.password = user_inputs.get(self.m_cfg.input_map_password, "")
  339. if plg_cfg.account.username:
  340. config_ready = True
  341. self._log(f"Selected Order Acc: {plg_cfg.account.username}")
  342. else:
  343. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  344. return None
  345. except Exception as e:
  346. pass
  347. #self._log(f"Get Order task exception, e={e}")
  348. if not config_ready:
  349. return None
  350. # =================================================================
  351. # 2. 代理配置
  352. # =================================================================
  353. if self.m_cfg.need_proxy:
  354. # 轮询代理
  355. proxy_lock_time = self.m_cfg.proxy_lock_interval
  356. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=proxy_lock_time)
  357. if not proxy:
  358. try:
  359. if task_ref:
  360. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  361. return None
  362. except Exception as e:
  363. self._log(f"Return Order task to queue exception, e={e}")
  364. plg_cfg.proxy.id = proxy["id"]
  365. plg_cfg.proxy.ip = proxy["ip"]
  366. plg_cfg.proxy.port = proxy["port"]
  367. plg_cfg.proxy.scheme = proxy["scheme"]
  368. plg_cfg.proxy.username = proxy.get("username", "")
  369. plg_cfg.proxy.password = proxy.get("password", "")
  370. plg_cfg.proxy.lock_until = proxy.get("lock_until", 0)
  371. return plg_cfg, task_ref
  372. def _create_and_register_plg_worker(self, plg_cfg: VSPlgConfig, task_ref: Optional[Dict[str, Any]] = None):
  373. """
  374. @brief 异步创建工作线程
  375. """
  376. instance = None
  377. creation_success = False
  378. try:
  379. # 1. 耗时操作:实例化 & 登录
  380. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  381. instance.set_log(self.m_logger)
  382. instance.set_config(plg_cfg)
  383. instance.create_session() # 可能耗时很久
  384. # 2. 注册到任务列表
  385. with self.m_lock:
  386. # 既然允许短暂超发,这里直接添加,不做严格的拒绝并返回逻辑
  387. # 计算预定权限
  388. book_allowed = False
  389. if not self.m_cfg.account_bind_applicant:
  390. book_allowed = True
  391. elif self.m_cfg.account_bind_applicant and task_ref:
  392. book_allowed = True
  393. new_task = Task(
  394. instance=instance,
  395. qw_cfg=self.m_cfg.query_wait,
  396. next_run=time.time(),
  397. task_ref=task_ref,
  398. book_allowed=book_allowed
  399. )
  400. self.m_tasks.append(new_task)
  401. creation_success = True
  402. p_type = "Order" if task_ref else "Built-in"
  403. self._log(f"=== Instance Registered [{p_type}]. Total Active: {len(self.m_tasks)} ===")
  404. except Exception as e:
  405. self._log(f"Creation failed for {plg_cfg.account.username}: {e}")
  406. finally:
  407. # -------------------------------------------------------------
  408. # 3. [绝对核心] 必须扣减 Pending
  409. # -------------------------------------------------------------
  410. # 无论上面是否抛出异常,或者是否注册成功,Pending 必须释放
  411. with self.m_lock:
  412. if task_ref:
  413. self.m_pending_orders -= 1
  414. if self.m_pending_orders < 0: self.m_pending_orders = 0
  415. else:
  416. self.m_pending_builtin -= 1
  417. if self.m_pending_builtin < 0: self.m_pending_builtin = 0
  418. # 4. 失败回滚
  419. if not creation_success:
  420. # 如果是云端订单,必须归还,否则丢单
  421. if task_ref:
  422. self._log(f"Rolling back task {task_ref['id']}")
  423. try:
  424. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  425. except:
  426. pass
  427. def _on_query_result(self, t: Task, query_result: VSQueryResult):
  428. self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
  429. if not t.book_allowed:
  430. return
  431. # -------------------------------------------------------
  432. # 1. 准备任务数据 (Data Preparation)
  433. # -------------------------------------------------------
  434. task_data = t.task_ref
  435. is_cloud_task = False # 标记:是否为云端临时取出的任务(需要回滚)
  436. # 如果没有绑定本地任务,尝试从云端 Pop
  437. if not task_data:
  438. apt_type = query_result.apt_type
  439. booking_routing_key = f'auto.{apt_type.routing_key}' if apt_type.routing_key else "default"
  440. task_data = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  441. if not task_data:
  442. # 这种情况属于:内置账号查到了号,但云端没有待处理订单,直接放弃
  443. self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
  444. return
  445. is_cloud_task = True
  446. self._log(f"Picked up Cloud Task ID {task_data['id']} for booking...")
  447. # 统一提取核心参数
  448. task_id = task_data['id']
  449. order_id = task_data.get('order_id')
  450. user_input = task_data.get('user_inputs', {})
  451. # -------------------------------------------------------
  452. # 2. 执行预订 (Execution)
  453. # -------------------------------------------------------
  454. booking_success = False
  455. try:
  456. # 统一调用,无需区分来源
  457. book_res = t.instance.book(query_result, user_input)
  458. # -------------------------------------------------------
  459. # 3. 成功处理 (Success Handling)
  460. # -------------------------------------------------------
  461. if book_res.success:
  462. booking_success = True
  463. self._log(f"✅ Booking SUCCESS! Order: {order_id}")
  464. current_grab_info = {
  465. "account": book_res.account,
  466. "session_id": book_res.session_id,
  467. "urn": book_res.urn,
  468. "slot_date": book_res.book_date,
  469. "slot_time": book_res.book_time,
  470. "timestamp": int(time.time()),
  471. "payment_link": book_res.payment_link,
  472. }
  473. update_data = {
  474. "status": "grabbed",
  475. "grabbed_history": current_grab_info
  476. }
  477. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  478. self._log(f"Task {task_id} marked as GRABBED.")
  479. else:
  480. self._log(f"❌ Booking Failed for Order {order_id}: {book_res.message}")
  481. except Exception as e:
  482. self._log(f"Exception during booking for Order {order_id}: {e}")
  483. finally:
  484. # -------------------------------------------------------
  485. # 4. 回滚机制 (Rollback / Return to Queue)
  486. # -------------------------------------------------------
  487. # 只有两个条件同时满足才回滚:
  488. # 1. 任务来自云端队列 (is_cloud_task is True)
  489. # 2. 预定没有成功 (booking_success is False) - 包括预定失败或发生异常
  490. if is_cloud_task and not booking_success:
  491. self._log(f"Returning Task {task_id} to queue (status=pending).")
  492. try:
  493. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  494. except Exception as ex:
  495. self._log(f"Failed to return task to queue: {ex}")