# group_coordinator.py
import os
import time
import json
import random
import threading
from typing import List, Optional
from concurrent.futures import wait

# Project dependencies
from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task
from vs_plg import IVSPlg
from vs_plg_factory import VSPlgFactory
from toolkit.account_manager import AccountManager
from toolkit.proxy_manager import ProxyManager
from toolkit.binding_manager import BindingManager
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from vs_log_macros import VSC_INFO, VSC_DEBUG, VSC_WARN, VSC_ERROR


class GroupCoordinator:
    """
    @brief Coordinator for the plugin instances of one group.

    Creates plugin instances up to the configured target count, health-checks
    them, schedules their queries and runs the booking flow when a query
    reports success.
    """

    def __init__(self, cfg: GroupConfig):
        """
        @brief Build a coordinator for the given group configuration.
        @param cfg Group configuration; read throughout the class as ``self.m_cfg``.
        """
        self.m_cfg = cfg
        self.m_factory = VSPlgFactory()  # plugin factory instance
        self.m_tasks: List[Task] = []  # all currently running task instances
        self.m_stop_event = threading.Event()  # signals both worker threads to exit
        self.m_lock = threading.RLock()  # protects the shared m_tasks list
        self.m_monitor_thread: Optional[threading.Thread] = None
        self.m_creator_thread: Optional[threading.Thread] = None
        # Initialised here so readers never need hasattr(); set via set_push_callback().
        self.push_callback_ = None

        # Executor used for booking operations.
        # NOTE(review): obtained through getInstance(), so this may be the same
        # pool that creator_loop gets from ThreadPool.getInstance() -- confirm.
        self.book_executor = ThreadPool(max_workers=5).getInstance()

        VSC_INFO("coordinator", f"GroupCoordinator for {self.m_cfg.identifier} initialized.")

    def set_push_callback(self, cb):
        """
        @brief Set the push-notification callback.
        @param cb Callable invoked after a successful booking as
                  ``cb(100, <utf-8 encoded message bytes>, 0)``.
        """
        self.push_callback_ = cb
        VSC_INFO("coordinator", f"Push callback set for group {self.m_cfg.identifier}.")

    @staticmethod
    def _infer_class_name(plugin_name: str) -> str:
        """
        @brief Derive the plugin class name from its snake_case plugin name,
               e.g. "concrete_plugin" -> "ConcretePlugin".
        """
        return "".join(part.title() for part in plugin_name.split('_'))

    def start(self):
        """
        @brief Start the coordinator: register the plugin and launch the
               monitor and creator threads. Does nothing when the group is disabled.
        """
        if not self.m_cfg.enable:
            VSC_WARN("coordinator", f"Group {self.m_cfg.identifier} is disabled, not starting.")
            return

        VSC_INFO("coordinator", f"Starting coordinator for group {self.m_cfg.identifier}...")
        self.m_stop_event.clear()

        # Register the plugin with the factory.
        plugin_module_path = os.path.join(
            self.m_cfg.plugin_config.lib_path,
            f"{self.m_cfg.plugin_config.plugin_bin}",
        )
        plugin_name = self.m_cfg.plugin_config.plugin_name
        class_name = self._infer_class_name(plugin_name)
        # Debug log to confirm the inferred class name.
        VSC_DEBUG("coordinator", f"Inferring class name for plugin {plugin_name}: {class_name}")
        self.m_factory.register_plugin(plugin_name, plugin_module_path, class_name)

        self.m_monitor_thread = threading.Thread(
            target=self.monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
        self.m_creator_thread = threading.Thread(
            target=self.creator_loop, name=f"Creator-{self.m_cfg.identifier}")
        self.m_monitor_thread.start()
        self.m_creator_thread.start()
        VSC_INFO("coordinator", f"Coordinator for group {self.m_cfg.identifier} threads started.")

    def stop(self):
        """
        @brief Stop the coordinator and wait for both worker threads to finish.

        The loops wait on the stop event instead of sleeping, so they wake up
        as soon as the event is set; a query or booking already in progress
        still runs to completion before the monitor thread exits.
        """
        VSC_INFO("coordinator", f"Stopping coordinator for group {self.m_cfg.identifier}...")
        self.m_stop_event.set()  # send the stop signal

        if self.m_monitor_thread and self.m_monitor_thread.is_alive():
            self.m_monitor_thread.join()
        if self.m_creator_thread and self.m_creator_thread.is_alive():
            self.m_creator_thread.join()

        # Shut down the booking executor.
        # NOTE(review): if book_executor is a shared singleton pool, it stays
        # shut down after restart() and for other users -- verify ThreadPool.
        self.book_executor.shutdown(wait=True)
        VSC_INFO("coordinator", f"Coordinator for group {self.m_cfg.identifier} stopped.")

    def restart(self):
        """
        @brief Restart the coordinator by calling stop() and then start().
        """
        VSC_INFO("coordinator", f"Restarting coordinator for group {self.m_cfg.identifier}...")
        self.stop()
        self.start()
        VSC_INFO("coordinator", f"Coordinator for group {self.m_cfg.identifier} restarted.")

    def group_id(self) -> str:
        """
        @brief Return the group identifier.
        """
        return self.m_cfg.identifier

    def _next_query_interval(self, task: Task, rng: random.Random) -> float:
        """
        @brief Pick the delay (seconds) before the task's next query from its
               query-wait configuration.
        @return 0 for Loop mode, ``fixed_wait`` for Fixed mode, a random integer
                in [random_min, random_max] for Random mode, 30 for any other mode.
        """
        mode = task.qw_cfg.mode
        if mode == QueryWaitMode.Loop:
            return 0
        if mode == QueryWaitMode.Fixed:
            return task.qw_cfg.fixed_wait
        if mode == QueryWaitMode.Random:
            return rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
        return 30

    def monitor_loop(self):
        """
        @brief Monitor loop: periodically health-checks the instances, runs the
               due queries, triggers booking on a successful query and finally
               drops unhealthy instances from the task list.
        """
        VSC_INFO("coordinator", f"[START] monitor loop starting for group {self.m_cfg.identifier}")
        rng = random.Random()

        while not self.m_stop_event.is_set():
            # 100-120 ms jittered pause; Event.wait returns True as soon as
            # stop is requested, so stop() is not delayed by the pause.
            pause_sec = 0.1 + rng.randint(0, 20) / 1000.0
            if self.m_stop_event.wait(pause_sec):
                break

            now = time.time()

            # Work on a snapshot so the lock is not held while querying.
            with self.m_lock:
                tasks_to_process = list(self.m_tasks)

            for task in tasks_to_process:
                if self.m_stop_event.is_set():
                    break  # do not start new queries once stop is requested

                with self.m_lock:
                    if task not in self.m_tasks:
                        continue
                    if not task.instance.health_check():
                        continue
                    if now < task.next_run:
                        continue

                # Run the query.
                is_booking_triggered = False
                try:
                    result = task.instance.query()
                    if result.success:
                        # on_query_result blocks until the booking attempt ends.
                        self.on_query_result(task.instance, result)
                        is_booking_triggered = True
                    else:
                        VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Query failed, No availability found")
                except Exception as e:
                    VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Exception during query: {e}")

                # Compute the next run time. After a booking attempt (whether it
                # succeeded or not) force a longer 30-60 s cooldown to avoid
                # tripping anti-scraping measures.
                if is_booking_triggered:
                    interval = rng.randint(30, 60)
                    VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Booking attempted, entering cooldown for {interval} sec.")
                else:
                    interval = self._next_query_interval(task, rng)
                task.next_run = time.time() + interval

            # Remove unhealthy instances.
            with self.m_lock:
                initial_size = len(self.m_tasks)
                self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
                if len(self.m_tasks) < initial_size:
                    VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")

        VSC_INFO("coordinator", f"[STOP] monitor loop exiting for group {self.m_cfg.identifier}")

    def creator_loop(self):
        """
        @brief Creator loop: creates new plugin instances until the number of
               tasks reaches ``target_instances``, one instance per iteration.
        """
        VSC_INFO("coordinator", f"[START] creator loop starting for group {self.m_cfg.identifier}")

        while not self.m_stop_event.is_set():
            # Short pause to avoid busy-spinning; interrupted by stop().
            if self.m_stop_event.wait(0.1):
                break

            with self.m_lock:
                current_instances_count = len(self.m_tasks)
                diff = self.m_cfg.target_instances - current_instances_count

            if diff <= 0:
                continue

            VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Need to create {diff} new instance(s). Current: {current_instances_count}, Target: {self.m_cfg.target_instances}")

            # Prepare the configuration (account / proxy).
            plg_cfg = self._make_plg_config()
            if plg_cfg is None:
                VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] Failed to prepare plugin configuration, sleeping 30s.")
                # Wait for resources (accounts / proxies) to recover, but wake
                # up immediately when stop is requested.
                if self.m_stop_event.wait(30):
                    break
                continue

            # Create the instance on the thread pool and wait for it. A failure
            # here must not kill this thread, otherwise the group would never
            # be replenished again.
            try:
                future = ThreadPool.getInstance().enqueue(self._create_instance, plg_cfg)
                inst = future.result()
            except Exception as e:
                VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Instance creation task failed: {e}")
                inst = None

            if inst is not None:
                with self.m_lock:
                    # Re-check under the lock that the target has not been met meanwhile.
                    if len(self.m_tasks) < self.m_cfg.target_instances:
                        new_task = Task(
                            instance=inst,
                            qw_cfg=self.m_cfg.query_wait,
                            next_run=time.time(),  # run the first query immediately
                        )
                        self.m_tasks.append(new_task)
                        VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] New instance added. Total instances: {len(self.m_tasks)}")
                    else:
                        VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Target instances already met, discarding newly created instance.")
            else:
                VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] Failed to create plugin instance.")

            # Space out creations (1-5 s) so instances are not created in a burst.
            if self.m_stop_event.wait(random.uniform(1.0, 5.0)):
                break

        VSC_INFO("coordinator", f"[STOP] creator loop exiting for group {self.m_cfg.identifier}")

    def _make_plg_config(self) -> Optional[VSPlgConfig]:
        """
        @brief Prepare the plugin configuration (account, proxy, free config).
        @return The populated configuration, or None when a required account or
                proxy could not be obtained.
        """
        VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Preparing plugin configuration...")
        plg_cfg = VSPlgConfig()

        # Account configuration.
        if self.m_cfg.need_account:
            account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool)
            if not account:
                VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] No available accounts for pool {self.m_cfg.account_pool}")
                return None
            plg_cfg.account.id = account["id"]
            plg_cfg.account.username = account["username"]
            plg_cfg.account.password = account["password"]
            plg_cfg.account.lock_until = account.get("lock_until", "")
            VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Using account ID {plg_cfg.account.id}, username {plg_cfg.account.username}")

        # Proxy configuration.
        if self.m_cfg.need_proxy:
            proxy = None
            if self.m_cfg.need_ip_bind:
                # NOTE(review): uses plg_cfg.account.id even when need_account
                # is false (then it is VSPlgConfig's default) -- confirm that
                # need_ip_bind always implies need_account.
                proxy_id = BindingManager.Instance().get_bounded_proxy_id(
                    self.m_cfg.account_pool, plg_cfg.account.id)
                if proxy_id is None:
                    # No proxy bound to this account yet: take an unbound one and bind it.
                    bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(
                        self.m_cfg.account_pool, self.m_cfg.proxy_pool)
                    proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids)
                    if not proxy:
                        VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] No available unbind proxy in pool {self.m_cfg.proxy_pool}")
                        return None
                    BindingManager.Instance().create_binding(
                        self.m_cfg.account_pool, plg_cfg.account.id,
                        self.m_cfg.proxy_pool, proxy["id"], "dynamic")
                    VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Created dynamic binding: account {plg_cfg.account.id} -> proxy {proxy['id']}")
                else:
                    # NOTE(review): reads ProxyManager's private _proxies mapping
                    # directly; a public lookup-by-id would be preferable if one exists.
                    all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, [])
                    proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None)
                    if not proxy:
                        VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Bounded proxy ID {proxy_id} not found in pool {self.m_cfg.proxy_pool}")
                        return None
            else:
                proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool)
                if not proxy:
                    VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] No available proxy in pool {self.m_cfg.proxy_pool}")
                    return None

            plg_cfg.proxy.id = proxy["id"]
            plg_cfg.proxy.ip = proxy["ip"]
            plg_cfg.proxy.port = proxy["port"]
            plg_cfg.proxy.scheme = proxy["scheme"]
            plg_cfg.proxy.username = proxy.get("username", "")
            plg_cfg.proxy.password = proxy.get("password", "")
            plg_cfg.proxy.lock_until = proxy.get("lock_until", "")
            VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Using proxy ID {plg_cfg.proxy.id}, IP {plg_cfg.proxy.ip}:{plg_cfg.proxy.port}")

        plg_cfg.free_config = self.m_cfg.free_config
        VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Plugin configuration prepared.")
        return plg_cfg

    def _create_instance(self, plg_cfg: VSPlgConfig) -> Optional[IVSPlg]:
        """
        @brief Create and initialise a single plugin instance.

        Submitted to the thread pool by creator_loop. Any error raised while
        creating the instance, applying the config or opening the session is
        logged and reported as None, so a failing plugin cannot terminate the
        creator thread.

        @param plg_cfg Configuration produced by _make_plg_config().
        @return The ready instance, or None on failure.
        """
        VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Creating plugin instance (plugin={self.m_cfg.plugin_config.plugin_name})...")
        try:
            inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
            inst.set_config(plg_cfg)
            inst.create_session()

            # account_login_interval is multiplied by 60 before being passed on
            # (presumably minutes -> seconds; confirm against AccountManager).
            if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0:
                AccountManager.Instance().lock_account(
                    self.m_cfg.account_pool, plg_cfg.account.id,
                    self.m_cfg.account_login_interval * 60)

            VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Plugin instance created and session established.")
            return inst
        except Exception as e:
            VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Error creating plugin instance: {e}")
            return None

    def on_query_result(self, sptr: IVSPlg, query_result: VSQueryResult):
        """
        @brief Handle a successful query: pop a pending user task and try to
               book it. Blocks the caller until the booking attempt finishes.
        @param sptr Plugin instance that produced the result.
        @param query_result The successful query result.
        """
        VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")

        def book_task(inst: IVSPlg, result: VSQueryResult):
            """Booking job run on the booking executor."""
            task_id = None  # non-None while the popped task must be returned on failure
            try:
                # 1. Pop the matching user task.
                booking_routing_key = f'auto.{result.routing_key}' if result.routing_key else "default"
                task = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
                if not task:
                    VSC_WARN("coordinator", f"[{inst.get_group_id()}] No pending task found for key {booking_routing_key}. Abandoning slot.")
                    return

                task_id = task['id']
                order_id = task['order_id']
                user_input = task.get('user_inputs', {})
                VSC_INFO("coordinator", f"[{inst.get_group_id()}] Picked up Task ID {task_id} for booking...")

                # 2. Book. The plugin's book() receives the user input.
                book_res = inst.book(result, user_input)

                # 3. Handle the result.
                if book_res.success:
                    VSC_INFO("coordinator", f"[{inst.get_group_id()}] Booking SUCCESS! Order: {order_id}")

                    # Push notification is best-effort: a failing callback must
                    # not send an already-booked task back to the queue.
                    push_cb = getattr(self, 'push_callback_', None)
                    if push_cb:
                        try:
                            push_cb(100, f"Booking Success: {order_id}".encode('utf-8'), 0)
                        except Exception as cb_err:
                            VSC_WARN("coordinator", f"[{inst.get_group_id()}] Push callback failed: {cb_err}")

                    # 4. Mark the task as grabbed, with the details of this booking.
                    current_grab_info = {
                        "account": book_res.account,
                        "session_id": book_res.session_id,
                        "slot_date": book_res.book_date,
                        "slot_time": book_res.book_time,
                        "timestamp": int(time.time()),
                        "payment_link": book_res.payment_link,
                    }
                    update_data = {
                        "status": "grabbed",
                        # Sent as a plain dict (overwrites rather than appends).
                        "grabbed_history": current_grab_info,
                    }
                    # NOTE(review): if this update raises, the finally block
                    # re-queues a task whose booking succeeded -- confirm that
                    # this is the desired recovery.
                    VSCloudApi.Instance().update_vas_task(task_id, update_data)
                    VSC_INFO("coordinator", f"[{inst.get_group_id()}] Task {task_id} marked as GRABBED.")

                    # Clear task_id so the finally block does not reset it to pending.
                    task_id = None
            except Exception as e:
                # Python 3 exceptions have no `.message`; format the exception itself.
                VSC_ERROR("coordinator", f"[{inst.get_group_id()}] Exception during booking: {e}")
            finally:
                # 5. Rollback: return the task to the queue unless it was grabbed.
                if task_id is not None:
                    VSC_WARN("coordinator", f"[{inst.get_group_id()}] Returning Task {task_id} to queue (status=pending).")
                    try:
                        VSCloudApi.Instance().return_vas_task_to_queue(task_id)
                    except Exception as ex:
                        VSC_ERROR("coordinator", f"[{inst.get_group_id()}] Failed to return task to queue: {ex}")

        # Submit the booking job and block until it has finished.
        booking_future = self.book_executor.enqueue(book_task, sptr, query_result)
        wait([booking_future])