# group_coordinator.py import os import time import json import random import threading from typing import List, Optional from concurrent.futures import ThreadPoolExecutor, wait # 导入所有依赖 from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task # type: ignore from vs_plg import IVSPlg # type: ignore from vs_plg_factory import VSPlgFactory # type: ignore from toolkit.account_manager import AccountManager # type: ignore from toolkit.proxy_manager import ProxyManager # type: ignore from toolkit.binding_manager import BindingManager # type: ignore from toolkit.thread_pool import ThreadPool # type: ignore from toolkit.vs_cloud_api import VSCloudApi from vs_log_macros import VSC_INFO, VSC_DEBUG, VSC_WARN, VSC_ERROR # type: ignore class GroupCoordinator: """ @brief GroupCoordinator 类 负责管理一个组内的签证插件实例,包括实例的创建、健康检查、 任务调度、查询和预订流程。 """ def __init__(self, cfg: GroupConfig): self.m_cfg = cfg self.m_factory = VSPlgFactory() # 插件工厂实例 self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例 self.m_stop_event = threading.Event() # 用于停止线程 self.m_lock = threading.RLock() # 保护共享资源 (m_tasks) self.m_monitor_thread: Optional[threading.Thread] = None self.m_creator_thread: Optional[threading.Thread] = None # 预订操作的线程池,独立于任务调度 self.book_executor = ThreadPool(max_workers=5).getInstance() # 使用我们封装的ThreadPool VSC_INFO("coordinator", "GroupCoordinator for '%s' initialized.", self.m_cfg.identifier) def set_push_callback(self, cb): """ @brief 设置推送回调函数 (C++中的PushCallback) Python中可以直接传递可调用对象。 """ self.push_callback_ = cb VSC_INFO("coordinator", "Push callback set for group '%s'.", self.m_cfg.identifier) def start(self): """ @brief 启动协调器,包括插件注册和线程启动。 """ if not self.m_cfg.enable: VSC_WARN("coordinator", "Group '%s' is disabled, not starting.", self.m_cfg.identifier) return VSC_INFO("coordinator", "Starting coordinator for group '%s'...", self.m_cfg.identifier) self.m_stop_event.clear() # 注册插件 plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}") # === 修复点:更智能的类名推导逻辑 === # 将 snake_case (e.g., "concrete_plugin") 转换为 PascalCase (e.g., "ConcretePlugin") plugin_name = self.m_cfg.plugin_config.plugin_name class_name = "".join(part.title() for part in plugin_name.split('_')) # 调试日志:确认推导出的类名 VSC_DEBUG("coordinator", "Inferring class name for plugin '%s': '%s'", plugin_name, class_name) self.m_factory.register_plugin(plugin_name, plugin_module_path, class_name) self.m_monitor_thread = threading.Thread(target=self.monitor_loop, name=f"Monitor-{self.m_cfg.identifier}") self.m_creator_thread = threading.Thread(target=self.creator_loop, name=f"Creator-{self.m_cfg.identifier}") self.m_monitor_thread.start() self.m_creator_thread.start() VSC_INFO("coordinator", "Coordinator for group '%s' threads started.", self.m_cfg.identifier) def stop(self): """ @brief 停止协调器,等待所有线程结束。 """ VSC_INFO("coordinator", "Stopping coordinator for group '%s'...", self.m_cfg.identifier) self.m_stop_event.set() # 发送停止信号 if self.m_monitor_thread and self.m_monitor_thread.is_alive(): self.m_monitor_thread.join() if self.m_creator_thread and self.m_creator_thread.is_alive(): self.m_creator_thread.join() # 关闭预订线程池 self.book_executor.shutdown(wait=True) VSC_INFO("coordinator", "Coordinator for group '%s' stopped.", self.m_cfg.identifier) def restart(self): """ @brief 重启协调器。 """ VSC_INFO("coordinator", "Restarting coordinator for group '%s'...", self.m_cfg.identifier) self.stop() self.start() VSC_INFO("coordinator", "Coordinator for group '%s' restarted.", self.m_cfg.identifier) def group_id(self) -> str: """ @brief 获取分组ID。 """ return self.m_cfg.identifier def monitor_loop(self): """ @brief 监控循环:定期检查实例健康状况,执行查询任务,并根据结果触发预订。 """ VSC_INFO("coordinator", "[START] monitor loop starting for group %s", self.m_cfg.identifier) rng = random.Random() while not self.m_stop_event.is_set(): sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0 time.sleep(sleep_ms) now = time.time() # 拷贝任务列表 tasks_to_process = [] with self.m_lock: tasks_to_process = list(self.m_tasks) for task in tasks_to_process: with self.m_lock: if task not in self.m_tasks: continue if not task.instance.health_check(): continue if now < task.next_run: continue # 执行查询 is_booking_triggered = False try: result = task.instance.query() if result.success: # === 关键修改:on_query_result 现在会阻塞直到抢票结束 === self.on_query_result(task.instance, result) is_booking_triggered = True else: error = task.instance.get_last_error() if error.error_code != 2001: # 忽略 No availability found VSC_DEBUG("coordinator", "[%s] Query failed, code=%d, msg=%s", self.m_cfg.identifier, error.error_code, error.error_message) except Exception as e: VSC_ERROR("coordinator", "[%s] Exception during query: %s", self.m_cfg.identifier, str(e)) # 计算下次运行时间 # 如果刚刚触发了抢票(无论成功失败),建议强制加长一点冷却时间,防止反爬 if is_booking_triggered: interval = rng.randint(30, 60) # 抢完票休息 30-60 秒 VSC_INFO("coordinator", "[%s] Booking attempted, entering cooldown for %d sec.", self.m_cfg.identifier, interval) else: interval = 30 mode = task.qw_cfg.mode if mode == QueryWaitMode.Loop: interval = 0 elif mode == QueryWaitMode.Fixed: interval = task.qw_cfg.fixed_wait elif mode == QueryWaitMode.Random: interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max) task.next_run = time.time() + interval # 清理不健康实例 with self.m_lock: initial_size = len(self.m_tasks) self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()] if len(self.m_tasks) < initial_size: VSC_WARN("coordinator", "[%s] Removed %d unhealthy instance(s). Remaining: %d", self.m_cfg.identifier, initial_size - len(self.m_tasks), len(self.m_tasks)) VSC_INFO("coordinator", "[STOP] monitor loop exiting for group %s", self.m_cfg.identifier) def creator_loop(self): """ @brief 创建者循环:根据目标实例数量,创建和补充新的插件实例。 """ VSC_INFO("coordinator", "[START] creator loop starting for group %s", self.m_cfg.identifier) while not self.m_stop_event.is_set(): time.sleep(0.1) # 避免空转太快 diff = 0 with self.m_lock: current_instances_count = len(self.m_tasks) diff = self.m_cfg.target_instances - current_instances_count if diff > 0: VSC_INFO("coordinator", "[%s] Need to create %d new instance(s). Current: %d, Target: %d", self.m_cfg.identifier, diff, current_instances_count, self.m_cfg.target_instances) # 准备配置 plg_cfg = self._make_plg_config() if not plg_cfg: VSC_WARN("coordinator", "[%s] Failed to prepare plugin configuration, sleeping 30s.", self.m_cfg.identifier) time.sleep(30) # 等待资源 (账户/代理) 恢复 continue # 在线程池中创建实例,模拟C++的异步创建 future = ThreadPool.getInstance().enqueue(self._create_instance, plg_cfg) inst = future.result() # 等待创建完成 if inst: with self.m_lock: # 确保在添加到任务列表之前,实例数量仍然低于目标值 if len(self.m_tasks) < self.m_cfg.target_instances: new_task = Task( instance=inst, qw_cfg=self.m_cfg.query_wait, next_run=time.time() # 立即执行第一次查询 ) self.m_tasks.append(new_task) VSC_INFO("coordinator", "[%s] New instance added. Total instances: %d", self.m_cfg.identifier, len(self.m_tasks)) else: VSC_DEBUG("coordinator", "[%s] Target instances already met, discarding newly created instance.", self.m_cfg.identifier) else: VSC_WARN("coordinator", "[%s] Failed to create plugin instance.", self.m_cfg.identifier) # 可以在这里添加重试逻辑或错误处理 # 模拟创建间隔,避免瞬间创建过多实例 time.sleep(random.uniform(1.0, 5.0)) VSC_INFO("coordinator", "[STOP] creator loop exiting for group %s", self.m_cfg.identifier) def _make_plg_config(self) -> Optional[VSPlgConfig]: """ @brief 准备插件配置 (账号、代理等)。 """ VSC_DEBUG("coordinator", "[%s] Preparing plugin configuration...", self.m_cfg.identifier) plg_cfg = VSPlgConfig() # 账号配置 if self.m_cfg.need_account: account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool) if not account: VSC_WARN("coordinator", "[%s] No available accounts for pool '%s'", self.m_cfg.identifier, self.m_cfg.account_pool) return None plg_cfg.account.id = account["id"] plg_cfg.account.username = account["username"] plg_cfg.account.password = account["password"] plg_cfg.account.lock_until = account.get("lock_until", "") VSC_DEBUG("coordinator", "[%s] Using account ID %d, username %s", self.m_cfg.identifier, plg_cfg.account.id, plg_cfg.account.username) # 代理配置 if self.m_cfg.need_proxy: proxy = None if self.m_cfg.need_ip_bind: proxy_id = BindingManager.Instance().get_bounded_proxy_id(self.m_cfg.account_pool, plg_cfg.account.id) if proxy_id is None: # 没有绑定代理,需要获取一个新的并绑定 bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(self.m_cfg.account_pool, self.m_cfg.proxy_pool) proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids) if not proxy: VSC_WARN("coordinator", "[%s] No available unbind proxy in pool '%s'", self.m_cfg.identifier, self.m_cfg.proxy_pool) return None BindingManager.Instance().create_binding( self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.proxy_pool, proxy["id"], "dynamic") VSC_INFO("coordinator", "[%s] Created dynamic binding: account %d -> proxy %d", self.m_cfg.identifier, plg_cfg.account.id, proxy["id"]) else: # 已经有绑定代理,直接获取 all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, []) proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None) if not proxy: VSC_ERROR("coordinator", "[%s] Bounded proxy ID %d not found in pool %s", self.m_cfg.identifier, proxy_id, self.m_cfg.proxy_pool) return None else: proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool) if not proxy: VSC_WARN("coordinator", "[%s] No available proxy in pool '%s'", self.m_cfg.identifier, self.m_cfg.proxy_pool) return None plg_cfg.proxy.id = proxy["id"] plg_cfg.proxy.ip = proxy["ip"] plg_cfg.proxy.port = proxy["port"] plg_cfg.proxy.scheme = proxy["scheme"] plg_cfg.proxy.username = proxy.get("username", "") plg_cfg.proxy.password = proxy.get("password", "") plg_cfg.proxy.lock_until = proxy.get("lock_until", "") VSC_DEBUG("coordinator", "[%s] Using proxy ID %d, IP %s:%d", self.m_cfg.identifier, plg_cfg.proxy.id, plg_cfg.proxy.ip, plg_cfg.proxy.port) plg_cfg.free_config = self.m_cfg.free_config VSC_DEBUG("coordinator", "[%s] Plugin configuration prepared.", self.m_cfg.identifier) return plg_cfg def _create_instance(self, plg_cfg: VSPlgConfig) -> Optional[IVSPlg]: """ @brief 创建并初始化单个插件实例。 这个方法在 creator_loop 的线程池中执行。 """ VSC_DEBUG("coordinator", "[%s] Creating plugin instance (plugin=%s)...", self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name) try: inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name) inst.set_config(plg_cfg) success = inst.create_session() # 无论成功失败都锁定账号 if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0: AccountManager.Instance().lock_account( self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.account_login_interval * 60) if not success: error = inst.get_last_error() VSC_ERROR("coordinator", "[%s] Create session failed, code=%d, msg=%s", self.m_cfg.identifier, error.error_code, error.error_message) return None VSC_INFO("coordinator", "[%s] Plugin instance created and session established.", self.m_cfg.identifier) return inst except Exception as e: VSC_ERROR("coordinator", "[%s] Error creating plugin instance: %s", self.m_cfg.identifier, str(e)) return None def on_query_result(self, sptr: IVSPlg, query_result: VSQueryResult): VSC_INFO("coordinator", "[%s] Query result received: %s. BLOCKING monitor loop for booking...", self.m_cfg.identifier, str(query_result)) # 定义内部预订任务 def book_task(inst: IVSPlg, result: VSQueryResult): task_id = None try: # 1. 获取对应的用户任务 (Pop Task) booking_routing_key = f'auto.{result.routing_key}' if result.routing_key else "default" # 尝试获取任务 task = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key) if not task: VSC_WARN("coordinator", "[%s] No pending user tasks found for key '%s'. Abandoning slot.", inst.get_group_id(), booking_routing_key) return task_id = task['id'] user_input = task.get('user_inputs', {}) VSC_INFO("coordinator", "[%s] Picked up Task ID %s for booking...", inst.get_group_id(), task_id) # 2. 执行预订 # 注意:插件的 book 方法需要接收 user_input book_res = inst.book(result, user_input) # 3. 处理结果 if book_res.success: VSC_INFO("coordinator", "[%s] Booking SUCCESS! Order: '%s'", inst.get_group_id(), book_res.order_id) # 推送通知 if hasattr(self, 'push_callback_') and self.push_callback_: self.push_callback_(100, f"Booking Success: {book_res.order_id}".encode('utf-8'), 0) # 4. 成功逻辑:更新任务状态为 grabbed # 构造历史记录详情对象 history_detail = { "slot_date": book_res.book_date, "slot_time": book_res.book_time, "account": inst.config.account.username if inst.config else "unknown", "timestamp": int(time.time()), "payment_link": book_res.payment_link, "order_id": book_res.order_id } update_data = { "status": "grabbed", # 修改点:grabbed_history 是一个字符串列表 # 我们将详情对象序列化为 JSON 字符串放入列表中 "grabbed_history": [json.dumps(history_detail)] } VSCloudApi.Instance().update_vas_task(task_id, update_data) VSC_INFO("coordinator", "[%s] Task %s marked as GRABBED.", inst.get_group_id(), task_id) # 成功后 task_id 置空,防止 finally 块再次将其重置为 pending task_id = None else: # 失败逻辑 error = inst.get_last_error() VSC_ERROR("coordinator", "[%s] Booking FAILED. Code=%d, Msg=%s", inst.get_group_id(), error.error_code, error.error_message) # task_id 仍然存在,将由 finally 块处理回滚 except Exception as e: VSC_ERROR("coordinator", "[%s] Exception during booking: %s", inst.get_group_id(), str(e)) finally: # 5. Return to Queue (回滚机制) if task_id is not None: VSC_WARN("coordinator", "[%s] Returning Task %s to queue (status=pending).", inst.get_group_id(), task_id) try: VSCloudApi.Instance().update_vas_task(task_id, {"status": "pending"}) except Exception as ex: VSC_ERROR("coordinator", "[%s] Failed to return task to queue: %s", inst.get_group_id(), str(ex)) futures = [] f = self.book_executor.enqueue(book_task, sptr, query_result) futures.append(f) wait(futures)