gco.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. # gco.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from datetime import datetime, timezone
  8. from typing import List, Dict, Tuple, Any, Optional, Callable
  9. from concurrent.futures import wait
  10. # 导入所有依赖
  11. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
  12. from vs_plg_factory import VSPlgFactory
  13. from toolkit.account_manager import AccountManager
  14. from toolkit.proxy_manager import ProxyManager
  15. from toolkit.thread_pool import ThreadPool
  16. from toolkit.vs_cloud_api import VSCloudApi
  17. class GCO:
  18. """
  19. @brief GCO 类
  20. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  21. 任务调度、查询和预订流程。
  22. """
  23. def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
  24. self.m_cfg = cfg
  25. self.m_factory = VSPlgFactory() # 插件工厂实例
  26. self.m_logger = logger
  27. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  28. self.m_stop_event = threading.Event() # 用于停止线程
  29. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  30. self.m_monitor_thread: Optional[threading.Thread] = None
  31. self.m_creator_thread: Optional[threading.Thread] = None
  32. self.m_pending_builtin = 0 # 正在创建中的内置账号数量
  33. self.m_pending_orders = 0 # 正在创建中的订单账号数量
  34. def start(self):
  35. """
  36. @brief 启动协调器,包括插件注册和线程启动。
  37. """
  38. if not self.m_cfg.enable:
  39. self._log("Group is disabled, not starting.")
  40. return
  41. self._log("Starting coordinator...")
  42. self.m_stop_event.clear()
  43. # 注册插件
  44. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  45. # === 修复点:更智能的类名推导逻辑 ===
  46. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  47. plugin_name = self.m_cfg.plugin_config.plugin_name
  48. class_name = "".join(part.title() for part in plugin_name.split('_'))
  49. # 调试日志:确认推导出的类名
  50. self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
  51. self.m_factory.register_plugin(plugin_name,
  52. plugin_module_path,
  53. class_name)
  54. self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  55. self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  56. self.m_monitor_thread.start()
  57. self.m_creator_thread.start()
  58. self._log("Coordinator threads started.")
  59. def stop(self):
  60. """
  61. @brief 停止协调器,等待所有线程结束。
  62. """
  63. self._log("Stopping coordinator...")
  64. self.m_stop_event.set() # 发送停止信号
  65. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  66. self.m_monitor_thread.join()
  67. if self.m_creator_thread and self.m_creator_thread.is_alive():
  68. self.m_creator_thread.join()
  69. self._log("Coordinator stopped.")
  70. def group_id(self) -> str:
  71. """
  72. @brief 获取分组ID。
  73. """
  74. return self.m_cfg.identifier
  75. def _log(self, message):
  76. if self.m_logger:
  77. self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
  78. def _monitor_loop(self):
  79. """
  80. @brief 监控循环:定期检查实例健康状况,执行查询任务。
  81. 一旦发现号源,立即触发所有实例进行批量预订。
  82. """
  83. self._log("[START] monitor loop starting...")
  84. rng = random.Random()
  85. # 创建一个临时线程池用于并发抢票,避免阻塞监控循环太久
  86. # max_workers 根据你的最大并发账号数调整,或者设为 None (默认 CPU核心数 * 5)
  87. while not self.m_stop_event.is_set():
  88. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  89. time.sleep(sleep_ms)
  90. now = time.time()
  91. # 1. 拷贝任务列表 (Snapshot)
  92. tasks_to_process = []
  93. with self.m_lock:
  94. tasks_to_process = list(self.m_tasks)
  95. # 标记本轮循环是否触发了批量抢票
  96. batch_booking_triggered = False
  97. for task in tasks_to_process:
  98. # 二次检查任务是否还在列表中(防止在遍历过程中被移除)
  99. with self.m_lock:
  100. if task not in self.m_tasks:
  101. continue
  102. # 健康检查
  103. if not task.instance.health_check():
  104. continue
  105. # 检查时间窗口
  106. if now < task.next_run:
  107. continue
  108. # === 执行查询 ===
  109. apt_type = None
  110. try:
  111. apt_types = self.m_cfg.appointment_types
  112. if not apt_types:
  113. self._log(f"No matching appointment configuration found.")
  114. continue
  115. weights = [float(item.weight) for item in apt_types]
  116. apt_type = random.choices(apt_types, weights=weights, k=1)[0]
  117. VSCloudApi.Instance().slot_refresh_start(
  118. apt_type.routing_key,
  119. country=apt_type.country,
  120. city=apt_type.city,
  121. visa_type=apt_type.visa_type
  122. )
  123. # 这里的 task 充当了“哨兵”的角色
  124. result = task.instance.query(apt_type)
  125. VSCloudApi.Instance().slot_refresh_success(
  126. apt_type.routing_key
  127. )
  128. if result.success:
  129. self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING for {len(tasks_to_process)} workers.")
  130. query_payload = result.to_snapshot_payload()
  131. query_payload["website"] = self.m_cfg.website
  132. query_payload["snapshot_source"] = 'worker'
  133. query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat()
  134. VSCloudApi.Instance().slot_snapshot_report(query_payload)
  135. # === [核心修改]:一人发现,全员出击 ===
  136. # 1. 准备并发任务
  137. # 我们使用刚刚快照的 tasks_to_process,或者重新获取一次全量列表
  138. # 重点:所有实例 (w.instance) 都使用同一份查询结果 (result) 去抢
  139. futures = []
  140. for worker in tasks_to_process:
  141. # 提交到线程池并发执行,极大减少时间差
  142. f = ThreadPool.getInstance().enqueue(
  143. self._on_query_result,
  144. worker,
  145. result
  146. )
  147. futures.append(f)
  148. # 2. 等待所有抢票任务结束 (可选,根据业务需求决定是否阻塞监控)
  149. # 如果不wait,监控线程会立刻继续,可能导致重复触发
  150. # 建议 wait,确保这一波抢票彻底结束
  151. for f in futures:
  152. try:
  153. f.result() # 获取结果,捕获异常
  154. except Exception as e:
  155. self._log(f"Batch booking exception: {e}")
  156. # 3. 标记触发状态
  157. batch_booking_triggered = True
  158. break
  159. else:
  160. # 没查到,仅当前 task 记录日志
  161. self._log(f"Query done by {task.instance.get_group_id()}, No availability")
  162. except Exception as e:
  163. self._log(f"Exception during query: {e}")
  164. if apt_type:
  165. VSCloudApi.Instance().slot_refresh_fail(
  166. apt_type.routing_key,
  167. error=str(e)
  168. )
  169. # === 计算下次运行时间 (仅针对当前 query 的 task,除非触发了 batch) ===
  170. if not batch_booking_triggered:
  171. interval = 30
  172. mode = task.qw_cfg.mode
  173. if mode == QueryWaitMode.Loop:
  174. interval = 0
  175. elif mode == QueryWaitMode.Fixed:
  176. interval = task.qw_cfg.fixed_wait
  177. elif mode == QueryWaitMode.Random:
  178. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  179. task.next_run = time.time() + interval
  180. # === [修正后的批量冷却逻辑] ===
  181. # 抢票结束后,必须严格按照每个账号配置的频率进入冷却,防止集体429
  182. if batch_booking_triggered:
  183. self._log(f"Batch booking finished. Resetting wait times based on configurations.")
  184. # 重新获取当前时间(因为抢票过程消耗了时间)
  185. now_ts = time.time()
  186. with self.m_lock:
  187. for t in self.m_tasks:
  188. # 重新读取该任务的配置
  189. interval = 30 # 默认兜底
  190. mode = t.qw_cfg.mode
  191. if mode == QueryWaitMode.Loop:
  192. # 即使是 Loop 模式,在大规模抢票后建议给一个微小的缓冲(如1秒),避免死循环导致 CPU 飙升
  193. # 如果你的逻辑允许立刻重试,这里可以是 0
  194. interval = 1
  195. elif mode == QueryWaitMode.Fixed:
  196. interval = t.qw_cfg.fixed_wait
  197. elif mode == QueryWaitMode.Random:
  198. interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
  199. # 设置下次运行时间
  200. t.next_run = now_ts + interval
  201. self._log(f"All workers cooldown reset. Resuming monitor loop.")
  202. # 清理不健康实例
  203. with self.m_lock:
  204. initial_size = len(self.m_tasks)
  205. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  206. if len(self.m_tasks) < initial_size:
  207. self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
  208. self._log("[STOP] monitor loop exiting...")
  209. def _creator_loop(self):
  210. """
  211. @brief 创建者循环:双轨制并发控制 + Pending 计数防超发
  212. """
  213. self._log("[START] creator loop starting...")
  214. while not self.m_stop_event.is_set():
  215. time.sleep(1.0) # 保持 1秒间隔,给资源管理器缓冲时间
  216. # -------------------------------------------------------------
  217. # 1. 统计当前状态 (Active + Pending)
  218. # -------------------------------------------------------------
  219. current_builtin = 0
  220. current_orders = 0
  221. with self.m_lock:
  222. for t in self.m_tasks:
  223. if t.task_ref:
  224. current_orders += 1
  225. else:
  226. current_builtin += 1
  227. # 读取正在创建中的计数 (快照)
  228. pending_builtin = self.m_pending_builtin
  229. pending_orders = self.m_pending_orders
  230. # -------------------------------------------------------------
  231. # 2. 计算缺口
  232. # -------------------------------------------------------------
  233. # === [内置账号缺口] ===
  234. needed_builtin = 0
  235. # 逻辑:只要目标数量 > 0,就尝试补充 (兼容需要账号和不需要账号的模式)
  236. if self.m_cfg.target_instances > 0:
  237. total_builtin_proj = current_builtin + pending_builtin
  238. needed_builtin = self.m_cfg.target_instances - total_builtin_proj
  239. # === [订单账号缺口] ===
  240. needed_order = 0
  241. # 逻辑:必须需要账号(need_account=True) 且 限制数量大于 0
  242. # (order_account_enable 已移除,直接看 limit)
  243. if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
  244. total_order_proj = current_orders + pending_orders
  245. needed_order = self.m_cfg.order_account_online_limit - total_order_proj
  246. # 如果两边都满了,跳过本轮
  247. if needed_builtin <= 0 and needed_order <= 0:
  248. continue
  249. # -------------------------------------------------------------
  250. # 3. 准备配置
  251. # -------------------------------------------------------------
  252. config_data = self._prepare_next_config(
  253. need_builtin=(needed_builtin > 0),
  254. need_order=(needed_order > 0),
  255. )
  256. if not config_data:
  257. time.sleep(10.0)
  258. continue
  259. plg_cfg, task_ref = config_data
  260. # -------------------------------------------------------------
  261. # 4. [绝对核心] 提交前立即增加 Pending 计数
  262. # -------------------------------------------------------------
  263. # 必须在这里加!防止下一秒循环重复创建!
  264. with self.m_lock:
  265. if task_ref:
  266. self.m_pending_orders += 1
  267. else:
  268. self.m_pending_builtin += 1
  269. p_type = "Order" if task_ref else "Built-in"
  270. self._log(f"+++ Spawning {p_type} (Pending: {self.m_pending_builtin}/{self.m_pending_orders})...")
  271. # 5. 异步提交
  272. try:
  273. ThreadPool.getInstance().enqueue(
  274. self._create_and_register_plg_worker,
  275. plg_cfg,
  276. task_ref
  277. )
  278. except Exception as e:
  279. # 提交失败回滚计数
  280. self._log(f"Failed to enqueue task: {e}")
  281. with self.m_lock:
  282. if task_ref: self.m_pending_orders -= 1
  283. else: self.m_pending_builtin -= 1
  284. # 错开并发
  285. time.sleep(random.uniform(0.5, 1.0))
  286. self._log("[STOP] creator loop exiting...")
  287. def _prepare_next_config(self, need_builtin: bool, need_order: bool) -> Optional[Tuple[VSPlgConfig, Optional[Dict[str, Any]]]]:
  288. """
  289. @brief 准备下一个插件实例的配置
  290. """
  291. plg_cfg = VSPlgConfig()
  292. plg_cfg.debug = self.m_cfg.debug
  293. plg_cfg.free_config = self.m_cfg.free_config
  294. plg_cfg.session_max_life = self.m_cfg.session_max_life
  295. task_ref = None
  296. config_ready = False
  297. pool_name = self.m_cfg.local_account_pool
  298. # =================================================================
  299. # 1. 账号获取
  300. # =================================================================
  301. if not self.m_cfg.need_account:
  302. # === 游客模式 (无需账号) ===
  303. if need_builtin:
  304. plg_cfg.account.id = 0
  305. plg_cfg.account.username = "Guest"
  306. config_ready = True
  307. task_ref = None
  308. else:
  309. # === 标准模式 (需要账号) ===
  310. # A. 优先补充内置账号 (只要 target_instances 还有缺口)
  311. if need_builtin:
  312. # 获取并锁定账号
  313. account = AccountManager.Instance().next(
  314. pool_name,
  315. lock_duration=self.m_cfg.account_login_interval * 60
  316. )
  317. if account:
  318. plg_cfg.account.id = account["id"]
  319. plg_cfg.account.username = account["username"]
  320. plg_cfg.account.password = account["password"]
  321. plg_cfg.account.lock_until = account.get("lock_until", 0)
  322. config_ready = True
  323. task_ref = None
  324. self._log(f"Selected Built-in: {plg_cfg.account.username}")
  325. # B. 次选补充订单账号 (如果内置不需要 或 池子空了)
  326. # 只有当 limit > 0 时才尝试
  327. if not config_ready and need_order and self.m_cfg.order_account_online_limit > 0:
  328. try:
  329. routing_key = self.m_cfg.order_account_routing
  330. task_ref = VSCloudApi.Instance().get_vas_task_pop(routing_key)
  331. if task_ref:
  332. user_inputs = task_ref.get('user_inputs', {})
  333. plg_cfg.account.id = 0 # 临时账号
  334. plg_cfg.account.username = user_inputs.get(self.m_cfg.input_map_username, "")
  335. plg_cfg.account.password = user_inputs.get(self.m_cfg.input_map_password, "")
  336. if plg_cfg.account.username:
  337. config_ready = True
  338. self._log(f"Selected Order Acc: {plg_cfg.account.username}")
  339. else:
  340. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  341. return None
  342. except Exception as e:
  343. pass
  344. #self._log(f"Get Order task exception, e={e}")
  345. if not config_ready:
  346. return None
  347. # =================================================================
  348. # 2. 代理配置
  349. # =================================================================
  350. if self.m_cfg.need_proxy:
  351. # 轮询代理
  352. proxy_lock_time = self.m_cfg.proxy_lock_interval
  353. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=proxy_lock_time)
  354. if not proxy:
  355. try:
  356. if task_ref:
  357. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  358. return None
  359. except Exception as e:
  360. self._log(f"Return Order task to queue exception, e={e}")
  361. plg_cfg.proxy.id = proxy["id"]
  362. plg_cfg.proxy.ip = proxy["ip"]
  363. plg_cfg.proxy.port = proxy["port"]
  364. plg_cfg.proxy.scheme = proxy["scheme"]
  365. plg_cfg.proxy.username = proxy.get("username", "")
  366. plg_cfg.proxy.password = proxy.get("password", "")
  367. plg_cfg.proxy.lock_until = proxy.get("lock_until", 0)
  368. return plg_cfg, task_ref
  369. def _create_and_register_plg_worker(self, plg_cfg: VSPlgConfig, task_ref: Optional[Dict[str, Any]] = None):
  370. """
  371. @brief 异步创建工作线程
  372. """
  373. instance = None
  374. creation_success = False
  375. try:
  376. # 1. 耗时操作:实例化 & 登录
  377. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  378. instance.set_log(self.m_logger)
  379. instance.set_config(plg_cfg)
  380. instance.create_session() # 可能耗时很久
  381. # 2. 注册到任务列表
  382. with self.m_lock:
  383. # 既然允许短暂超发,这里直接添加,不做严格的拒绝并返回逻辑
  384. # 计算预定权限
  385. book_allowed = False
  386. if not self.m_cfg.account_bind_applicant:
  387. book_allowed = True
  388. elif self.m_cfg.account_bind_applicant and task_ref:
  389. book_allowed = True
  390. new_task = Task(
  391. instance=instance,
  392. qw_cfg=self.m_cfg.query_wait,
  393. next_run=time.time(),
  394. task_ref=task_ref,
  395. book_allowed=book_allowed
  396. )
  397. self.m_tasks.append(new_task)
  398. creation_success = True
  399. p_type = "Order" if task_ref else "Built-in"
  400. self._log(f"=== Instance Registered [{p_type}]. Total Active: {len(self.m_tasks)} ===")
  401. except Exception as e:
  402. self._log(f"Creation failed for {plg_cfg.account.username}: {e}")
  403. finally:
  404. # -------------------------------------------------------------
  405. # 3. [绝对核心] 必须扣减 Pending
  406. # -------------------------------------------------------------
  407. # 无论上面是否抛出异常,或者是否注册成功,Pending 必须释放
  408. with self.m_lock:
  409. if task_ref:
  410. self.m_pending_orders -= 1
  411. if self.m_pending_orders < 0: self.m_pending_orders = 0
  412. else:
  413. self.m_pending_builtin -= 1
  414. if self.m_pending_builtin < 0: self.m_pending_builtin = 0
  415. # 4. 失败回滚
  416. if not creation_success:
  417. # 如果是云端订单,必须归还,否则丢单
  418. if task_ref:
  419. self._log(f"Rolling back task {task_ref['id']}")
  420. try:
  421. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  422. except:
  423. pass
  424. def _on_query_result(self, t: Task, query_result: VSQueryResult):
  425. self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
  426. if not t.book_allowed:
  427. return
  428. # -------------------------------------------------------
  429. # 1. 准备任务数据 (Data Preparation)
  430. # -------------------------------------------------------
  431. task_data = t.task_ref
  432. is_cloud_task = False # 标记:是否为云端临时取出的任务(需要回滚)
  433. # 如果没有绑定本地任务,尝试从云端 Pop
  434. if not task_data:
  435. apt_type = query_result.apt_type
  436. booking_routing_key = f'auto.{apt_type.routing_key}' if apt_type.routing_key else "default"
  437. task_data = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  438. if not task_data:
  439. # 这种情况属于:内置账号查到了号,但云端没有待处理订单,直接放弃
  440. self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
  441. return
  442. is_cloud_task = True
  443. self._log(f"Picked up Cloud Task ID {task_data['id']} for booking...")
  444. # 统一提取核心参数
  445. task_id = task_data['id']
  446. order_id = task_data.get('order_id')
  447. user_input = task_data.get('user_inputs', {})
  448. # -------------------------------------------------------
  449. # 2. 执行预订 (Execution)
  450. # -------------------------------------------------------
  451. booking_success = False
  452. try:
  453. # 统一调用,无需区分来源
  454. book_res = t.instance.book(query_result, user_input)
  455. # -------------------------------------------------------
  456. # 3. 成功处理 (Success Handling)
  457. # -------------------------------------------------------
  458. if book_res.success:
  459. booking_success = True
  460. self._log(f"✅ Booking SUCCESS! Order: {order_id}")
  461. current_grab_info = {
  462. "account": book_res.account,
  463. "session_id": book_res.session_id,
  464. "urn": book_res.urn,
  465. "slot_date": book_res.book_date,
  466. "slot_time": book_res.book_time,
  467. "timestamp": int(time.time()),
  468. "payment_link": book_res.payment_link,
  469. }
  470. update_data = {
  471. "status": "grabbed",
  472. "grabbed_history": current_grab_info
  473. }
  474. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  475. self._log(f"Task {task_id} marked as GRABBED.")
  476. else:
  477. self._log(f"❌ Booking Failed for Order {order_id}: {book_res.message}")
  478. except Exception as e:
  479. self._log(f"Exception during booking for Order {order_id}: {e}")
  480. finally:
  481. # -------------------------------------------------------
  482. # 4. 回滚机制 (Rollback / Return to Queue)
  483. # -------------------------------------------------------
  484. # 只有两个条件同时满足才回滚:
  485. # 1. 任务来自云端队列 (is_cloud_task is True)
  486. # 2. 预定没有成功 (booking_success is False) - 包括预定失败或发生异常
  487. if is_cloud_task and not booking_success:
  488. self._log(f"Returning Task {task_id} to queue (status=pending).")
  489. try:
  490. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  491. except Exception as ex:
  492. self._log(f"Failed to return task to queue: {ex}")