import os import time import json import threading import random import traceback import redis from typing import List, Dict, Callable from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType from vs_plg_factory import VSPlgFactory from toolkit.thread_pool import ThreadPool from toolkit.vs_cloud_api import VSCloudApi from toolkit.proxy_manager import ProxyManager class BuiltinBookerGCO: """ 非绑定模式 (公共内置账号池): - 只维护全局 target_instances 数量的实例。 - 所有实例热机等待,发现信号后临时去云端 Pop 订单。 """ def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None): self.m_cfg = cfg self.m_factory = VSPlgFactory() self.m_logger = logger self.m_tasks: List[Task] = [] self.m_lock = threading.RLock() self.m_stop_event = threading.Event() self.redis_client = redis.Redis(**redis_conf) self.m_pending_builtin = 0 def _log(self, message): if self.m_logger: self.m_logger(f'[BUILTIN-BOOKER] [{self.m_cfg.identifier}] {message}') def start(self): if not self.m_cfg.enable: return self._log("Starting Built-in Booker...") plugin_name = self.m_cfg.plugin_config.plugin_name class_name = "".join(part.title() for part in plugin_name.split('_')) plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin) self.m_factory.register_plugin(plugin_name, plugin_path, class_name) threading.Thread(target=self._booking_trigger_loop, daemon=True).start() threading.Thread(target=self._creator_loop, daemon=True).start() threading.Thread(target=self._maintain_loop, daemon=True).start() def stop(self): self._log("Stopping Booker...") self.m_stop_event.set() def _get_redis_key(self, routing_key: str) -> str: return f"vs:signal:{routing_key}" def _maintain_loop(self): self._log("Maintain loop started.") rng = random.Random() while not self.m_stop_event.is_set(): wait_seconds = rng.randint(60, 300) for _ in range(wait_seconds): if self.m_stop_event.is_set(): return time.sleep(1.0) with self.m_lock: tasks_to_check = list(self.m_tasks) healthy_tasks = [] for t in tasks_to_check: try: t.instance.keep_alive() if t.instance.health_check(): healthy_tasks.append(t) except Exception as e: self._log(f"Instance keep-alive failed: {e}") with self.m_lock: self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks] def _booking_trigger_loop(self): self._log("Trigger loop started.") while not self.m_stop_event.is_set(): try: time.sleep(1.0) now = time.time() for apt_type in self.m_cfg.appointment_types: redis_key = self._get_redis_key(apt_type.routing_key) if not self.redis_client.get(redis_key): continue data = json.loads(self.redis_client.get(redis_key)) query_result = VSQueryResult.model_validate(data['query_result']) query_result.apt_type = AppointmentType.model_validate(data['apt_type']) with self.m_lock: for task in self.m_tasks: if now < task.next_run or not task.book_allowed: continue if apt_type.routing_key not in task.acceptable_routing_keys: continue self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}") task.next_run = now + self.m_cfg.booker.booking_cooldown ThreadPool.getInstance().enqueue(self._execute_book_job, task, query_result) except Exception as e: self._log(f"Trigger loop error: {e}") time.sleep(2) def _execute_book_job(self, task: Task, query_result: VSQueryResult): queue_name = f"auto.{query_result.apt_type.routing_key}" task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name) if not task_data: return task_id = task_data['id'] order_id = task_data.get('order_id') user_input = task_data.get('user_inputs', {}) booking_success = False try: book_res = task.instance.book(query_result, user_input) if book_res.success: booking_success = True self._log(f"✅ BOOK SUCCESS! Order: {order_id}") grab_info = { "account": book_res.account, "session_id": book_res.session_id, "urn": book_res.urn, "slot_date": book_res.book_date, "slot_time": book_res.book_time, "timestamp": int(time.time()), "payment_link": book_res.payment_link } VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info}) # === 核心:成功次数判断 === task.successful_bookings += 1 max_b = self.m_cfg.booker.max_bookings_per_account if max_b > 0 and task.successful_bookings >= max_b: self._log(f"Account reached max bookings ({max_b}). Destroying instance.") with self.m_lock: if task in self.m_tasks: self.m_tasks.remove(task) else: self._log(f"❌ BOOK FAILED for Order: {order_id}") except Exception as e: self._log(f"Exception during booking: {e}") finally: if not booking_success and task_id: try: VSCloudApi.Instance().return_vas_task_to_queue(task_id) except Exception as ex: self._log(f"Failed to return task: {ex}") def _creator_loop(self): self._log("Creator loop started.") while not self.m_stop_event.is_set(): time.sleep(5.0) with self.m_lock: current = len(self.m_tasks) if (current + self.m_pending_builtin) < self.m_cfg.booker.target_instances: self._spawn_worker() def _spawn_worker(self): with self.m_lock: self.m_pending_builtin += 1 def _job(): try: plg_cfg = VSPlgConfig() plg_cfg.debug = self.m_cfg.debug plg_cfg.free_config = self.m_cfg.free_config plg_cfg.session_max_life = self.m_cfg.session_max_life if self.m_cfg.need_account: acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd * 60) if not acc: return plg_cfg.account.id = acc['id'] plg_cfg.account.username = acc['username'] plg_cfg.account.password = acc['password'] if self.m_cfg.need_proxy: proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=self.m_cfg.proxy_lock_interval) if not proxy: return plg_cfg.proxy.id = proxy['id'] plg_cfg.proxy.ip = proxy['ip'] plg_cfg.proxy.port = proxy['port'] plg_cfg.proxy.scheme = proxy['scheme'] plg_cfg.proxy.username = proxy['username'] plg_cfg.proxy.password = proxy['password'] instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name) instance.set_log(self.m_logger) instance.set_config(plg_cfg) instance.create_session() with self.m_lock: all_keys = [apt.routing_key for apt in self.m_cfg.appointment_types] self.m_tasks.append( Task( instance=instance, qw_cfg=self.m_cfg.query_wait, next_run=time.time(), task_ref=None, acceptable_routing_keys=all_keys, source_queue="built-in", book_allowed=True ) ) self._log(f"+++ Built-in Booker spawned: {plg_cfg.account.username}") except Exception as e: err_str = str(e) if "40401" in err_str or "Account not found" in err_str: return self._log(f"Spawn failed: {e}") finally: with self.m_lock: self.m_pending_builtin = max(0, self.m_pending_builtin - 1) ThreadPool.getInstance().enqueue(_job)