# gco.py import os import time import json import random import threading from datetime import datetime, timezone from typing import List, Dict, Tuple, Any, Optional, Callable from concurrent.futures import wait # 导入所有依赖 from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task from vs_plg_factory import VSPlgFactory from toolkit.proxy_manager import ProxyManager from toolkit.thread_pool import ThreadPool from toolkit.vs_cloud_api import VSCloudApi import traceback class GCO: """ @brief GCO 类 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、 任务调度、查询和预订流程。 """ def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None): self.m_cfg = cfg self.m_factory = VSPlgFactory() # 插件工厂实例 self.m_logger = logger self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例 self.m_stop_event = threading.Event() # 用于停止线程 self.m_lock = threading.RLock() # 保护共享资源 (m_tasks) self.m_monitor_thread: Optional[threading.Thread] = None self.m_creator_thread: Optional[threading.Thread] = None self.m_pending_builtin = 0 # 正在创建中的内置账号数量 self.m_pending_orders = 0 # 正在创建中的订单账号数量 def start(self): """ @brief 启动协调器,包括插件注册和线程启动。 """ if not self.m_cfg.enable: self._log("Group is disabled, not starting.") return self._log("Starting coordinator...") self.m_stop_event.clear() # 注册插件 plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}") # === 修复点:更智能的类名推导逻辑 === # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin") plugin_name = self.m_cfg.plugin_config.plugin_name class_name = "".join(part.title() for part in plugin_name.split('_')) # 调试日志:确认推导出的类名 self._log(f"Inferring class name for plugin {plugin_name}: {class_name}") self.m_factory.register_plugin(plugin_name, plugin_module_path, class_name) self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}") self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}") self.m_monitor_thread.start() self.m_creator_thread.start() self._log("Coordinator threads started.") def stop(self): """ @brief 停止协调器,等待所有线程结束。 """ self._log("Stopping coordinator...") self.m_stop_event.set() # 发送停止信号 if self.m_monitor_thread and self.m_monitor_thread.is_alive(): self.m_monitor_thread.join() if self.m_creator_thread and self.m_creator_thread.is_alive(): self.m_creator_thread.join() self._log("Coordinator stopped.") def group_id(self) -> str: """ @brief 获取分组ID。 """ return self.m_cfg.identifier def _log(self, message): if self.m_logger: self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}') def _monitor_loop(self): """ @brief 监控循环:定期检查实例健康状况,执行查询任务。 一旦发现号源,立即触发所有实例进行批量预订。 """ self._log("[START] monitor loop starting...") rng = random.Random() while not self.m_stop_event.is_set(): try: sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0 time.sleep(sleep_ms) now = time.time() tasks_to_process = [] with self.m_lock: tasks_to_process = list(self.m_tasks) batch_booking_triggered = False for task in tasks_to_process: with self.m_lock: if task not in self.m_tasks: continue if not task.instance.health_check(): continue if now < task.next_run: continue apt_type = None try: apt_types = self.m_cfg.appointment_types if not apt_types: self._log(f"No matching appointment configuration found.") continue weights = [float(item.weight) for item in apt_types] apt_type = random.choices(apt_types, weights=weights, k=1)[0] VSCloudApi.Instance().slot_refresh_start( apt_type.routing_key, country=apt_type.country, city=apt_type.city, visa_type=apt_type.visa_type ) result = task.instance.query(apt_type) result.apt_type = apt_type VSCloudApi.Instance().slot_refresh_success( apt_type.routing_key ) if result.success: self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING.") query_payload = result.to_snapshot_payload() query_payload["website"] = self.m_cfg.website query_payload["snapshot_source"] = 'worker' query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat() VSCloudApi.Instance().slot_snapshot_report(query_payload) futures = [] for worker in tasks_to_process: f = ThreadPool.getInstance().enqueue( self._on_query_result, worker, result ) futures.append(f) for f in futures: try: f.result() except Exception as e: pass batch_booking_triggered = True break else: self._log(f"Query done by {task.instance.get_group_id()}, No availability") except Exception as e: traceback.print_exc() self._log(f"Exception during query: {e}") if apt_type: VSCloudApi.Instance().slot_refresh_fail( apt_type.routing_key, error=str(e) ) if not batch_booking_triggered: interval = 30 mode = task.qw_cfg.mode if mode == QueryWaitMode.Loop: interval = 0 elif mode == QueryWaitMode.Fixed: interval = task.qw_cfg.fixed_wait elif mode == QueryWaitMode.Random: interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max) task.next_run = time.time() + interval if batch_booking_triggered: self._log(f"Batch booking finished. Resetting wait times.") now_ts = time.time() with self.m_lock: for t in self.m_tasks: interval = 30 mode = t.qw_cfg.mode if mode == QueryWaitMode.Loop: interval = 1 # 稍微给点缓冲 elif mode == QueryWaitMode.Fixed: interval = t.qw_cfg.fixed_wait elif mode == QueryWaitMode.Random: interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max) t.next_run = now_ts + interval self._log(f"All workers cooldown reset. Resuming monitor loop.") with self.m_lock: initial_size = len(self.m_tasks) self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()] if len(self.m_tasks) < initial_size: self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}") except Exception as outer_e: traceback.print_exc() self._log(f"🔥 CRITICAL ERROR in monitor loop (recovered): {outer_e}") # 遇到严重错误休眠 1 秒,防止死循环刷屏占满 CPU time.sleep(1.0) self._log("[STOP] monitor loop exiting...") def _creator_loop(self): """ @brief 创建者循环:双轨制并发控制 (精简高可用版) """ self._log("[START] creator loop starting...") while not self.m_stop_event.is_set(): try: time.sleep(1.0) current_builtin = 0 current_orders = 0 with self.m_lock: for t in self.m_tasks: if t.task_ref: current_orders += 1 else: current_builtin += 1 pending_builtin = self.m_pending_builtin pending_orders = self.m_pending_orders needed_builtin = 0 if self.m_cfg.target_instances > 0: needed_builtin = self.m_cfg.target_instances - (current_builtin + pending_builtin) needed_order = 0 if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0: needed_order = self.m_cfg.order_account_online_limit - (current_orders + pending_orders) if needed_builtin <= 0 and needed_order <= 0: continue config_data = self._prepare_next_config( need_builtin=(needed_builtin > 0), need_order=(needed_order > 0), ) if not config_data: time.sleep(5.0) continue plg_cfg, task_ref = config_data with self.m_lock: if task_ref: self.m_pending_orders += 1 else: self.m_pending_builtin += 1 try: ThreadPool.getInstance().enqueue( self._create_and_register_plg_worker, plg_cfg, task_ref ) self._log(f"+++ Spawning {'Order' if task_ref else 'Builtin'}...") except Exception as e: self._log(f"Enqueue failed, rolling back: {e}") with self.m_lock: if task_ref: self.m_pending_orders -= 1 else: self.m_pending_builtin -= 1 time.sleep(random.uniform(0.5, 1.0)) except Exception as outer_e: traceback.print_exc() self._log(f"🔥 Creator loop error: {outer_e}") time.sleep(5.0) self._log("[STOP] creator loop exiting...") def _prepare_next_config(self, need_builtin: bool, need_order: bool) -> Optional[Tuple[VSPlgConfig, Optional[Dict[str, Any]]]]: """ @brief 准备下一个插件实例的配置 """ plg_cfg = VSPlgConfig() plg_cfg.debug = self.m_cfg.debug plg_cfg.free_config = self.m_cfg.free_config plg_cfg.session_max_life = self.m_cfg.session_max_life task_ref = None config_ready = False pool_name = self.m_cfg.local_account_pool # ================================================================= # 1. 账号获取 # ================================================================= if not self.m_cfg.need_account: # === 游客模式 (无需账号) === if need_builtin: plg_cfg.account.id = 0 plg_cfg.account.username = "Guest" config_ready = True task_ref = None else: # === 标准模式 (需要账号) === # A. 优先补充内置账号 (只要 target_instances 还有缺口) if need_builtin: try: # 获取并锁定账号 account = VSCloudApi.Instance().get_next_account( pool_name, lock_duration=self.m_cfg.account_login_interval * 60 ) except Exception as e: # self._log(f"Get built-in account failed: {e}") account = None if account: plg_cfg.account.id = account["id"] plg_cfg.account.username = account["username"] plg_cfg.account.password = account["password"] plg_cfg.account.lock_until = account.get("lock_until", 0) config_ready = True task_ref = None self._log(f"Selected Built-in: {plg_cfg.account.username}") # B. 次选补充订单账号 (如果内置不需要 或 池子空了) # 只有当 limit > 0 时才尝试 if not config_ready and need_order and self.m_cfg.order_account_online_limit > 0: try: routing_key = self.m_cfg.order_account_routing task_ref = VSCloudApi.Instance().get_vas_task_pop(routing_key) if task_ref: user_inputs = task_ref.get('user_inputs', {}) plg_cfg.account.id = 0 # 临时账号 plg_cfg.account.username = user_inputs.get(self.m_cfg.input_map_username, "") plg_cfg.account.password = user_inputs.get(self.m_cfg.input_map_password, "") if plg_cfg.account.username: config_ready = True self._log(f"Selected Order Acc: {plg_cfg.account.username}") else: VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id']) return None except Exception as e: pass #self._log(f"Get Order task exception, e={e}") if not config_ready: return None # ================================================================= # 2. 代理配置 # ================================================================= if self.m_cfg.need_proxy: # 轮询代理 proxy_lock_time = self.m_cfg.proxy_lock_interval proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=proxy_lock_time) if not proxy: try: if task_ref: VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id']) return None except Exception as e: self._log(f"Return Order task to queue exception, e={e}") plg_cfg.proxy.id = proxy["id"] plg_cfg.proxy.ip = proxy["ip"] plg_cfg.proxy.port = proxy["port"] plg_cfg.proxy.scheme = proxy["scheme"] plg_cfg.proxy.username = proxy.get("username", "") plg_cfg.proxy.password = proxy.get("password", "") plg_cfg.proxy.lock_until = proxy.get("lock_until", 0) return plg_cfg, task_ref def _create_and_register_plg_worker(self, plg_cfg: VSPlgConfig, task_ref: Optional[Dict[str, Any]] = None): """ @brief 异步创建工作线程 """ instance = None creation_success = False try: # 1. 耗时操作:实例化 & 登录 instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name) instance.set_log(self.m_logger) instance.set_config(plg_cfg) instance.create_session() # 可能耗时很久 # 2. 注册到任务列表 with self.m_lock: # 既然允许短暂超发,这里直接添加,不做严格的拒绝并返回逻辑 # 计算预定权限 book_allowed = False if not self.m_cfg.account_bind_applicant: book_allowed = True elif self.m_cfg.account_bind_applicant and task_ref: book_allowed = True new_task = Task( instance=instance, qw_cfg=self.m_cfg.query_wait, next_run=time.time(), task_ref=task_ref, book_allowed=book_allowed ) self.m_tasks.append(new_task) creation_success = True p_type = "Order" if task_ref else "Built-in" self._log(f"=== Instance Registered [{p_type}]. Total Active: {len(self.m_tasks)} ===") except Exception as e: self._log(f"Creation failed for {plg_cfg.account.username}: {e}") finally: # ------------------------------------------------------------- # 3. [绝对核心] 必须扣减 Pending # ------------------------------------------------------------- # 无论上面是否抛出异常,或者是否注册成功,Pending 必须释放 with self.m_lock: if task_ref: self.m_pending_orders -= 1 if self.m_pending_orders < 0: self.m_pending_orders = 0 else: self.m_pending_builtin -= 1 if self.m_pending_builtin < 0: self.m_pending_builtin = 0 # 4. 失败回滚 if not creation_success: # 如果是云端订单,必须归还,否则丢单 if task_ref: self._log(f"Rolling back task {task_ref['id']}") try: VSCloudApi.Instance().return_vas_task_to_queue(task_ref['id']) except: pass def _on_query_result(self, t: Task, query_result: VSQueryResult): self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...") if not t.book_allowed: return # ------------------------------------------------------- # 1. 准备任务数据 (Data Preparation) # ------------------------------------------------------- task_data = t.task_ref is_cloud_task = False # 标记:是否为云端临时取出的任务(需要回滚) # 如果没有绑定本地任务,尝试从云端 Pop if not task_data: apt_type = query_result.apt_type booking_routing_key = f'auto.{apt_type.routing_key}' if apt_type.routing_key else "default" task_data = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key) if not task_data: # 这种情况属于:内置账号查到了号,但云端没有待处理订单,直接放弃 self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.") return is_cloud_task = True self._log(f"Picked up Cloud Task ID {task_data['id']} for booking...") # 统一提取核心参数 task_id = task_data['id'] order_id = task_data.get('order_id') user_input = task_data.get('user_inputs', {}) # ------------------------------------------------------- # 2. 执行预订 (Execution) # ------------------------------------------------------- booking_success = False try: # 统一调用,无需区分来源 book_res = t.instance.book(query_result, user_input) # ------------------------------------------------------- # 3. 成功处理 (Success Handling) # ------------------------------------------------------- if book_res.success: booking_success = True self._log(f"✅ Booking SUCCESS! Order: {order_id}") current_grab_info = { "account": book_res.account, "session_id": book_res.session_id, "urn": book_res.urn, "slot_date": book_res.book_date, "slot_time": book_res.book_time, "timestamp": int(time.time()), "payment_link": book_res.payment_link, } update_data = { "status": "grabbed", "grabbed_history": current_grab_info } VSCloudApi.Instance().update_vas_task(task_id, update_data) self._log(f"Task {task_id} marked as GRABBED.") else: self._log(f"❌ Booking Failed for Order {order_id}") except Exception as e: self._log(f"Exception during booking for Order {order_id}: {e}") finally: # ------------------------------------------------------- # 4. 回滚机制 (Rollback / Return to Queue) # ------------------------------------------------------- # 只有两个条件同时满足才回滚: # 1. 任务来自云端队列 (is_cloud_task is True) # 2. 预定没有成功 (booking_success is False) - 包括预定失败或发生异常 if is_cloud_task and not booking_success: self._log(f"Returning Task {task_id} to queue (status=pending).") try: VSCloudApi.Instance().return_vas_task_to_queue(task_id) except Exception as ex: self._log(f"Failed to return task to queue: {ex}")