| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- # group_coordinator.py
- import threading
- import time
- import random
- import os
- from typing import List, Optional
- from concurrent.futures import ThreadPoolExecutor, wait
- # 导入所有依赖
- from vs_types import GroupConfig, QueryWaitMode, VSPlgConfig, VSQueryResult, Task # type: ignore
- from vs_plg import IVSPlg # type: ignore
- from vs_plg_factory import VSPlgFactory # type: ignore
- from toolkit.account_manager import AccountManager # type: ignore
- from toolkit.proxy_manager import ProxyManager # type: ignore
- from toolkit.binding_manager import BindingManager # type: ignore
- from toolkit.thread_pool import ThreadPool # type: ignore
- from vs_log_macros import VSC_INFO, VSC_DEBUG, VSC_WARN, VSC_ERROR # type: ignore
class GroupCoordinator:
    """
    @brief GroupCoordinator class.
    Manages the visa plugin instances of a single group: instance creation,
    health checks, task scheduling, and the query / booking flow.

    start() launches two worker threads: monitor_loop() (queries, bookings and
    removal of unhealthy instances) and creator_loop() (keeps the number of
    instances at m_cfg.target_instances). The task list m_tasks is shared by
    both threads and is guarded by m_lock.
    """

    def __init__(self, cfg: "GroupConfig"):
        """
        @brief Store the group configuration and build the idle coordinator.
        @param cfg Group configuration; read throughout the class as m_cfg.
        """
        self.m_cfg = cfg
        self.m_factory = VSPlgFactory()  # plugin factory instance

        self.m_tasks: List["Task"] = []  # all currently running task instances

        self.m_stop_event = threading.Event()  # signals both loops to exit
        self.m_lock = threading.RLock()  # protects shared state (m_tasks)

        self.m_monitor_thread: Optional[threading.Thread] = None
        self.m_creator_thread: Optional[threading.Thread] = None

        # Always defined, so the booking path never has to probe for the
        # attribute; set_push_callback() replaces it.
        self.push_callback_ = None

        # Executor used for booking, obtained from the project ThreadPool wrapper.
        # NOTE(review): if getInstance() returns a process-wide singleton, this is
        # the same pool creator_loop() uses and stop() shuts it down, which would
        # make restart() unusable - confirm against toolkit.thread_pool.
        self.book_executor = ThreadPool(max_workers=5).getInstance()
        VSC_INFO("coordinator", "GroupCoordinator for '%s' initialized.", self.m_cfg.identifier)

    def set_push_callback(self, cb):
        """
        @brief Set the push callback (PushCallback in the C++ version).
        @param cb Callable invoked after a successful booking as
                  cb(100, <utf-8 encoded message bytes>, 0).
        """
        self.push_callback_ = cb
        VSC_INFO("coordinator", "Push callback set for group '%s'.", self.m_cfg.identifier)

    @staticmethod
    def _infer_class_name(plugin_name: str) -> str:
        """
        @brief Derive the plugin class name from its snake_case plugin name.
        Each '_'-separated part is passed through str.title() and the parts are
        concatenated, e.g. "concrete_plugin" -> "ConcretePlugin".
        """
        return "".join(part.title() for part in plugin_name.split('_'))

    def _interruptible_sleep(self, seconds: float) -> bool:
        """
        @brief Sleep for up to `seconds`, waking early when stop() is requested.
        @return True if the stop event is set (the caller should leave its loop),
                False if the whole timeout elapsed.
        """
        return self.m_stop_event.wait(seconds)

    def start(self):
        """
        @brief Start the coordinator: register the plugin and launch both threads.
        Does nothing when the group is disabled or when the worker threads of a
        previous start() are still alive.
        """
        if not self.m_cfg.enable:
            VSC_WARN("coordinator", "Group '%s' is disabled, not starting.", self.m_cfg.identifier)
            return

        # Starting twice without stop() would spawn duplicate monitor/creator loops.
        for worker in (self.m_monitor_thread, self.m_creator_thread):
            if worker is not None and worker.is_alive():
                VSC_WARN("coordinator", "Group '%s' is already running, ignoring start().", self.m_cfg.identifier)
                return

        VSC_INFO("coordinator", "Starting coordinator for group '%s'...", self.m_cfg.identifier)
        self.m_stop_event.clear()

        # Register the plugin.
        plugin_module_path = os.path.join(self.m_cfg.plugin_config.lib_path, f"{self.m_cfg.plugin_config.plugin_bin}")

        # Class name is inferred from the plugin name (snake_case -> PascalCase).
        plugin_name = self.m_cfg.plugin_config.plugin_name
        class_name = self._infer_class_name(plugin_name)

        # Debug log: confirm the inferred class name.
        VSC_DEBUG("coordinator", "Inferring class name for plugin '%s': '%s'", plugin_name, class_name)
        self.m_factory.register_plugin(plugin_name,
                                       plugin_module_path,
                                       class_name)

        self.m_monitor_thread = threading.Thread(target=self.monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
        self.m_creator_thread = threading.Thread(target=self.creator_loop, name=f"Creator-{self.m_cfg.identifier}")

        self.m_monitor_thread.start()
        self.m_creator_thread.start()
        VSC_INFO("coordinator", "Coordinator for group '%s' threads started.", self.m_cfg.identifier)

    def stop(self):
        """
        @brief Stop the coordinator and wait for both worker threads to finish,
        then shut down the booking executor.
        """
        VSC_INFO("coordinator", "Stopping coordinator for group '%s'...", self.m_cfg.identifier)
        self.m_stop_event.set()  # send the stop signal

        if self.m_monitor_thread and self.m_monitor_thread.is_alive():
            self.m_monitor_thread.join()
        if self.m_creator_thread and self.m_creator_thread.is_alive():
            self.m_creator_thread.join()

        # Shut down the booking thread pool.
        self.book_executor.shutdown(wait=True)
        VSC_INFO("coordinator", "Coordinator for group '%s' stopped.", self.m_cfg.identifier)

    def restart(self):
        """
        @brief Restart the coordinator (stop() followed by start()).
        """
        VSC_INFO("coordinator", "Restarting coordinator for group '%s'...", self.m_cfg.identifier)
        self.stop()
        self.start()
        VSC_INFO("coordinator", "Coordinator for group '%s' restarted.", self.m_cfg.identifier)

    def group_id(self) -> str:
        """
        @brief Return the group ID (m_cfg.identifier).
        """
        return self.m_cfg.identifier

    def _next_interval(self, task, rng, booking_triggered: bool):
        """
        @brief Compute how many seconds to wait before `task` is queried again.
        @param task Task whose qw_cfg (query-wait settings) is consulted.
        @param rng random.Random instance owned by the monitor loop.
        @param booking_triggered True when a booking was just attempted.
        @return Cooldown of 30-60 s after a booking attempt; otherwise 0 for
                Loop mode, fixed_wait for Fixed mode, a random value within
                [random_min, random_max] for Random mode, and 30 for any other mode.
        """
        if booking_triggered:
            # After a booking attempt (successful or not) force a longer cooldown
            # to reduce the risk of triggering anti-scraping measures.
            interval = rng.randint(30, 60)
            VSC_INFO("coordinator", "[%s] Booking attempted, entering cooldown for %d sec.", self.m_cfg.identifier, interval)
            return interval

        mode = task.qw_cfg.mode
        if mode == QueryWaitMode.Loop:
            return 0
        if mode == QueryWaitMode.Fixed:
            return task.qw_cfg.fixed_wait
        if mode == QueryWaitMode.Random:
            # Order the bounds first: randint() raises ValueError when min > max,
            # which would otherwise terminate the monitor thread.
            low, high = sorted((task.qw_cfg.random_min, task.qw_cfg.random_max))
            return rng.randint(low, high)
        return 30

    def monitor_loop(self):
        """
        @brief Monitor loop: periodically checks instance health, runs the query
        of every due task, triggers booking on a successful query, and removes
        unhealthy instances. Runs until the stop event is set.
        """
        VSC_INFO("coordinator", "[START] monitor loop starting for group %s", self.m_cfg.identifier)
        rng = random.Random()

        while not self.m_stop_event.is_set():
            # 100-120 ms pause between passes; returns early when stop() is called.
            if self._interruptible_sleep(0.1 + rng.randint(0, 20) / 1000.0):
                break

            now = time.time()

            # Work on a snapshot so the lock is not held while querying.
            with self.m_lock:
                tasks_to_process = list(self.m_tasks)

            for task in tasks_to_process:
                if self.m_stop_event.is_set():
                    break  # do not start new queries once stop was requested

                with self.m_lock:
                    if task not in self.m_tasks:
                        continue
                if not task.instance.health_check():
                    continue

                if now < task.next_run:
                    continue

                # Run the query.
                is_booking_triggered = False
                try:
                    result = task.instance.query()
                    if result.success:
                        # on_query_result blocks until the booking has finished.
                        self.on_query_result(task.instance, result)
                        is_booking_triggered = True
                    else:
                        error = task.instance.get_last_error()
                        if error.error_code != 2001:  # ignore "No availability found"
                            VSC_DEBUG("coordinator", "[%s] Query failed, code=%d, msg=%s",
                                      self.m_cfg.identifier, error.error_code, error.error_message)
                except Exception as e:
                    VSC_ERROR("coordinator", "[%s] Exception during query: %s", self.m_cfg.identifier, str(e))

                # Schedule the next run.
                task.next_run = time.time() + self._next_interval(task, rng, is_booking_triggered)

            # Drop unhealthy instances; creator_loop() will replace them.
            with self.m_lock:
                initial_size = len(self.m_tasks)
                self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
                if len(self.m_tasks) < initial_size:
                    VSC_WARN("coordinator", "[%s] Removed %d unhealthy instance(s). Remaining: %d",
                             self.m_cfg.identifier, initial_size - len(self.m_tasks), len(self.m_tasks))

        VSC_INFO("coordinator", "[STOP] monitor loop exiting for group %s", self.m_cfg.identifier)

    def creator_loop(self):
        """
        @brief Creator loop: creates new plugin instances, one per pass, until the
        number of tasks reaches m_cfg.target_instances. Runs until the stop event
        is set; every wait in the loop ends early when stop() is called.
        """
        VSC_INFO("coordinator", "[START] creator loop starting for group %s", self.m_cfg.identifier)

        while not self.m_stop_event.is_set():
            if self._interruptible_sleep(0.1):  # avoid spinning too fast
                break

            with self.m_lock:
                current_instances_count = len(self.m_tasks)
                diff = self.m_cfg.target_instances - current_instances_count

            if diff <= 0:
                continue

            VSC_INFO("coordinator", "[%s] Need to create %d new instance(s). Current: %d, Target: %d",
                     self.m_cfg.identifier, diff, current_instances_count, self.m_cfg.target_instances)

            # Prepare the configuration. An unexpected error is handled like a
            # missing resource so this thread keeps running.
            try:
                plg_cfg = self._make_plg_config()
            except Exception as e:
                VSC_ERROR("coordinator", "[%s] Exception while preparing plugin configuration: %s",
                          self.m_cfg.identifier, str(e))
                plg_cfg = None
            if not plg_cfg:
                VSC_WARN("coordinator", "[%s] Failed to prepare plugin configuration, sleeping 30s.", self.m_cfg.identifier)
                self._interruptible_sleep(30)  # wait for resources (accounts / proxies) to recover
                continue

            # Create the instance on the thread pool and wait for it to finish.
            try:
                future = ThreadPool.getInstance().enqueue(self._create_instance, plg_cfg)
                inst = future.result()
            except Exception as e:
                VSC_ERROR("coordinator", "[%s] Exception while creating plugin instance: %s",
                          self.m_cfg.identifier, str(e))
                inst = None

            if inst:
                with self.m_lock:
                    # Re-check under the lock that the target has still not been reached.
                    if len(self.m_tasks) < self.m_cfg.target_instances:
                        new_task = Task(
                            instance=inst,
                            qw_cfg=self.m_cfg.query_wait,
                            next_run=time.time()  # run the first query immediately
                        )
                        self.m_tasks.append(new_task)
                        VSC_INFO("coordinator", "[%s] New instance added. Total instances: %d",
                                 self.m_cfg.identifier, len(self.m_tasks))
                    else:
                        VSC_DEBUG("coordinator", "[%s] Target instances already met, discarding newly created instance.", self.m_cfg.identifier)
            else:
                VSC_WARN("coordinator", "[%s] Failed to create plugin instance.", self.m_cfg.identifier)

            # Pace instance creation so that instances are not created in a burst.
            self._interruptible_sleep(random.uniform(1.0, 5.0))

        VSC_INFO("coordinator", "[STOP] creator loop exiting for group %s", self.m_cfg.identifier)

    def _select_proxy(self, account_id):
        """
        @brief Pick the proxy for a new instance.
        With m_cfg.need_ip_bind the proxy bound to `account_id` is used; when no
        binding exists an unbound proxy is fetched and a "dynamic" binding is
        created. Without need_ip_bind the next proxy of the pool is used.
        @param account_id ID of the account the proxy is (or will be) bound to.
        @return The proxy record (indexed by key, e.g. proxy["id"]), or None when
                no proxy is available.
        """
        if not self.m_cfg.need_ip_bind:
            proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool)
            if not proxy:
                VSC_WARN("coordinator", "[%s] No available proxy in pool '%s'", self.m_cfg.identifier, self.m_cfg.proxy_pool)
                return None
            return proxy

        proxy_id = BindingManager.Instance().get_bounded_proxy_id(self.m_cfg.account_pool, account_id)
        if proxy_id is None:
            # No proxy bound yet: fetch an unbound one and bind it to the account.
            bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(self.m_cfg.account_pool, self.m_cfg.proxy_pool)
            proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids)
            if not proxy:
                VSC_WARN("coordinator", "[%s] No available unbind proxy in pool '%s'", self.m_cfg.identifier, self.m_cfg.proxy_pool)
                return None
            BindingManager.Instance().create_binding(
                self.m_cfg.account_pool, account_id,
                self.m_cfg.proxy_pool, proxy["id"], "dynamic")
            VSC_INFO("coordinator", "[%s] Created dynamic binding: account %d -> proxy %d",
                     self.m_cfg.identifier, account_id, proxy["id"])
            return proxy

        # A proxy is already bound: look it up in the pool.
        # NOTE(review): this reads the private ProxyManager._proxies mapping;
        # a public lookup-by-id accessor would be preferable if one exists.
        all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, [])
        proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None)
        if not proxy:
            VSC_ERROR("coordinator", "[%s] Bounded proxy ID %d not found in pool %s", self.m_cfg.identifier, proxy_id, self.m_cfg.proxy_pool)
            return None
        return proxy

    def _make_plg_config(self) -> Optional["VSPlgConfig"]:
        """
        @brief Prepare the plugin configuration (account, proxy, free config).
        @return A populated VSPlgConfig, or None when a required account or
                proxy is not available.
        """
        VSC_DEBUG("coordinator", "[%s] Preparing plugin configuration...", self.m_cfg.identifier)
        plg_cfg = VSPlgConfig()

        # Account configuration.
        if self.m_cfg.need_account:
            account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool)
            if not account:
                VSC_WARN("coordinator", "[%s] No available accounts for pool '%s'", self.m_cfg.identifier, self.m_cfg.account_pool)
                return None
            plg_cfg.account.id = account["id"]
            plg_cfg.account.username = account["username"]
            plg_cfg.account.password = account["password"]
            plg_cfg.account.lock_until = account.get("lock_until", "")
            VSC_DEBUG("coordinator", "[%s] Using account ID %d, username %s", self.m_cfg.identifier, plg_cfg.account.id, plg_cfg.account.username)

        # Proxy configuration.
        if self.m_cfg.need_proxy:
            proxy = self._select_proxy(plg_cfg.account.id)
            if not proxy:
                return None
            plg_cfg.proxy.id = proxy["id"]
            plg_cfg.proxy.ip = proxy["ip"]
            plg_cfg.proxy.port = proxy["port"]
            plg_cfg.proxy.scheme = proxy["scheme"]
            plg_cfg.proxy.username = proxy.get("username", "")
            plg_cfg.proxy.password = proxy.get("password", "")
            plg_cfg.proxy.lock_until = proxy.get("lock_until", "")
            VSC_DEBUG("coordinator", "[%s] Using proxy ID %d, IP %s:%d",
                      self.m_cfg.identifier, plg_cfg.proxy.id, plg_cfg.proxy.ip, plg_cfg.proxy.port)

        plg_cfg.free_config = self.m_cfg.free_config
        VSC_DEBUG("coordinator", "[%s] Plugin configuration prepared.", self.m_cfg.identifier)
        return plg_cfg

    def _create_instance(self, plg_cfg: "VSPlgConfig") -> Optional["IVSPlg"]:
        """
        @brief Create and initialise a single plugin instance.
        Submitted to the thread pool by creator_loop().
        @param plg_cfg Configuration handed to the new instance via set_config().
        @return The instance once create_session() succeeded, otherwise None
                (session failure or any exception; both are logged).
        """
        VSC_DEBUG("coordinator", "[%s] Creating plugin instance (plugin=%s)...", self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
        try:
            inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
            inst.set_config(plg_cfg)
            try:
                success = inst.create_session()
            finally:
                # Lock the account whether the login attempt succeeded, failed
                # or raised, so the same account is not retried right away.
                if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0:
                    AccountManager.Instance().lock_account(
                        self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.account_login_interval * 60)

            if not success:
                error = inst.get_last_error()
                VSC_ERROR("coordinator", "[%s] Create session failed, code=%d, msg=%s",
                          self.m_cfg.identifier, error.error_code, error.error_message)
                return None

            VSC_INFO("coordinator", "[%s] Plugin instance created and session established.", self.m_cfg.identifier)
            return inst
        except Exception as e:
            VSC_ERROR("coordinator", "[%s] Error creating plugin instance: %s", self.m_cfg.identifier, str(e))
            return None

    def on_query_result(self, sptr: "IVSPlg", query_result: "VSQueryResult"):
        """
        @brief Handle a successful query result by booking with the instance
        that produced it.
        Blocks until every submitted booking task has completed, so the monitor
        loop cannot issue a new query for the same account while it is booking.
        @param sptr Plugin instance whose query succeeded.
        @param query_result Query result passed on to book().
        """
        VSC_INFO("coordinator", "[%s] Query result received. BLOCKING monitor loop for booking...", self.m_cfg.identifier)

        # Booking strategy:
        #   1. Aggressive: every instance of the group books (instances may kick
        #      each other out).
        #   2. Conservative: only the instance that found the slot (sptr) books.
        # The conservative strategy is used here to avoid heavy contention from
        # several accounts of one pool going for the same slot. To book with
        # several instances, use the instances held in self.m_tasks instead.
        # A list is used even for one instance so the handling stays uniform.
        instances_for_booking = [sptr]

        def book_task(inst: "IVSPlg", result: "VSQueryResult"):
            """Run book() on one instance, log the outcome, push on success."""
            try:
                VSC_INFO("coordinator", "[%s] Starting book() procedure...", inst.get_group_id())
                book_res = inst.book(result)
                if not book_res.success:
                    error = inst.get_last_error()
                    VSC_ERROR("coordinator", "[%s] Booking FAILED. Code=%d, Msg=%s",
                              inst.get_group_id(), error.error_code, error.error_message)
                    return
                VSC_INFO("coordinator", "[%s] Booking SUCCESS! Order: '%s'",
                         inst.get_group_id(), book_res.order_id)
            except Exception as e:
                VSC_ERROR("coordinator", "[%s] Exception during booking: %s", inst.get_group_id(), str(e))
                return

            callback = getattr(self, "push_callback_", None)
            if callback:
                # Reported separately: a failing callback does not mean that the
                # booking itself failed.
                try:
                    callback(100, f"Booking Success: {book_res.order_id}".encode('utf-8'), 0)
                except Exception as e:
                    VSC_ERROR("coordinator", "[%s] Exception in push callback: %s", inst.get_group_id(), str(e))

        # 1. Submit the tasks to the thread pool and keep their futures.
        futures = [
            self.book_executor.enqueue(book_task, inst, query_result)
            for inst in instances_for_booking
        ]

        # 2. Block the calling (monitor) thread until every future is done, so
        #    that no new query is issued while booking is in progress.
        VSC_INFO("coordinator", "[%s] Waiting for booking tasks to complete...", self.m_cfg.identifier)
        wait(futures)
        VSC_INFO("coordinator", "[%s] Booking tasks completed. Resuming monitor loop.", self.m_cfg.identifier)
|