import os import time import json import threading import random import redis from typing import List, Dict, Callable, Any, Optional from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType from vs_plg_factory import VSPlgFactory from toolkit.thread_pool import ThreadPool from toolkit.vs_cloud_api import VSCloudApi from toolkit.backoff import ExponentialBackoff class OrderBookerGCO: """ 绑定模式 (订单自带账号): - 按城市队列维护热机配额。 - 绝对的 1 对 1 关系:一个实例绑定一个云端订单。 - 预订成功后,实例立即销毁。 """ def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None): self.m_cfg = cfg self.m_factory = VSPlgFactory() self.m_logger = logger self.m_tasks: List[Task] = [] self.m_lock = threading.RLock() self.m_stop_event = threading.Event() self.redis_client = redis.Redis(**redis_conf) self.m_pending_order_by_queue: Dict[str, int] = {} self.m_last_spawn_times: Dict[str, float] = {} self.m_task_data_cache: Dict[str, dict] = {} self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}" self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0) self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0) self.task_backoff = ExponentialBackoff(base_delay=10, max_delay=30*60.0, factor=2.0) self.heartbeat_ttl = 2*60.0 def _log(self, message): if self.m_logger: self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}') def start(self): if not self.m_cfg.enable: return self._log("Starting Order Booker...") plugin_name = self.m_cfg.plugin_config.plugin_name class_name = "".join(part.title() for part in plugin_name.split('_')) plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin) self.m_factory.register_plugin(plugin_name, plugin_path, class_name) threading.Thread(target=self._booking_trigger_loop, daemon=True).start() threading.Thread(target=self._creator_loop, daemon=True).start() threading.Thread(target=self._maintain_loop, daemon=True).start() threading.Thread(target=self._cache_refresh_loop, daemon=True).start() def stop(self): self._log("Stopping Booker...") self.m_stop_event.set() self._cleanup_all_tasks("booker stop") def _cleanup_task(self, task: Task, reason: str = ""): try: instance = getattr(task, 'instance', None) if instance and hasattr(instance, 'cleanup'): instance.cleanup() self._log(f"🧹 Cleaned up instance for task={getattr(task, 'task_ref', None)}. Reason: {reason}") except Exception as e: self._log(f"Cleanup failed for task={getattr(task, 'task_ref', None)}. Reason: {reason}. Error: {e}") def _remove_task(self, task: Task, reason: str = "", cleanup: bool = True): removed = False with self.m_lock: if task in self.m_tasks: self.m_tasks.remove(task) removed = True task_id = str(getattr(task, 'task_ref', '')) self.m_task_data_cache.pop(task_id, None) if cleanup and removed: self._cleanup_task(task, reason) return removed def _cleanup_all_tasks(self, reason: str = ""): with self.m_lock: tasks = list(self.m_tasks) self.m_tasks.clear() self.m_task_data_cache.clear() for task in tasks: self._cleanup_task(task, reason) def _get_redis_key(self, routing_key: str) -> str: return f"vs:signal:{routing_key}" def _maintain_loop(self): self._log("Maintain loop started.") heartbeat_interval = 60 while not self.m_stop_event.is_set(): for _ in range(heartbeat_interval): if self.m_stop_event.is_set(): return time.sleep(1.0) with self.m_lock: tasks_to_check = list(self.m_tasks) if not tasks_to_check: continue healthy_tasks = [] dead_tasks = [] now = time.time() for t in tasks_to_check: if now >= t.next_remote_ping: try: t.instance.keep_alive() if t.instance.health_check(): healthy_tasks.append(t) next_delay = random.randint(180, 300) t.next_remote_ping = now + next_delay self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.") else: dead_tasks.append(t) self._log(f"♻️ Instance for task={t.task_ref} unhealthy.") except Exception as e: dead_tasks.append(t) self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.") else: healthy_tasks.append(t) if healthy_tasks: try: pipeline = self.redis_client.pipeline() new_deadline = time.time() + self.heartbeat_ttl for t in healthy_tasks: if t.task_ref is not None: pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline}) pipeline.execute() self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.") except Exception as e: self._log(f"Redis Heartbeat update failed: {e}") if dead_tasks: try: pipeline = self.redis_client.pipeline() for t in dead_tasks: if t.task_ref is not None: pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0}) pipeline.execute() self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.") except Exception as e: pass if dead_tasks: with self.m_lock: current_tasks = list(self.m_tasks) self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks] for t in dead_tasks: if t in current_tasks: self._cleanup_task(t, "unhealthy or keep-alive failed") else: with self.m_lock: self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks] def _cache_refresh_loop(self): self._log("Cache refresh loop started.") refresh_interval = 15*60 while not self.m_stop_event.is_set(): for _ in range(refresh_interval): if self.m_stop_event.is_set(): return time.sleep(1.0) with self.m_lock: task_ids = list(self.m_task_data_cache.keys()) if not task_ids: continue for tid in task_ids: if self.m_stop_event.is_set(): break try: fresh_data = VSCloudApi.Instance().get_vas_task(tid) if fresh_data: with self.m_lock: if tid in self.m_task_data_cache: self.m_task_data_cache[tid] = fresh_data except Exception: pass time.sleep(0.5) def _booking_trigger_loop(self): self._log("Pub/Sub Trigger loop started.") channel_to_routing_key = {} for apt in self.m_cfg.appointment_types: channel = self._get_redis_key(apt.routing_key) channel_to_routing_key[channel] = apt.routing_key if not channel_to_routing_key: self._log("No appointment types configured. Exiting trigger loop.") return pubsub = None while not self.m_stop_event.is_set(): try: if pubsub is None: pubsub = self.redis_client.pubsub(ignore_subscribe_messages=False) channels_to_sub = list(channel_to_routing_key.keys()) self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}") pubsub.subscribe(*channels_to_sub) message = pubsub.get_message(timeout=5.0) if not message: continue channel = message['channel'] if isinstance(channel, bytes): channel = channel.decode('utf-8') if message['type'] == 'subscribe': active_subs = message['data'] self._log(f"📡 [Redis ACK] Successfully subscribed to: {channel} (Active connection subs: {active_subs})") continue if message['type'] != 'message': continue raw_data = message['data'] if isinstance(raw_data, bytes): raw_data = raw_data.decode('utf-8') routing_key = channel_to_routing_key.get(channel) if not routing_key: continue try: data = json.loads(raw_data) query_result = VSQueryResult.model_validate(data['query_result']) query_result.apt_type = AppointmentType.model_validate(data['apt_type']) except Exception as parse_err: self._log(f"Data parsing error for channel {channel}: {parse_err}") continue now = time.time() matching_tasks = [] with self.m_lock: for task in self.m_tasks: if now < task.next_run or not task.book_allowed: continue if routing_key not in task.acceptable_routing_keys: continue task.next_run = now + self.m_cfg.booker.booking_cooldown matching_tasks.append(task) if matching_tasks: for task in matching_tasks: self._log(f"🚀 Triggering BOOK for {routing_key} | Order Ref: {task.task_ref}") t = threading.Thread( target=self._execute_book_job, args=(task, query_result), daemon=True ) t.start() except Exception as e: self._log(f"Trigger loop pub/sub error: {e}") if pubsub: try: pubsub.close() except: pass pubsub = None time.sleep(2) if pubsub: pubsub.close() self._log("Pub/Sub connection closed.") def _execute_book_job(self, task: Task, query_result: VSQueryResult): task_id = task.task_ref task_data = None try: with self.m_lock: task_data = self.m_task_data_cache.get(str(task_id)) if not task_data: self._log(f"Cache miss for {task_id}, fetching from cloud...") task_data = VSCloudApi.Instance().get_vas_task(str(task_id)) if task_data: with self.m_lock: self.m_task_data_cache[str(task_id)] = task_data if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']: self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.") self._remove_task(task, "bound task no longer valid") self.redis_client.zrem(self.m_tracker_key, task_id) return order_id = task_data.get('order_id') user_input = task_data.get('user_inputs', {}) book_res = task.instance.book(query_result, user_input) if book_res.success: self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.") grab_info = { "account": book_res.account, "session_id": book_res.session_id, "urn": book_res.urn, "slot_date": book_res.book_date, "slot_time": book_res.book_time, "timestamp": int(time.time()), "payment_link": book_res.payment_link } def _update_cloud_success(): try: VSCloudApi.Instance().update_vas_task(str(task_id), {"status": "grabbed", "grabbed_history": grab_info}) push_content = ( f"🎉 【预定成功通知】\n" f"━━━━━━━━━━━━━━━\n" f"订单编号: {order_id}\n" f"预约账号: {book_res.account}\n" f"预约日期: {book_res.book_date}\n" f"预约时间: {book_res.book_time}\n" f"预约编号: {book_res.urn}\n" f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n" f"━━━━━━━━━━━━━━━\n" ) VSCloudApi.Instance().push_weixin_text(push_content) except Exception as e: self._log(f"Failed to update success state to cloud: {e}") ThreadPool.getInstance().enqueue(_update_cloud_success) self.redis_client.zrem(self.m_tracker_key, task_id) self._remove_task(task, "booking success") else: self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.") except Exception as e: err_str = str(e) self._log(f"Exception during booking: {err_str}") rate_limited_indicators = [ "42901" in err_str, "Rate limited" in err_str ] if any(rate_limited_indicators): self._remove_task(task, "booking rate limited") if task_data and task_id is not None: task_meta = task_data.get('meta', {}) t_fails = task_meta.get('booking_failures', 0) + 1 task_meta['booking_failures'] = t_fails def _update_cloud_meta(): try: VSCloudApi.Instance().update_vas_task(str(task_id), {"meta": task_meta}) except Exception as cloud_err: self._log(f"Failed to update task meta: {cloud_err}") ThreadPool.getInstance().enqueue(_update_cloud_meta) t_cd = self.task_backoff.calculate(t_fails) self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.") self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd}) def _creator_loop(self): self._log("Creator loop started.") spawn_interval = 10.0 while not self.m_stop_event.is_set(): time.sleep(2.0) now = time.time() for apt in self.m_cfg.appointment_types: r_key = apt.routing_key queue_cd_key = f"vs:queue:cooldown:{r_key}" if self.redis_client.exists(queue_cd_key): continue with self.m_lock: active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key) pending = self.m_pending_order_by_queue.get(r_key, 0) target = self.m_cfg.booker.target_instances if (active + pending) < target: last_spawn = self.m_last_spawn_times.get(r_key, 0.0) if now - last_spawn >= spawn_interval: self.m_last_spawn_times[r_key] = now self._spawn_worker(r_key) def _spawn_worker(self, target_routing_key: str): with self.m_lock: self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1 def _job(): success = False task_id = None is_rate_limited = False try: queue_name = f"auto.{target_routing_key}" task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name, test=False) if not task_data: return task_id = task_data['id'] with self.m_lock: self.m_task_data_cache[str(task_id)] = task_data self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + 5*60.0}) user_inputs = task_data.get('user_inputs', {}) plg_cfg = VSPlgConfig() plg_cfg.debug = self.m_cfg.debug plg_cfg.free_config = self.m_cfg.free_config plg_cfg.session_max_life = self.m_cfg.session_max_life plg_cfg.account.username = user_inputs.get("username", "") plg_cfg.account.password = user_inputs.get("password", "") if not plg_cfg.account.username: return acceptable_keys = [target_routing_key] if self.m_cfg.need_proxy: proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd, test=False) plg_cfg.proxy.id = proxy['id'] plg_cfg.proxy.ip = proxy['ip'] plg_cfg.proxy.port = proxy['port'] plg_cfg.proxy.proto = proxy['proto'] plg_cfg.proxy.username = proxy['username'] plg_cfg.proxy.password = proxy['password'] instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name) instance.set_log(self.m_logger) instance.set_config(plg_cfg) instance.create_session() with self.m_lock: self.m_tasks.append( Task( instance=instance, qw_cfg=self.m_cfg.query_wait, next_run=time.time(), task_ref=task_id, acceptable_routing_keys=acceptable_keys, source_queue=target_routing_key, book_allowed=True, next_remote_ping=time.time() + random.randint(180, 300) ) ) queue_fail_key = f"vs:queue:failures:{target_routing_key}" self.redis_client.delete(queue_fail_key) success = True self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})") except Exception as e: err_str = str(e) resource_not_found_indicators = [ "40401" in err_str, "Account not found" in err_str, "Proxy not found" in err_str ] if any(resource_not_found_indicators): return self._log(f"Order Booker spawn failed: {e}") rate_limited_indicators = [ "42901" in err_str, "Rate limited" in err_str ] if any(rate_limited_indicators): is_rate_limited = True queue_fail_key = f"vs:queue:failures:{target_routing_key}" queue_cd_key = f"vs:queue:cooldown:{target_routing_key}" q_fails = self.redis_client.incr(queue_fail_key) q_cd = self.queue_backoff.calculate(q_fails) self.redis_client.set(queue_cd_key, "1", ex=int(q_cd)) self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.") if task_id is not None: task_meta = task_data.get('meta') or {} t_fails = task_meta.get('spawn_failures', 0) + 1 task_meta['spawn_failures'] = t_fails try: VSCloudApi.Instance().update_vas_task(str(task_id), {"meta": task_meta}) except Exception as cloud_err: self._log(f"Failed to update task meta: {cloud_err}") t_cd = self.account_backoff.calculate(t_fails) self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.") self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd}) finally: with self.m_lock: self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1) if not success and task_id is not None and not is_rate_limited: self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0}) self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.") with self.m_lock: self.m_task_data_cache.pop(str(task_id), None) ThreadPool.getInstance().enqueue(_job)