|
@@ -1,10 +1,10 @@
|
|
|
-# group_coordinator.py
|
|
|
|
|
|
|
+# gco.py
|
|
|
import os
|
|
import os
|
|
|
import time
|
|
import time
|
|
|
import json
|
|
import json
|
|
|
import random
|
|
import random
|
|
|
import threading
|
|
import threading
|
|
|
-from typing import List, Optional
|
|
|
|
|
|
|
+from typing import List, Optional, Callable
|
|
|
from concurrent.futures import wait
|
|
from concurrent.futures import wait
|
|
|
|
|
|
|
|
# 导入所有依赖
|
|
# 导入所有依赖
|
|
@@ -16,18 +16,18 @@ from toolkit.proxy_manager import ProxyManager
|
|
|
from toolkit.binding_manager import BindingManager
|
|
from toolkit.binding_manager import BindingManager
|
|
|
from toolkit.thread_pool import ThreadPool
|
|
from toolkit.thread_pool import ThreadPool
|
|
|
from toolkit.vs_cloud_api import VSCloudApi
|
|
from toolkit.vs_cloud_api import VSCloudApi
|
|
|
-from vs_log_macros import VSC_INFO, VSC_DEBUG, VSC_WARN, VSC_ERROR
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-class GroupCoordinator:
|
|
|
|
|
|
|
+class GCO:
|
|
|
"""
|
|
"""
|
|
|
- @brief GroupCoordinator 类
|
|
|
|
|
|
|
+ @brief GCO 类
|
|
|
负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
|
|
负责管理一个组内的签证插件实例,包括实例的创建、健康检查、
|
|
|
任务调度、查询和预订流程。
|
|
任务调度、查询和预订流程。
|
|
|
"""
|
|
"""
|
|
|
- def __init__(self, cfg: GroupConfig):
|
|
|
|
|
|
|
+ def __init__(self, cfg: GroupConfig, logger: Callable[[str], None] = None):
|
|
|
self.m_cfg = cfg
|
|
self.m_cfg = cfg
|
|
|
self.m_factory = VSPlgFactory() # 插件工厂实例
|
|
self.m_factory = VSPlgFactory() # 插件工厂实例
|
|
|
|
|
+ self.m_logger = logger
|
|
|
|
|
|
|
|
self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
|
|
self.m_tasks: List[Task] = [] # 存储所有运行中的任务实例
|
|
|
|
|
|
|
@@ -36,29 +36,16 @@ class GroupCoordinator:
|
|
|
|
|
|
|
|
self.m_monitor_thread: Optional[threading.Thread] = None
|
|
self.m_monitor_thread: Optional[threading.Thread] = None
|
|
|
self.m_creator_thread: Optional[threading.Thread] = None
|
|
self.m_creator_thread: Optional[threading.Thread] = None
|
|
|
-
|
|
|
|
|
- # 预订操作的线程池,独立于任务调度
|
|
|
|
|
- self.book_executor = ThreadPool(max_workers=5).getInstance()
|
|
|
|
|
-
|
|
|
|
|
- VSC_INFO("coordinator", f"GroupCoordinator for {self.m_cfg.identifier} initialized.")
|
|
|
|
|
-
|
|
|
|
|
- def set_push_callback(self, cb):
|
|
|
|
|
- """
|
|
|
|
|
- @brief 设置推送回调函数 (C++中的PushCallback)
|
|
|
|
|
- Python中可以直接传递可调用对象。
|
|
|
|
|
- """
|
|
|
|
|
- self.push_callback_ = cb
|
|
|
|
|
- VSC_INFO("coordinator", f"Push callback set for group {self.m_cfg.identifier}.")
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
def start(self):
|
|
|
"""
|
|
"""
|
|
|
@brief 启动协调器,包括插件注册和线程启动。
|
|
@brief 启动协调器,包括插件注册和线程启动。
|
|
|
"""
|
|
"""
|
|
|
if not self.m_cfg.enable:
|
|
if not self.m_cfg.enable:
|
|
|
- VSC_WARN("coordinator", f"Group {self.m_cfg.identifier} is disabled, not starting.")
|
|
|
|
|
|
|
+ self._log("Group is disabled, not starting.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
- VSC_INFO("coordinator", f"Starting coordinator for group {self.m_cfg.identifier}...")
|
|
|
|
|
|
|
+ self._log("Starting coordinator...")
|
|
|
self.m_stop_event.clear()
|
|
self.m_stop_event.clear()
|
|
|
|
|
|
|
|
# 注册插件
|
|
# 注册插件
|
|
@@ -70,55 +57,47 @@ class GroupCoordinator:
|
|
|
class_name = "".join(part.title() for part in plugin_name.split('_'))
|
|
class_name = "".join(part.title() for part in plugin_name.split('_'))
|
|
|
|
|
|
|
|
# 调试日志:确认推导出的类名
|
|
# 调试日志:确认推导出的类名
|
|
|
- VSC_DEBUG("coordinator", f"Inferring class name for plugin {plugin_name}: {class_name}")
|
|
|
|
|
|
|
+ self._log(f"Inferring class name for plugin {plugin_name}: {class_name}")
|
|
|
|
|
|
|
|
self.m_factory.register_plugin(plugin_name,
|
|
self.m_factory.register_plugin(plugin_name,
|
|
|
plugin_module_path,
|
|
plugin_module_path,
|
|
|
class_name)
|
|
class_name)
|
|
|
|
|
|
|
|
- self.m_monitor_thread = threading.Thread(target=self.monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
|
|
|
|
|
- self.m_creator_thread = threading.Thread(target=self.creator_loop, name=f"Creator-{self.m_cfg.identifier}")
|
|
|
|
|
|
|
+ self.m_monitor_thread = threading.Thread(target=self._monitor_loop, name=f"Monitor-{self.m_cfg.identifier}")
|
|
|
|
|
+ self.m_creator_thread = threading.Thread(target=self._creator_loop, name=f"Creator-{self.m_cfg.identifier}")
|
|
|
|
|
|
|
|
self.m_monitor_thread.start()
|
|
self.m_monitor_thread.start()
|
|
|
self.m_creator_thread.start()
|
|
self.m_creator_thread.start()
|
|
|
- VSC_INFO("coordinator", f"Coordinator for group {self.m_cfg.identifier} threads started.")
|
|
|
|
|
|
|
+ self._log("Coordinator threads started.")
|
|
|
|
|
|
|
|
def stop(self):
|
|
def stop(self):
|
|
|
"""
|
|
"""
|
|
|
@brief 停止协调器,等待所有线程结束。
|
|
@brief 停止协调器,等待所有线程结束。
|
|
|
"""
|
|
"""
|
|
|
- VSC_INFO("coordinator", f"Stopping coordinator for group {self.m_cfg.identifier}...")
|
|
|
|
|
|
|
+ self._log("Stopping coordinator...")
|
|
|
self.m_stop_event.set() # 发送停止信号
|
|
self.m_stop_event.set() # 发送停止信号
|
|
|
|
|
|
|
|
if self.m_monitor_thread and self.m_monitor_thread.is_alive():
|
|
if self.m_monitor_thread and self.m_monitor_thread.is_alive():
|
|
|
self.m_monitor_thread.join()
|
|
self.m_monitor_thread.join()
|
|
|
if self.m_creator_thread and self.m_creator_thread.is_alive():
|
|
if self.m_creator_thread and self.m_creator_thread.is_alive():
|
|
|
self.m_creator_thread.join()
|
|
self.m_creator_thread.join()
|
|
|
-
|
|
|
|
|
- # 关闭预订线程池
|
|
|
|
|
- self.book_executor.shutdown(wait=True)
|
|
|
|
|
- VSC_INFO("coordinator", f"Coordinator for group {self.m_cfg.identifier} stopped.")
|
|
|
|
|
-
|
|
|
|
|
- def restart(self):
|
|
|
|
|
- """
|
|
|
|
|
- @brief 重启协调器。
|
|
|
|
|
- """
|
|
|
|
|
- VSC_INFO("coordinator", f"Restarting coordinator for group {self.m_cfg.identifier}...", )
|
|
|
|
|
- self.stop()
|
|
|
|
|
- self.start()
|
|
|
|
|
- VSC_INFO("coordinator", f"Coordinator for group {self.m_cfg.identifier} restarted.")
|
|
|
|
|
|
|
+ self._log("Coordinator stopped.")
|
|
|
|
|
|
|
|
def group_id(self) -> str:
|
|
def group_id(self) -> str:
|
|
|
"""
|
|
"""
|
|
|
@brief 获取分组ID。
|
|
@brief 获取分组ID。
|
|
|
"""
|
|
"""
|
|
|
return self.m_cfg.identifier
|
|
return self.m_cfg.identifier
|
|
|
|
|
+
|
|
|
|
|
+ def _log(self, message):
|
|
|
|
|
+ if self.m_logger:
|
|
|
|
|
+ self.m_logger(f'[gco] [{self.m_cfg.identifier}] {message}')
|
|
|
|
|
|
|
|
- def monitor_loop(self):
|
|
|
|
|
|
|
+ def _monitor_loop(self):
|
|
|
"""
|
|
"""
|
|
|
@brief 监控循环:定期检查实例健康状况,执行查询任务,并根据结果触发预订。
|
|
@brief 监控循环:定期检查实例健康状况,执行查询任务,并根据结果触发预订。
|
|
|
"""
|
|
"""
|
|
|
- VSC_INFO("coordinator", f"[START] monitor loop starting for group {self.m_cfg.identifier}")
|
|
|
|
|
|
|
+ self._log("[START] monitor loop starting...")
|
|
|
rng = random.Random()
|
|
rng = random.Random()
|
|
|
|
|
|
|
|
while not self.m_stop_event.is_set():
|
|
while not self.m_stop_event.is_set():
|
|
@@ -148,19 +127,19 @@ class GroupCoordinator:
|
|
|
try:
|
|
try:
|
|
|
result = task.instance.query()
|
|
result = task.instance.query()
|
|
|
if result.success:
|
|
if result.success:
|
|
|
- # === 关键修改:on_query_result 现在会阻塞直到抢票结束 ===
|
|
|
|
|
- self.on_query_result(task.instance, result)
|
|
|
|
|
|
|
+ # === 关键修改:_on_query_result 现在会阻塞直到抢票结束 ===
|
|
|
|
|
+ self._on_query_result(task.instance, result)
|
|
|
is_booking_triggered = True
|
|
is_booking_triggered = True
|
|
|
else:
|
|
else:
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Query done, No availability found")
|
|
|
|
|
|
|
+ self._log("Query done, No availability found")
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Exception during query: {e}")
|
|
|
|
|
|
|
+ self._log(f"Exception during query: {e}")
|
|
|
|
|
|
|
|
# 计算下次运行时间
|
|
# 计算下次运行时间
|
|
|
# 如果刚刚触发了抢票(无论成功失败),建议强制加长一点冷却时间,防止反爬
|
|
# 如果刚刚触发了抢票(无论成功失败),建议强制加长一点冷却时间,防止反爬
|
|
|
if is_booking_triggered:
|
|
if is_booking_triggered:
|
|
|
interval = rng.randint(30, 60) # 抢完票休息 30-60 秒
|
|
interval = rng.randint(30, 60) # 抢完票休息 30-60 秒
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Booking attempted, entering cooldown for {interval} sec.")
|
|
|
|
|
|
|
+ self._log(f"Booking attempted, entering cooldown for {interval} sec.")
|
|
|
else:
|
|
else:
|
|
|
interval = 30
|
|
interval = 30
|
|
|
mode = task.qw_cfg.mode
|
|
mode = task.qw_cfg.mode
|
|
@@ -178,15 +157,15 @@ class GroupCoordinator:
|
|
|
initial_size = len(self.m_tasks)
|
|
initial_size = len(self.m_tasks)
|
|
|
self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
|
|
self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
|
|
|
if len(self.m_tasks) < initial_size:
|
|
if len(self.m_tasks) < initial_size:
|
|
|
- VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
|
|
|
|
|
|
|
+ self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
|
|
|
|
|
|
|
|
- VSC_INFO("coordinator", f"[STOP] monitor loop exiting for group {self.m_cfg.identifier}")
|
|
|
|
|
|
|
+ self._log("[STOP] monitor loop exiting...")
|
|
|
|
|
|
|
|
- def creator_loop(self):
|
|
|
|
|
|
|
+ def _creator_loop(self):
|
|
|
"""
|
|
"""
|
|
|
@brief 创建者循环:根据目标实例数量,创建和补充新的插件实例。
|
|
@brief 创建者循环:根据目标实例数量,创建和补充新的插件实例。
|
|
|
"""
|
|
"""
|
|
|
- VSC_INFO("coordinator", f"[START] creator loop starting for group {self.m_cfg.identifier}")
|
|
|
|
|
|
|
+ self._log("[START] creator loop starting...")
|
|
|
|
|
|
|
|
while not self.m_stop_event.is_set():
|
|
while not self.m_stop_event.is_set():
|
|
|
time.sleep(0.1) # 避免空转太快
|
|
time.sleep(0.1) # 避免空转太快
|
|
@@ -197,12 +176,12 @@ class GroupCoordinator:
|
|
|
diff = self.m_cfg.target_instances - current_instances_count
|
|
diff = self.m_cfg.target_instances - current_instances_count
|
|
|
|
|
|
|
|
if diff > 0:
|
|
if diff > 0:
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Need to create {diff} new instance(s). Current: {current_instances_count}, Target: {self.m_cfg.target_instances}")
|
|
|
|
|
|
|
+ self._log(f"Need to create {diff} new instance(s). Current: {current_instances_count}")
|
|
|
|
|
|
|
|
# 准备配置
|
|
# 准备配置
|
|
|
plg_cfg = self._make_plg_config()
|
|
plg_cfg = self._make_plg_config()
|
|
|
if not plg_cfg:
|
|
if not plg_cfg:
|
|
|
- VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] Failed to prepare plugin configuration, sleeping 30s.")
|
|
|
|
|
|
|
+ self._log("Failed to prepare plugin configuration, sleeping 30s.")
|
|
|
time.sleep(30) # 等待资源 (账户/代理) 恢复
|
|
time.sleep(30) # 等待资源 (账户/代理) 恢复
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
@@ -220,23 +199,23 @@ class GroupCoordinator:
|
|
|
next_run=time.time() # 立即执行第一次查询
|
|
next_run=time.time() # 立即执行第一次查询
|
|
|
)
|
|
)
|
|
|
self.m_tasks.append(new_task)
|
|
self.m_tasks.append(new_task)
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] New instance added. Total instances: {len(self.m_tasks)}")
|
|
|
|
|
|
|
+ self._log(f"New instance added. Total instances: {len(self.m_tasks)}")
|
|
|
else:
|
|
else:
|
|
|
- VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Target instances already met, discarding newly created instance.")
|
|
|
|
|
|
|
+ self._log("Target instances already met, discarding newly created instance.")
|
|
|
else:
|
|
else:
|
|
|
- VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] Failed to create plugin instance.")
|
|
|
|
|
|
|
+ self._log("Failed to create plugin instance.")
|
|
|
# 可以在这里添加重试逻辑或错误处理
|
|
# 可以在这里添加重试逻辑或错误处理
|
|
|
|
|
|
|
|
# 模拟创建间隔,避免瞬间创建过多实例
|
|
# 模拟创建间隔,避免瞬间创建过多实例
|
|
|
time.sleep(random.uniform(1.0, 5.0))
|
|
time.sleep(random.uniform(1.0, 5.0))
|
|
|
|
|
|
|
|
- VSC_INFO("coordinator", f"[STOP] creator loop exiting for group {self.m_cfg.identifier}")
|
|
|
|
|
|
|
+ self._log("[STOP] creator loop exiting...")
|
|
|
|
|
|
|
|
def _make_plg_config(self) -> Optional[VSPlgConfig]:
|
|
def _make_plg_config(self) -> Optional[VSPlgConfig]:
|
|
|
"""
|
|
"""
|
|
|
@brief 准备插件配置 (账号、代理等)。
|
|
@brief 准备插件配置 (账号、代理等)。
|
|
|
"""
|
|
"""
|
|
|
- VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Preparing plugin configuration...")
|
|
|
|
|
|
|
+ self._log("Preparing plugin configuration...")
|
|
|
plg_cfg = VSPlgConfig()
|
|
plg_cfg = VSPlgConfig()
|
|
|
plg_cfg.debug = self.m_cfg.debug
|
|
plg_cfg.debug = self.m_cfg.debug
|
|
|
|
|
|
|
@@ -244,13 +223,13 @@ class GroupCoordinator:
|
|
|
if self.m_cfg.need_account:
|
|
if self.m_cfg.need_account:
|
|
|
account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool)
|
|
account = AccountManager.Instance().get_next_account(self.m_cfg.account_pool)
|
|
|
if not account:
|
|
if not account:
|
|
|
- VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] No available accounts for pool {self.m_cfg.account_pool}")
|
|
|
|
|
|
|
+ self._log(f"No available accounts for pool {self.m_cfg.account_pool}")
|
|
|
return None
|
|
return None
|
|
|
plg_cfg.account.id = account["id"]
|
|
plg_cfg.account.id = account["id"]
|
|
|
plg_cfg.account.username = account["username"]
|
|
plg_cfg.account.username = account["username"]
|
|
|
plg_cfg.account.password = account["password"]
|
|
plg_cfg.account.password = account["password"]
|
|
|
plg_cfg.account.lock_until = account.get("lock_until", "")
|
|
plg_cfg.account.lock_until = account.get("lock_until", "")
|
|
|
- VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Using account ID {plg_cfg.account.id}, username {plg_cfg.account.username}")
|
|
|
|
|
|
|
+ self._log(f"Using account ID {plg_cfg.account.id}, username {plg_cfg.account.username}")
|
|
|
|
|
|
|
|
# 代理配置
|
|
# 代理配置
|
|
|
if self.m_cfg.need_proxy:
|
|
if self.m_cfg.need_proxy:
|
|
@@ -261,22 +240,22 @@ class GroupCoordinator:
|
|
|
bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(self.m_cfg.account_pool, self.m_cfg.proxy_pool)
|
|
bounded_ids = BindingManager.Instance().get_bounded_proxies_ids(self.m_cfg.account_pool, self.m_cfg.proxy_pool)
|
|
|
proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids)
|
|
proxy = ProxyManager.Instance().get_unbind_proxy(self.m_cfg.proxy_pool, bounded_ids)
|
|
|
if not proxy:
|
|
if not proxy:
|
|
|
- VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] No available unbind proxy in pool {self.m_cfg.proxy_pool}")
|
|
|
|
|
|
|
+ self._log(f"No available unbind proxy in pool {self.m_cfg.proxy_pool}")
|
|
|
return None
|
|
return None
|
|
|
BindingManager.Instance().create_binding(
|
|
BindingManager.Instance().create_binding(
|
|
|
self.m_cfg.account_pool, plg_cfg.account.id,
|
|
self.m_cfg.account_pool, plg_cfg.account.id,
|
|
|
self.m_cfg.proxy_pool, proxy["id"], "dynamic")
|
|
self.m_cfg.proxy_pool, proxy["id"], "dynamic")
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Created dynamic binding: account {plg_cfg.account.id} -> proxy {proxy['id']}")
|
|
|
|
|
|
|
+ self._log(f"Created dynamic binding: account {plg_cfg.account.id} -> proxy {proxy['id']}")
|
|
|
else:
|
|
else:
|
|
|
all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, [])
|
|
all_proxies_in_pool = ProxyManager.Instance()._proxies.get(self.m_cfg.proxy_pool, [])
|
|
|
proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None)
|
|
proxy = next((p for p in all_proxies_in_pool if p["id"] == proxy_id), None)
|
|
|
if not proxy:
|
|
if not proxy:
|
|
|
- VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Bounded proxy ID {proxy_id} not found in pool {self.m_cfg.proxy_pool}")
|
|
|
|
|
- return None
|
|
|
|
|
|
|
+ self._log(f"Bounded proxy ID {proxy_id} not found in pool {self.m_cfg.proxy_pool}")
|
|
|
|
|
+ return None
|
|
|
else:
|
|
else:
|
|
|
proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool)
|
|
proxy = ProxyManager.Instance().get_next_proxy(self.m_cfg.proxy_pool)
|
|
|
if not proxy:
|
|
if not proxy:
|
|
|
- VSC_WARN("coordinator", f"[{self.m_cfg.identifier}] No available proxy in pool {self.m_cfg.proxy_pool}")
|
|
|
|
|
|
|
+ self._log(f"No available proxy in pool {self.m_cfg.proxy_pool}")
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
plg_cfg.proxy.id = proxy["id"]
|
|
plg_cfg.proxy.id = proxy["id"]
|
|
@@ -286,10 +265,10 @@ class GroupCoordinator:
|
|
|
plg_cfg.proxy.username = proxy.get("username", "")
|
|
plg_cfg.proxy.username = proxy.get("username", "")
|
|
|
plg_cfg.proxy.password = proxy.get("password", "")
|
|
plg_cfg.proxy.password = proxy.get("password", "")
|
|
|
plg_cfg.proxy.lock_until = proxy.get("lock_until", "")
|
|
plg_cfg.proxy.lock_until = proxy.get("lock_until", "")
|
|
|
- VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Using proxy ID {plg_cfg.proxy.id}, IP {plg_cfg.proxy.ip}:{plg_cfg.proxy.port}")
|
|
|
|
|
|
|
+ self._log(f"Using proxy ID {plg_cfg.proxy.id}, IP {plg_cfg.proxy.ip}:{plg_cfg.proxy.port}")
|
|
|
|
|
|
|
|
plg_cfg.free_config = self.m_cfg.free_config
|
|
plg_cfg.free_config = self.m_cfg.free_config
|
|
|
- VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Plugin configuration prepared.")
|
|
|
|
|
|
|
+ self._log("Plugin configuration prepared.")
|
|
|
return plg_cfg
|
|
return plg_cfg
|
|
|
|
|
|
|
|
def _create_instance(self, plg_cfg: VSPlgConfig) -> Optional[IVSPlg]:
|
|
def _create_instance(self, plg_cfg: VSPlgConfig) -> Optional[IVSPlg]:
|
|
@@ -297,22 +276,23 @@ class GroupCoordinator:
|
|
|
# @brief 创建并初始化单个插件实例。
|
|
# @brief 创建并初始化单个插件实例。
|
|
|
# 这个方法在 creator_loop 的线程池中执行。
|
|
# 这个方法在 creator_loop 的线程池中执行。
|
|
|
# """
|
|
# """
|
|
|
- VSC_DEBUG("coordinator", f"[{self.m_cfg.identifier}] Creating plugin instance (plugin={self.m_cfg.plugin_config.plugin_name})...")
|
|
|
|
|
|
|
+ self._log(f"Creating plugin instance (plugin={self.m_cfg.plugin_config.plugin_name})...")
|
|
|
try:
|
|
try:
|
|
|
inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
|
|
inst = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
|
|
|
|
|
+ inst.set_log(self.m_logger)
|
|
|
inst.set_config(plg_cfg)
|
|
inst.set_config(plg_cfg)
|
|
|
inst.create_session()
|
|
inst.create_session()
|
|
|
if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0:
|
|
if self.m_cfg.need_account and self.m_cfg.account_login_interval > 0:
|
|
|
AccountManager.Instance().lock_account(
|
|
AccountManager.Instance().lock_account(
|
|
|
self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.account_login_interval * 60)
|
|
self.m_cfg.account_pool, plg_cfg.account.id, self.m_cfg.account_login_interval * 60)
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Plugin instance created and session established.")
|
|
|
|
|
|
|
+ self._log("Plugin instance created and session established.")
|
|
|
return inst
|
|
return inst
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- VSC_ERROR("coordinator", f"[{self.m_cfg.identifier}] Error creating plugin instance: {e}")
|
|
|
|
|
|
|
+ self._log(f"Error creating plugin instance: {e}")
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
- def on_query_result(self, sptr: IVSPlg, query_result: VSQueryResult):
|
|
|
|
|
- VSC_INFO("coordinator", f"[{self.m_cfg.identifier}] Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
|
|
|
|
|
|
|
+ def _on_query_result(self, sptr: IVSPlg, query_result: VSQueryResult):
|
|
|
|
|
+ self._log(f"Query result received: {str(query_result)}. BLOCKING monitor loop for booking...")
|
|
|
|
|
|
|
|
# 定义内部预订任务
|
|
# 定义内部预订任务
|
|
|
def book_task(inst: IVSPlg, result: VSQueryResult):
|
|
def book_task(inst: IVSPlg, result: VSQueryResult):
|
|
@@ -325,14 +305,14 @@ class GroupCoordinator:
|
|
|
task = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
|
|
task = VSCloudApi.Instance().get_vas_task_pop(booking_routing_key)
|
|
|
|
|
|
|
|
if not task:
|
|
if not task:
|
|
|
- VSC_WARN("coordinator", f"[{inst.get_group_id()}] No pending task found for key {booking_routing_key}. Abandoning slot.")
|
|
|
|
|
|
|
+ self._log(f"No pending task found for key {booking_routing_key}. Abandoning slot.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
task_id = task['id']
|
|
task_id = task['id']
|
|
|
order_id = task['order_id']
|
|
order_id = task['order_id']
|
|
|
user_input = task.get('user_inputs', {})
|
|
user_input = task.get('user_inputs', {})
|
|
|
|
|
|
|
|
- VSC_INFO("coordinator", f"[{inst.get_group_id()}] Picked up Task ID {task_id} for booking...")
|
|
|
|
|
|
|
+ self._log(f"Picked up Task ID {task_id} for booking...")
|
|
|
|
|
|
|
|
# 2. 执行预订
|
|
# 2. 执行预订
|
|
|
# 注意:插件的 book 方法需要接收 user_input
|
|
# 注意:插件的 book 方法需要接收 user_input
|
|
@@ -340,11 +320,7 @@ class GroupCoordinator:
|
|
|
|
|
|
|
|
# 3. 处理结果
|
|
# 3. 处理结果
|
|
|
if book_res.success:
|
|
if book_res.success:
|
|
|
- VSC_INFO("coordinator", f"[{inst.get_group_id()}] Booking SUCCESS! Order: {order_id}")
|
|
|
|
|
-
|
|
|
|
|
- # 推送通知
|
|
|
|
|
- if hasattr(self, 'push_callback_') and self.push_callback_:
|
|
|
|
|
- self.push_callback_(100, f"Booking Success: {order_id}".encode('utf-8'), 0)
|
|
|
|
|
|
|
+ self._log(f" Booking SUCCESS! Order: {order_id}")
|
|
|
|
|
|
|
|
# 4. 成功逻辑:更新任务状态为 grabbed
|
|
# 4. 成功逻辑:更新任务状态为 grabbed
|
|
|
# 包含后端需要的关键信息
|
|
# 包含后端需要的关键信息
|
|
@@ -366,23 +342,23 @@ class GroupCoordinator:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
VSCloudApi.Instance().update_vas_task(task_id, update_data)
|
|
VSCloudApi.Instance().update_vas_task(task_id, update_data)
|
|
|
- VSC_INFO("coordinator", f"[{inst.get_group_id()}] Task {task_id} marked as GRABBED.")
|
|
|
|
|
|
|
+ self._log(f"Task {task_id} marked as GRABBED.")
|
|
|
# 成功后 task_id 置空,防止 finally 块再次将其重置为 pending
|
|
# 成功后 task_id 置空,防止 finally 块再次将其重置为 pending
|
|
|
task_id = None
|
|
task_id = None
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- VSC_ERROR("coordinator", f"[{inst.get_group_id()}] Exception during booking: {e.message}")
|
|
|
|
|
|
|
+ self._log(f"Exception during booking: {e}")
|
|
|
|
|
|
|
|
finally:
|
|
finally:
|
|
|
# 5. Return to Queue (回滚机制)
|
|
# 5. Return to Queue (回滚机制)
|
|
|
if task_id is not None:
|
|
if task_id is not None:
|
|
|
- VSC_WARN("coordinator", f"[{inst.get_group_id()}] Returning Task {task_id} to queue (status=pending).")
|
|
|
|
|
|
|
+ self._log(f"Returning Task {task_id} to queue (status=pending).")
|
|
|
try:
|
|
try:
|
|
|
VSCloudApi.Instance().return_vas_task_to_queue(task_id)
|
|
VSCloudApi.Instance().return_vas_task_to_queue(task_id)
|
|
|
except Exception as ex:
|
|
except Exception as ex:
|
|
|
- VSC_ERROR("coordinator", f"[{inst.get_group_id()}] Failed to return task to queue: {ex}")
|
|
|
|
|
|
|
+ self.log(f"Failed to return task to queue: {ex}")
|
|
|
|
|
|
|
|
futures = []
|
|
futures = []
|
|
|
- f = self.book_executor.enqueue(book_task, sptr, query_result)
|
|
|
|
|
|
|
+ f = ThreadPool.getInstance().enqueue(book_task, sptr, query_result)
|
|
|
futures.append(f)
|
|
futures.append(f)
|
|
|
|
|
|
|
|
wait(futures)
|
|
wait(futures)
|