gco.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. # gco.py
  2. import os
  3. import time
  4. import json
  5. import random
  6. import threading
  7. from datetime import datetime, timezone
  8. from typing import List, Dict, Tuple, Any, Optional, Callable
  9. from concurrent.futures import wait
  10. # 导入所有依赖
  11. from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
  12. from vs_plg_factory import VSPlgFactory
  13. from toolkit.proxy_manager import ProxyManager
  14. from toolkit.thread_pool import ThreadPool
  15. from toolkit.vs_cloud_api import VSCloudApi
  16. import traceback
  17. class GCO:
  18. """
  19. @brief GCO 类
  20. 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
  21. 任务调度、查询和预订流程。
  22. """
  23. def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
  24. self.m_cfg = cfg
  25. self.m_factory = VSPlgFactory() # 插件工厂实例
  26. self.m_logger = logger
  27. self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
  28. self.m_stop_event = threading.Event() # 用于停止线程
  29. self.m_lock = threading.RLock() # 保护共享资源 (m_tasks)
  30. self.m_monitor_thread: Optional[threading.Thread] = None
  31. self.m_creator_thread: Optional[threading.Thread] = None
  32. self.m_pending_builtin = 0 # 正在创建中的内置账号数量
  33. self.m_pending_orders = 0 # 正在创建中的订单账号数量
  34. def start(self):
  35. """
  36. @brief 启动协调器,包括插件注册和线程启动。
  37. """
  38. if not self.m_cfg.enable:
  39. self._log("Group is disabled, not starting.")
  40. return
  41. self._log("Starting coordinator...")
  42. self.m_stop_event.clear()
  43. # 注册插件
  44. plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")
  45. # === 修复点:更智能的类名推导逻辑 ===
  46. # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin")
  47. plugin_name = self.m_cfg.plugin_config.plugin_name
  48. class_name = "".join(part.title() for part in plugin_name.split('_'))
  49. # 调试日志:确认推导出的类名
  50. self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
  51. self.m_factory.register_plugin(plugin_name,
  52. plugin_module_path,
  53. class_name)
  54. self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
  55. self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
  56. self.m_monitor_thread.start()
  57. self.m_creator_thread.start()
  58. self._log("Coordinator threads started.")
  59. def stop(self):
  60. """
  61. @brief 停止协调器,等待所有线程结束。
  62. """
  63. self._log("Stopping coordinator...")
  64. self.m_stop_event.set() # 发送停止信号
  65. if self.m_monitor_thread and self.m_monitor_thread.is_alive():
  66. self.m_monitor_thread.join()
  67. if self.m_creator_thread and self.m_creator_thread.is_alive():
  68. self.m_creator_thread.join()
  69. self._log("Coordinator stopped.")
  70. def group_id(self) -> str:
  71. """
  72. @brief 获取分组ID。
  73. """
  74. return self.m_cfg.identifier
  75. def _log(self, message):
  76. if self.m_logger:
  77. self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
  78. def _monitor_loop(self):
  79. """
  80. @brief 监控循环:定期检查实例健康状况,执行查询任务。
  81. 一旦发现号源,立即触发所有实例进行批量预订。
  82. """
  83. self._log("[START] monitor loop starting...")
  84. rng = random.Random()
  85. while not self.m_stop_event.is_set():
  86. try:
  87. sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
  88. time.sleep(sleep_ms)
  89. now = time.time()
  90. tasks_to_process = []
  91. with self.m_lock:
  92. tasks_to_process = list(self.m_tasks)
  93. batch_booking_triggered = False
  94. for task in tasks_to_process:
  95. with self.m_lock:
  96. if task not in self.m_tasks:
  97. continue
  98. if not task.instance.health_check():
  99. continue
  100. if now < task.next_run:
  101. continue
  102. apt_type = None
  103. try:
  104. apt_types = self.m_cfg.appointment_types
  105. if not apt_types:
  106. self._log(f"No matching appointment configuration found.")
  107. continue
  108. weights = [float(item.weight) for item in apt_types]
  109. apt_type = random.choices(apt_types, weights=weights, k=1)[0]
  110. VSCloudApi.Instance().slot_refresh_start(
  111. apt_type.routing_key,
  112. country=apt_type.country,
  113. city=apt_type.city,
  114. visa_type=apt_type.visa_type
  115. )
  116. result = task.instance.query(apt_type)
  117. result.apt_type = apt_type
  118. VSCloudApi.Instance().slot_refresh_success(
  119. apt_type.routing_key
  120. )
  121. if result.success:
  122. self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING.")
  123. query_payload = result.to_snapshot_payload()
  124. query_payload["website"] = self.m_cfg.website
  125. query_payload["snapshot_source"] = 'worker'
  126. query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat()
  127. VSCloudApi.Instance().slot_snapshot_report(query_payload)
  128. futures = []
  129. for worker in tasks_to_process:
  130. f = ThreadPool.getInstance().enqueue(
  131. self._on_query_result,
  132. worker,
  133. result
  134. )
  135. futures.append(f)
  136. for f in futures:
  137. try:
  138. f.result()
  139. except Exception as e:
  140. pass
  141. batch_booking_triggered = True
  142. break
  143. else:
  144. self._log(f"Query done by {task.instance.get_group_id()}, No availability")
  145. except Exception as e:
  146. traceback.print_exc()
  147. self._log(f"Exception during query: {e}")
  148. if apt_type:
  149. VSCloudApi.Instance().slot_refresh_fail(
  150. apt_type.routing_key,
  151. error=str(e)
  152. )
  153. if not batch_booking_triggered:
  154. interval = 30
  155. mode = task.qw_cfg.mode
  156. if mode == QueryWaitMode.Loop:
  157. interval = 0
  158. elif mode == QueryWaitMode.Fixed:
  159. interval = task.qw_cfg.fixed_wait
  160. elif mode == QueryWaitMode.Random:
  161. interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
  162. task.next_run = time.time() + interval
  163. if batch_booking_triggered:
  164. self._log(f"Batch booking finished. Resetting wait times.")
  165. now_ts = time.time()
  166. with self.m_lock:
  167. for t in self.m_tasks:
  168. interval = 30
  169. mode = t.qw_cfg.mode
  170. if mode == QueryWaitMode.Loop:
  171. interval = 1 # 稍微给点缓冲
  172. elif mode == QueryWaitMode.Fixed:
  173. interval = t.qw_cfg.fixed_wait
  174. elif mode == QueryWaitMode.Random:
  175. interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
  176. t.next_run = now_ts + interval
  177. self._log(f"All workers cooldown reset. Resuming monitor loop.")
  178. with self.m_lock:
  179. initial_size = len(self.m_tasks)
  180. self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
  181. if len(self.m_tasks) < initial_size:
  182. self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
  183. except Exception as outer_e:
  184. traceback.print_exc()
  185. self._log(f"🔥 CRITICAL ERROR in monitor loop (recovered): {outer_e}")
  186. # 遇到严重错误休眠 1 秒,防止死循环刷屏占满 CPU
  187. time.sleep(1.0)
  188. self._log("[STOP] monitor loop exiting...")
  189. def _creator_loop(self):
  190. """
  191. @brief 创建者循环:双轨制并发控制 (精简高可用版)
  192. """
  193. self._log("[START] creator loop starting...")
  194. while not self.m_stop_event.is_set():
  195. try:
  196. time.sleep(1.0)
  197. current_builtin = 0
  198. current_orders = 0
  199. with self.m_lock:
  200. for t in self.m_tasks:
  201. if t.task_ref: current_orders += 1
  202. else: current_builtin += 1
  203. pending_builtin = self.m_pending_builtin
  204. pending_orders = self.m_pending_orders
  205. needed_builtin = 0
  206. if self.m_cfg.target_instances > 0:
  207. needed_builtin = self.m_cfg.target_instances - (current_builtin + pending_builtin)
  208. needed_order = 0
  209. if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
  210. needed_order = self.m_cfg.order_account_online_limit - (current_orders + pending_orders)
  211. if needed_builtin <= 0 and needed_order <= 0:
  212. continue
  213. config_data = self._prepare_next_config(
  214. need_builtin=(needed_builtin > 0),
  215. need_order=(needed_order > 0),
  216. )
  217. if not config_data:
  218. time.sleep(5.0)
  219. continue
  220. plg_cfg, task_ref = config_data
  221. with self.m_lock:
  222. if task_ref: self.m_pending_orders += 1
  223. else: self.m_pending_builtin += 1
  224. try:
  225. ThreadPool.getInstance().enqueue(
  226. self._create_and_register_plg_worker, plg_cfg, task_ref
  227. )
  228. self._log(f"+++ Spawning {'Order' if task_ref else 'Builtin'}...")
  229. except Exception as e:
  230. self._log(f"Enqueue failed, rolling back: {e}")
  231. with self.m_lock:
  232. if task_ref: self.m_pending_orders -= 1
  233. else: self.m_pending_builtin -= 1
  234. time.sleep(random.uniform(0.5, 1.0))
  235. except Exception as outer_e:
  236. traceback.print_exc()
  237. self._log(f"🔥 Creator loop error: {outer_e}")
  238. time.sleep(5.0)
  239. self._log("[STOP] creator loop exiting...")
  240. def _prepare_next_config(self, need_builtin: bool, need_order: bool) -> Optional[Tuple[VSPlgConfig, Optional[Dict[str, Any]]]]:
  241. """
  242. @brief 准备下一个插件实例的配置
  243. """
  244. plg_cfg = VSPlgConfig()
  245. plg_cfg.debug = self.m_cfg.debug
  246. plg_cfg.free_config = self.m_cfg.free_config
  247. plg_cfg.session_max_life = self.m_cfg.session_max_life
  248. task_ref = None
  249. config_ready = False
  250. pool_name = self.m_cfg.local_account_pool
  251. # =================================================================
  252. # 1. 账号获取
  253. # =================================================================
  254. if not self.m_cfg.need_account:
  255. # === 游客模式 (无需账号) ===
  256. if need_builtin:
  257. plg_cfg.account.id = 0
  258. plg_cfg.account.username = "Guest"
  259. config_ready = True
  260. task_ref = None
  261. else:
  262. # === 标准模式 (需要账号) ===
  263. # A. 优先补充内置账号 (只要 target_instances 还有缺口)
  264. if need_builtin:
  265. try:
  266. # 获取并锁定账号
  267. account = VSCloudApi.Instance().get_next_account(
  268. pool_name,
  269. lock_duration=self.m_cfg.account_login_interval * 60
  270. )
  271. except Exception as e:
  272. self._log(f"Get built-in account failed: {e}")
  273. account = None
  274. if account:
  275. plg_cfg.account.id = account["id"]
  276. plg_cfg.account.username = account["username"]
  277. plg_cfg.account.password = account["password"]
  278. plg_cfg.account.lock_until = account.get("lock_until", 0)
  279. config_ready = True
  280. task_ref = None
  281. self._log(f"Selected Built-in: {plg_cfg.account.username}")
  282. else:
  283. self._log("No available built-in account")
  284. # B. 次选补充订单账号 (如果内置不需要 或 池子空了)
  285. # 只有当 limit > 0 时才尝试
  286. if not config_ready and need_order and self.m_cfg.order_account_online_limit > 0:
  287. try:
  288. routing_key = self.m_cfg.order_account_routing
  289. task_ref = VSCloudApi.Instance().get_vas_task_pop(routing_key)
  290. if task_ref:
  291. user_inputs = task_ref.get('user_inputs', {})
  292. plg_cfg.account.id = 0 # 临时账号
  293. plg_cfg.account.username = user_inputs.get(self.m_cfg.input_map_username, "")
  294. plg_cfg.account.password = user_inputs.get(self.m_cfg.input_map_password, "")
  295. if plg_cfg.account.username:
  296. config_ready = True
  297. self._log(f"Selected Order Acc: {plg_cfg.account.username}")
  298. else:
  299. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  300. return None
  301. except Exception as e:
  302. pass
  303. #self._log(f"Get Order task exception, e={e}")
  304. if not config_ready:
  305. return None
  306. # =================================================================
  307. # 2. 代理配置
  308. # =================================================================
  309. if self.m_cfg.need_proxy:
  310. # 轮询代理
  311. proxy_lock_time = self.m_cfg.proxy_lock_interval
  312. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=proxy_lock_time)
  313. if not proxy:
  314. try:
  315. if task_ref:
  316. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  317. return None
  318. except Exception as e:
  319. self._log(f"Return Order task to queue exception, e={e}")
  320. plg_cfg.proxy.id = proxy["id"]
  321. plg_cfg.proxy.ip = proxy["ip"]
  322. plg_cfg.proxy.port = proxy["port"]
  323. plg_cfg.proxy.scheme = proxy["scheme"]
  324. plg_cfg.proxy.username = proxy.get("username", "")
  325. plg_cfg.proxy.password = proxy.get("password", "")
  326. plg_cfg.proxy.lock_until = proxy.get("lock_until", 0)
  327. return plg_cfg, task_ref
  328. def _create_and_register_plg_worker(self, plg_cfg: VSPlgConfig, task_ref: Optional[Dict[str, Any]] = None):
  329. """
  330. @brief 异步创建工作线程
  331. """
  332. instance = None
  333. creation_success = False
  334. try:
  335. # 1. 耗时操作:实例化 & 登录
  336. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  337. instance.set_log(self.m_logger)
  338. instance.set_config(plg_cfg)
  339. instance.create_session() # 可能耗时很久
  340. # 2. 注册到任务列表
  341. with self.m_lock:
  342. # 既然允许短暂超发,这里直接添加,不做严格的拒绝并返回逻辑
  343. # 计算预定权限
  344. book_allowed = False
  345. if not self.m_cfg.account_bind_applicant:
  346. book_allowed = True
  347. elif self.m_cfg.account_bind_applicant and task_ref:
  348. book_allowed = True
  349. new_task = Task(
  350. instance=instance,
  351. qw_cfg=self.m_cfg.query_wait,
  352. next_run=time.time(),
  353. task_ref=task_ref,
  354. book_allowed=book_allowed
  355. )
  356. self.m_tasks.append(new_task)
  357. creation_success = True
  358. p_type = "Order" if task_ref else "Built-in"
  359. self._log(f"=== Instance Registered [{p_type}]. Total Active: {len(self.m_tasks)} ===")
  360. except Exception as e:
  361. self._log(f"Creation failed for {plg_cfg.account.username}: {e}")
  362. finally:
  363. # -------------------------------------------------------------
  364. # 3. [绝对核心] 必须扣减 Pending
  365. # -------------------------------------------------------------
  366. # 无论上面是否抛出异常,或者是否注册成功,Pending 必须释放
  367. with self.m_lock:
  368. if task_ref:
  369. self.m_pending_orders -= 1
  370. if self.m_pending_orders < 0: self.m_pending_orders = 0
  371. else:
  372. self.m_pending_builtin -= 1
  373. if self.m_pending_builtin < 0: self.m_pending_builtin = 0
  374. # 4. 失败回滚
  375. if not creation_success:
  376. # 如果是云端订单,必须归还,否则丢单
  377. if task_ref:
  378. self._log(f"Rolling back task {task_ref['id']}")
  379. try:
  380. VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id'])
  381. except:
  382. pass
  383. def _on_query_result(self, t: Task, query_result: VSQueryResult):
  384. self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
  385. if not t.book_allowed:
  386. return
  387. # -------------------------------------------------------
  388. # 1. 准备任务数据 (Data Preparation)
  389. # -------------------------------------------------------
  390. task_data = t.task_ref
  391. is_cloud_task = False # 标记:是否为云端临时取出的任务(需要回滚)
  392. # 如果没有绑定本地任务,尝试从云端 Pop
  393. if not task_data:
  394. apt_type = query_result.apt_type
  395. booking_routing_key = f'auto.{apt_type.routing_key}' if apt_type.routing_key else "default"
  396. task_data = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
  397. if not task_data:
  398. # 这种情况属于:内置账号查到了号,但云端没有待处理订单,直接放弃
  399. self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
  400. return
  401. is_cloud_task = True
  402. self._log(f"Picked up Cloud Task ID {task_data['id']} for booking...")
  403. # 统一提取核心参数
  404. task_id = task_data['id']
  405. order_id = task_data.get('order_id')
  406. user_input = task_data.get('user_inputs', {})
  407. # -------------------------------------------------------
  408. # 2. 执行预订 (Execution)
  409. # -------------------------------------------------------
  410. booking_success = False
  411. try:
  412. # 统一调用,无需区分来源
  413. book_res = t.instance.book(query_result, user_input)
  414. # -------------------------------------------------------
  415. # 3. 成功处理 (Success Handling)
  416. # -------------------------------------------------------
  417. if book_res.success:
  418. booking_success = True
  419. self._log(f"✅ Booking SUCCESS! Order: {order_id}")
  420. current_grab_info = {
  421. "account": book_res.account,
  422. "session_id": book_res.session_id,
  423. "urn": book_res.urn,
  424. "slot_date": book_res.book_date,
  425. "slot_time": book_res.book_time,
  426. "timestamp": int(time.time()),
  427. "payment_link": book_res.payment_link,
  428. }
  429. update_data = {
  430. "status": "grabbed",
  431. "grabbed_history": current_grab_info
  432. }
  433. VSCloudApi.Instance().update_vas_task(task_id, update_data)
  434. self._log(f"Task {task_id} marked as GRABBED.")
  435. else:
  436. self._log(f"❌ Booking Failed for Order {order_id}: {book_res.message}")
  437. except Exception as e:
  438. self._log(f"Exception during booking for Order {order_id}: {e}")
  439. finally:
  440. # -------------------------------------------------------
  441. # 4. 回滚机制 (Rollback / Return to Queue)
  442. # -------------------------------------------------------
  443. # 只有两个条件同时满足才回滚:
  444. # 1. 任务来自云端队列 (is_cloud_task is True)
  445. # 2. 预定没有成功 (booking_success is False) - 包括预定失败或发生异常
  446. if is_cloud_task and not booking_success:
  447. self._log(f"Returning Task {task_id} to queue (status=pending).")
  448. try:
  449. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  450. except Exception as ex:
  451. self._log(f"Failed to return task to queue: {ex}")