gco.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. # gco.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from datetime import datetime, timezone
  8. from typing import List, Dict, Tuple, Any, Optional, Callable
  9. from concurrent.futures import wait
  10. # 导入所有依赖
  11. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
  12. from vs_plg_factory import VSPlgFactory
  13. from toolkit.account_manager import AccountManager
  14. from toolkit.proxy_manager import ProxyManager
  15. from toolkit.thread_pool import ThreadPool
  16. from toolkit.vs_cloud_api import VSCloudApi
  17. import traceback
  18. class GCO:
  19. """
  20. @brief GCO 类
  21. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  22. 任务调度、查询和预订流程。
  23. """
  24. def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
  25. self.m_cfg = cfg
  26. self.m_factory = VSPlgFactory() # 插件工厂实例
  27. self.m_logger = logger
  28. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  29. self.m_stop_event = threading.Event() # 用于停止线程
  30. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  31. self.m_monitor_thread: Optional[threading.Thread] = None
  32. self.m_creator_thread: Optional[threading.Thread] = None
  33. self.m_pending_builtin = 0 # 正在创建中的内置账号数量
  34. self.m_pending_orders = 0 # 正在创建中的订单账号数量
  35. def start(self):
  36. """
  37. @brief 启动协调器,包括插件注册和线程启动。
  38. """
  39. if not self.m_cfg.enable:
  40. self._log("Group is disabled, not starting.")
  41. return
  42. self._log("Starting coordinator...")
  43. self.m_stop_event.clear()
  44. # 注册插件
  45. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  46. # === 修复点:更智能的类名推导逻辑 ===
  47. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  48. plugin_name = self.m_cfg.plugin_config.plugin_name
  49. class_name = "".join(part.title() for part in plugin_name.split('_'))
  50. # 调试日志:确认推导出的类名
  51. self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
  52. self.m_factory.register_plugin(plugin_name,
  53. plugin_module_path,
  54. class_name)
  55. self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  56. self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  57. self.m_monitor_thread.start()
  58. self.m_creator_thread.start()
  59. self._log("Coordinator threads started.")
  60. def stop(self):
  61. """
  62. @brief 停止协调器,等待所有线程结束。
  63. """
  64. self._log("Stopping coordinator...")
  65. self.m_stop_event.set() # 发送停止信号
  66. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  67. self.m_monitor_thread.join()
  68. if self.m_creator_thread and self.m_creator_thread.is_alive():
  69. self.m_creator_thread.join()
  70. self._log("Coordinator stopped.")
  71. def group_id(self) -> str:
  72. """
  73. @brief 获取分组ID。
  74. """
  75. return self.m_cfg.identifier
  76. def _log(self, message):
  77. if self.m_logger:
  78. self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
  79. def _monitor_loop(self):
  80. """
  81. @brief 监控循环:定期检查实例健康状况,执行查询任务。
  82. 一旦发现号源,立即触发所有实例进行批量预订。
  83. """
  84. self._log("[START] monitor loop starting...")
  85. rng = random.Random()
  86. # 创建一个临时线程池用于并发抢票,避免阻塞监控循环太久
  87. # max_workers 根据你的最大并发账号数调整,或者设为 None (默认 CPU核心数 * 5)
  88. while not self.m_stop_event.is_set():
  89. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  90. time.sleep(sleep_ms)
  91. now = time.time()
  92. # 1. 拷贝任务列表 (Snapshot)
  93. tasks_to_process = []
  94. with self.m_lock:
  95. tasks_to_process = list(self.m_tasks)
  96. # 标记本轮循环是否触发了批量抢票
  97. batch_booking_triggered = False
  98. for task in tasks_to_process:
  99. # 二次检查任务是否还在列表中(防止在遍历过程中被移除)
  100. with self.m_lock:
  101. if task not in self.m_tasks:
  102. continue
  103. # 健康检查
  104. if not task.instance.health_check():
  105. continue
  106. # 检查时间窗口
  107. if now < task.next_run:
  108. continue
  109. # === 执行查询 ===
  110. apt_type = None
  111. try:
  112. apt_types = self.m_cfg.appointment_types
  113. if not apt_types:
  114. self._log(f"No matching appointment configuration found.")
  115. continue
  116. weights = [float(item.weight) for item in apt_types]
  117. apt_type = random.choices(apt_types, weights=weights, k=1)[0]
  118. VSCloudApi.Instance().slot_refresh_start(
  119. apt_type.routing_key,
  120. country=apt_type.country,
  121. city=apt_type.city,
  122. visa_type=apt_type.visa_type
  123. )
  124. # 这里的 task 充当了“哨兵”的角色
  125. result = task.instance.query(apt_type)
  126. result.apt_type = apt_type
  127. VSCloudApi.Instance().slot_refresh_success(
  128. apt_type.routing_key
  129. )
  130. if result.success:
  131. self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING for {len(tasks_to_process)} workers.")
  132. # 上报Slot Snapshot
  133. # query_payload = result.to_snapshot_payload()
  134. # query_payload["website"] = self.m_cfg.website
  135. # query_payload["snapshot_source"] = 'worker'
  136. # query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat()
  137. # VSCloudApi.Instance().slot_snapshot_report(query_payload)
  138. # === [核心修改]:一人发现,全员出击 ===
  139. # 1. 准备并发任务
  140. # 我们使用刚刚快照的 tasks_to_process,或者重新获取一次全量列表
  141. # 重点:所有实例 (w.instance) 都使用同一份查询结果 (result) 去抢
  142. futures = []
  143. for worker in tasks_to_process:
  144. # 提交到线程池并发执行,极大减少时间差
  145. f = ThreadPool.getInstance().enqueue(
  146. self._on_query_result,
  147. worker,
  148. result
  149. )
  150. futures.append(f)
  151. # 2. 等待所有抢票任务结束 (可选,根据业务需求决定是否阻塞监控)
  152. # 如果不wait,监控线程会立刻继续,可能导致重复触发
  153. # 建议 wait,确保这一波抢票彻底结束
  154. for f in futures:
  155. try:
  156. f.result() # 获取结果,捕获异常
  157. except Exception as e:
  158. self._log(f"Batch booking exception: {e}")
  159. # 3. 标记触发状态
  160. batch_booking_triggered = True
  161. break
  162. else:
  163. # 没查到,仅当前 task 记录日志
  164. self._log(f"Query done by {task.instance.get_group_id()}, No availability")
  165. except Exception as e:
  166. traceback.print_exc()
  167. self._log(f"Exception during query: {e}")
  168. if apt_type:
  169. VSCloudApi.Instance().slot_refresh_fail(
  170. apt_type.routing_key,
  171. error=str(e)
  172. )
  173. # === 计算下次运行时间 (仅针对当前 query 的 task,除非触发了 batch) ===
  174. if not batch_booking_triggered:
  175. interval = 30
  176. mode = task.qw_cfg.mode
  177. if mode == QueryWaitMode.Loop:
  178. interval = 0
  179. elif mode == QueryWaitMode.Fixed:
  180. interval = task.qw_cfg.fixed_wait
  181. elif mode == QueryWaitMode.Random:
  182. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  183. task.next_run = time.time() + interval
  184. # === [修正后的批量冷却逻辑] ===
  185. # 抢票结束后,必须严格按照每个账号配置的频率进入冷却,防止集体429
  186. if batch_booking_triggered:
  187. self._log(f"Batch booking finished. Resetting wait times based on configurations.")
  188. # 重新获取当前时间(因为抢票过程消耗了时间)
  189. now_ts = time.time()
  190. with self.m_lock:
  191. for t in self.m_tasks:
  192. # 重新读取该任务的配置
  193. interval = 30 # 默认兜底
  194. mode = t.qw_cfg.mode
  195. if mode == QueryWaitMode.Loop:
  196. # 即使是 Loop 模式,在大规模抢票后建议给一个微小的缓冲(如1秒),避免死循环导致 CPU 飙升
  197. # 如果你的逻辑允许立刻重试,这里可以是 0
  198. interval = 1
  199. elif mode == QueryWaitMode.Fixed:
  200. interval = t.qw_cfg.fixed_wait
  201. elif mode == QueryWaitMode.Random:
  202. interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
  203. # 设置下次运行时间
  204. t.next_run = now_ts + interval
  205. self._log(f"All workers cooldown reset. Resuming monitor loop.")
  206. # 清理不健康实例
  207. with self.m_lock:
  208. initial_size = len(self.m_tasks)
  209. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  210. if len(self.m_tasks) < initial_size:
  211. self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
  212. self._log("[STOP] monitor loop exiting...")
  213. def _creator_loop(self):
  214. """
  215. @brief 创建者循环:双轨制并发控制 + Pending 计数防超发
  216. """
  217. self._log("[START] creator loop starting...")
  218. while not self.m_stop_event.is_set():
  219. time.sleep(1.0) # 保持 1秒间隔,给资源管理器缓冲时间
  220. # -------------------------------------------------------------
  221. # 1. 统计当前状态 (Active + Pending)
  222. # -------------------------------------------------------------
  223. current_builtin = 0
  224. current_orders = 0
  225. with self.m_lock:
  226. for t in self.m_tasks:
  227. if t.task_ref:
  228. current_orders += 1
  229. else:
  230. current_builtin += 1
  231. # 读取正在创建中的计数 (快照)
  232. pending_builtin = self.m_pending_builtin
  233. pending_orders = self.m_pending_orders
  234. # -------------------------------------------------------------
  235. # 2. 计算缺口
  236. # -------------------------------------------------------------
  237. # === [内置账号缺口] ===
  238. needed_builtin = 0
  239. # 逻辑:只要目标数量 > 0,就尝试补充 (兼容需要账号和不需要账号的模式)
  240. if self.m_cfg.target_instances > 0:
  241. total_builtin_proj = current_builtin + pending_builtin
  242. needed_builtin = self.m_cfg.target_instances - total_builtin_proj
  243. # === [订单账号缺口] ===
  244. needed_order = 0
  245. # 逻辑:必须需要账号(need_account=True) 且 限制数量大于 0
  246. # (order_account_enable 已移除,直接看 limit)
  247. if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
  248. total_order_proj = current_orders + pending_orders
  249. needed_order = self.m_cfg.order_account_online_limit - total_order_proj
  250. # 如果两边都满了,跳过本轮
  251. if needed_builtin <= 0 and needed_order <= 0:
  252. continue
  253. # -------------------------------------------------------------
  254. # 3. 准备配置
  255. # -------------------------------------------------------------
  256. config_data = self._prepare_next_config(
  257. need_builtin=(needed_builtin > 0),
  258. need_order=(needed_order > 0),
  259. )
  260. if not config_data:
  261. time.sleep(10.0)
  262. continue
  263. plg_cfg, task_ref = config_data
  264. # -------------------------------------------------------------
  265. # 4. [绝对核心] 提交前立即增加 Pending 计数
  266. # -------------------------------------------------------------
  267. # 必须在这里加!防止下一秒循环重复创建!
  268. with self.m_lock:
  269. if task_ref:
  270. self.m_pending_orders += 1
  271. else:
  272. self.m_pending_builtin += 1
  273. p_type = "Order" if task_ref else "Built-in"
  274. self._log(f"+++ Spawning {p_type} (Pending: {self.m_pending_builtin}/{self.m_pending_orders})...")
  275. # 5. 异步提交
  276. try:
  277. ThreadPool.getInstance().enqueue(
  278. self._create_and_register_plg_worker,
  279. plg_cfg,
  280. task_ref
  281. )
  282. except Exception as e:
  283. # 提交失败回滚计数
  284. self._log(f"Failed to enqueue task: {e}")
  285. with self.m_lock:
  286. if task_ref: self.m_pending_orders -= 1
  287. else: self.m_pending_builtin -= 1
  288. # 错开并发
  289. time.sleep(random.uniform(0.5, 1.0))
  290. self._log("[STOP] creator loop exiting...")
  291. def _prepare_next_config(self, need_builtin: bool, need_order: bool) -> Optional[Tuple[VSPlgConfig, Optional[Dict[str, Any]]]]:
  292. """
  293. @brief 准备下一个插件实例的配置
  294. """
  295. plg_cfg = VSPlgConfig()
  296. plg_cfg.debug = self.m_cfg.debug
  297. plg_cfg.free_config = self.m_cfg.free_config
  298. plg_cfg.session_max_life = self.m_cfg.session_max_life
  299. task_ref = None
  300. config_ready = False
  301. pool_name = self.m_cfg.local_account_pool
  302. # =================================================================
  303. # 1. 账号获取
  304. # =================================================================
  305. if not self.m_cfg.need_account:
  306. # === 游客模式 (无需账号) ===
  307. if need_builtin:
  308. plg_cfg.account.id = 0
  309. plg_cfg.account.username = "Guest"
  310. config_ready = True
  311. task_ref = None
  312. else:
  313. # === 标准模式 (需要账号) ===
  314. # A. 优先补充内置账号 (只要 target_instances 还有缺口)
  315. if need_builtin:
  316. # 获取并锁定账号
  317. account = AccountManager.Instance().next(
  318. pool_name,
  319. lock_duration=self.m_cfg.account_login_interval * 60
  320. )
  321. if account:
  322. plg_cfg.account.id = account["id"]
  323. plg_cfg.account.username = account["username"]
  324. plg_cfg.account.password = account["password"]
  325. plg_cfg.account.lock_until = account.get("lock_until", 0)
  326. config_ready = True
  327. task_ref = None
  328. self._log(f"Selected Built-in: {plg_cfg.account.username}")
  329. # B. 次选补充订单账号 (如果内置不需要 或 池子空了)
  330. # 只有当 limit > 0 时才尝试
  331. if not config_ready and need_order and self.m_cfg.order_account_online_limit > 0:
  332. try:
  333. routing_key = self.m_cfg.order_account_routing
  334. task_ref = VSCloudApi.Instance().get_vas_task_pop(routing_key)
  335. if task_ref:
  336. user_inputs = task_ref.get('user_inputs', {})
  337. plg_cfg.account.id = 0 # 临时账号
  338. plg_cfg.account.username = user_inputs.get(self.m_cfg.input_map_username, "")
  339. plg_cfg.account.password = user_inputs.get(self.m_cfg.input_map_password, "")
  340. if plg_cfg.account.username:
  341. config_ready = True
  342. self._log(f"Selected Order Acc: {plg_cfg.account.username}")
  343. else:
  344. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  345. return None
  346. except Exception as e:
  347. pass
  348. #self._log(f"Get Order task exception, e={e}")
  349. if not config_ready:
  350. return None
  351. # =================================================================
  352. # 2. 代理配置
  353. # =================================================================
  354. if self.m_cfg.need_proxy:
  355. # 轮询代理
  356. proxy_lock_time = self.m_cfg.proxy_lock_interval
  357. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=proxy_lock_time)
  358. if not proxy:
  359. try:
  360. if task_ref:
  361. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  362. return None
  363. except Exception as e:
  364. self._log(f"Return Order task to queue exception, e={e}")
  365. plg_cfg.proxy.id = proxy["id"]
  366. plg_cfg.proxy.ip = proxy["ip"]
  367. plg_cfg.proxy.port = proxy["port"]
  368. plg_cfg.proxy.scheme = proxy["scheme"]
  369. plg_cfg.proxy.username = proxy.get("username", "")
  370. plg_cfg.proxy.password = proxy.get("password", "")
  371. plg_cfg.proxy.lock_until = proxy.get("lock_until", 0)
  372. return plg_cfg, task_ref
  373. def _create_and_register_plg_worker(self, plg_cfg: VSPlgConfig, task_ref: Optional[Dict[str, Any]] = None):
  374. """
  375. @brief 异步创建工作线程
  376. """
  377. instance = None
  378. creation_success = False
  379. try:
  380. # 1. 耗时操作:实例化 & 登录
  381. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  382. instance.set_log(self.m_logger)
  383. instance.set_config(plg_cfg)
  384. instance.create_session() # 可能耗时很久
  385. # 2. 注册到任务列表
  386. with self.m_lock:
  387. # 既然允许短暂超发,这里直接添加,不做严格的拒绝并返回逻辑
  388. # 计算预定权限
  389. book_allowed = False
  390. if not self.m_cfg.account_bind_applicant:
  391. book_allowed = True
  392. elif self.m_cfg.account_bind_applicant and task_ref:
  393. book_allowed = True
  394. new_task = Task(
  395. instance=instance,
  396. qw_cfg=self.m_cfg.query_wait,
  397. next_run=time.time(),
  398. task_ref=task_ref,
  399. book_allowed=book_allowed
  400. )
  401. self.m_tasks.append(new_task)
  402. creation_success = True
  403. p_type = "Order" if task_ref else "Built-in"
  404. self._log(f"=== Instance Registered [{p_type}]. Total Active: {len(self.m_tasks)} ===")
  405. except Exception as e:
  406. self._log(f"Creation failed for {plg_cfg.account.username}: {e}")
  407. finally:
  408. # -------------------------------------------------------------
  409. # 3. [绝对核心] 必须扣减 Pending
  410. # -------------------------------------------------------------
  411. # 无论上面是否抛出异常,或者是否注册成功,Pending 必须释放
  412. with self.m_lock:
  413. if task_ref:
  414. self.m_pending_orders -= 1
  415. if self.m_pending_orders < 0: self.m_pending_orders = 0
  416. else:
  417. self.m_pending_builtin -= 1
  418. if self.m_pending_builtin < 0: self.m_pending_builtin = 0
  419. # 4. 失败回滚
  420. if not creation_success:
  421. # 如果是云端订单,必须归还,否则丢单
  422. if task_ref:
  423. self._log(f"Rolling back task {task_ref['id']}")
  424. try:
  425. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  426. except:
  427. pass
  428. def _on_query_result(self, t: Task, query_result: VSQueryResult):
  429. self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
  430. if not t.book_allowed:
  431. return
  432. # -------------------------------------------------------
  433. # 1. 准备任务数据 (Data Preparation)
  434. # -------------------------------------------------------
  435. task_data = t.task_ref
  436. is_cloud_task = False # 标记:是否为云端临时取出的任务(需要回滚)
  437. # 如果没有绑定本地任务,尝试从云端 Pop
  438. if not task_data:
  439. apt_type = query_result.apt_type
  440. booking_routing_key = f'auto.{apt_type.routing_key}' if apt_type.routing_key else "default"
  441. task_data = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  442. if not task_data:
  443. # 这种情况属于:内置账号查到了号,但云端没有待处理订单,直接放弃
  444. self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
  445. return
  446. is_cloud_task = True
  447. self._log(f"Picked up Cloud Task ID {task_data['id']} for booking...")
  448. # 统一提取核心参数
  449. task_id = task_data['id']
  450. order_id = task_data.get('order_id')
  451. user_input = task_data.get('user_inputs', {})
  452. # -------------------------------------------------------
  453. # 2. 执行预订 (Execution)
  454. # -------------------------------------------------------
  455. booking_success = False
  456. try:
  457. # 统一调用,无需区分来源
  458. book_res = t.instance.book(query_result, user_input)
  459. # -------------------------------------------------------
  460. # 3. 成功处理 (Success Handling)
  461. # -------------------------------------------------------
  462. if book_res.success:
  463. booking_success = True
  464. self._log(f"✅ Booking SUCCESS! Order: {order_id}")
  465. current_grab_info = {
  466. "account": book_res.account,
  467. "session_id": book_res.session_id,
  468. "urn": book_res.urn,
  469. "slot_date": book_res.book_date,
  470. "slot_time": book_res.book_time,
  471. "timestamp": int(time.time()),
  472. "payment_link": book_res.payment_link,
  473. }
  474. update_data = {
  475. "status": "grabbed",
  476. "grabbed_history": current_grab_info
  477. }
  478. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  479. self._log(f"Task {task_id} marked as GRABBED.")
  480. else:
  481. self._log(f"❌ Booking Failed for Order {order_id}: {book_res.message}")
  482. except Exception as e:
  483. self._log(f"Exception during booking for Order {order_id}: {e}")
  484. finally:
  485. # -------------------------------------------------------
  486. # 4. 回滚机制 (Rollback / Return to Queue)
  487. # -------------------------------------------------------
  488. # 只有两个条件同时满足才回滚:
  489. # 1. 任务来自云端队列 (is_cloud_task is True)
  490. # 2. 预定没有成功 (booking_success is False) - 包括预定失败或发生异常
  491. if is_cloud_task and not booking_success:
  492. self._log(f"Returning Task {task_id} to queue (status=pending).")
  493. try:
  494. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  495. except Exception as ex:
  496. self._log(f"Failed to return task to queue: {ex}")