| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- import os
- import time
- import json
- import threading
- import random
- import traceback
- import redis
- from typing import List, Dict, Callable
- from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
- from vs_plg_factory import VSPlgFactory
- from toolkit.thread_pool import ThreadPool
- from toolkit.vs_cloud_api import VSCloudApi
- from toolkit.proxy_manager import ProxyManager
- from toolkit.backoff import ExponentialBackoff
class OrderBookerGCO:
    """
    Bound-mode booker (each order carries its own account).

    - Keeps a quota of warm plugin instances per routing-key queue.
    - Strict one-to-one relationship: one instance is bound to one cloud order.
    - After a successful booking the instance is dropped from the active
      task list right away.

    Three daemon threads are started by :meth:`start`:
    the creator loop (pops orders and spawns instances), the trigger loop
    (reacts to availability signals stored in Redis) and the maintain loop
    (keep-alive / health checks).
    """

    def __init__(self, cfg: "GroupConfig", redis_conf: Dict, logger: Callable[[str], None] = None):
        """
        Args:
            cfg: Group configuration this booker works for.
            redis_conf: Keyword arguments forwarded to ``redis.Redis``.
            logger: Optional callable receiving formatted log lines; when
                omitted, logging is a no-op.
        """
        self.m_cfg = cfg
        self.m_factory = VSPlgFactory()
        self.m_logger = logger
        self.m_tasks: List["Task"] = []
        # Guards m_tasks and m_pending_order_by_queue.
        self.m_lock = threading.RLock()
        self.m_stop_event = threading.Event()
        self.redis_client = redis.Redis(**redis_conf)
        # Number of spawn jobs in flight, keyed by routing key.
        self.m_pending_order_by_queue: Dict[str, int] = {}

        # Sorted set recording every order currently held by this worker.
        self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
        # 1. Queue-level backoff: starts at 5 minutes, capped at 1 hour.
        self.queue_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=1*60*60.0, factor=2.0)
        # 2. Account-level backoff: starts at 30 minutes, capped at 3 hours.
        self.account_backoff = ExponentialBackoff(base_delay=30*60.0, max_delay=3*60*60.0, factor=2.0)

    def _log(self, message):
        """Forward ``message`` to the configured logger, prefixed with the group identifier."""
        if self.m_logger:
            self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')

    @staticmethod
    def _is_rate_limited(err_str: str) -> bool:
        """Return True when an error message carries one of the rate-limit markers."""
        return "42901" in err_str or "Rate limited" in err_str

    def start(self):
        """Register the plugin and start the background loops (no-op when the group is disabled)."""
        if not self.m_cfg.enable:
            return
        self._log("Starting Order Booker...")
        plugin_name = self.m_cfg.plugin_config.plugin_name
        # The plugin class name is the title-cased, underscore-free plugin name.
        class_name = "".join(part.title() for part in plugin_name.split('_'))
        plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
        self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
        threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
        threading.Thread(target=self._creator_loop, daemon=True).start()
        threading.Thread(target=self._maintain_loop, daemon=True).start()

    def stop(self):
        """Signal every background loop to exit."""
        self._log("Stopping Booker...")
        self.m_stop_event.set()

    def _get_redis_key(self, routing_key: str) -> str:
        """Return the Redis key holding the availability signal for ``routing_key``."""
        return f"vs:signal:{routing_key}"

    def _safe_return_task(self, task_id: int, reason: str = ""):
        """
        Hand an order back to the cloud queue unless its state makes that unnecessary.

        The tracker record is cleared when the task no longer exists, when its
        status is already one of pending/grabbed/cancelled/success, or after a
        successful return. Any exception is logged and swallowed, in which case
        the tracker record is left in place.
        """
        if not task_id:
            return
        try:
            task_data = VSCloudApi.Instance().get_vas_task(task_id)
            if not task_data:
                self.redis_client.zrem(self.m_tracker_key, task_id)
                return

            current_status = task_data.get('status', '')
            if current_status in ['pending', 'grabbed', 'cancelled', 'success']:
                self.redis_client.zrem(self.m_tracker_key, task_id)
                return

            self._log(f"Returning task={task_id} to queue. Reason: {reason}")
            VSCloudApi.Instance().return_vas_task_to_queue(task_id)

            # Return succeeded: clear the anti-loss tracker record.
            self.redis_client.zrem(self.m_tracker_key, task_id)

        except Exception as ex:
            self._log(f"Failed to safely return task for task_id {task_id}: {ex}")

    def _drop_task(self, task):
        """Remove ``task`` from the active list if it is still there."""
        with self.m_lock:
            if task in self.m_tasks:
                self.m_tasks.remove(task)

    def _schedule_penalized_return(self, task_id, task_data, counter_key, attempt_label, reason_prefix):
        """
        Record one more failure on the task and return it to the queue after a backoff delay.

        Args:
            task_id: Cloud task identifier.
            task_data: Task payload previously fetched from the cloud; its
                ``meta`` dict carries the failure counters.
            counter_key: Name of the counter inside ``meta`` to increment.
            attempt_label: Label used in the suspension log line.
            reason_prefix: Prefix of the reason passed to the safe-return helper.
        """
        task_meta = task_data.get('meta') or {}
        failures = task_meta.get(counter_key, 0) + 1
        task_meta[counter_key] = failures

        # Persist the counter to the cloud immediately (best effort).
        try:
            VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
        except Exception as cloud_err:
            self._log(f"Failed to update task meta: {cloud_err}")

        # The account-level policy decides how long this order stays suspended.
        cooldown = self.account_backoff.calculate(failures)
        self._log(f"⏳ Task={task_id} ({attempt_label} {failures}) suspended for {cooldown:.1f}s.")

        def delayed_return(tid, wait_sec, reason):
            # Wakes up early when the booker is stopped, so the order is
            # handed back right away in that case.
            self.m_stop_event.wait(wait_sec)
            self._safe_return_task(tid, reason=reason)

        threading.Thread(
            target=delayed_return,
            args=(task_id, cooldown, f"{reason_prefix}: rate limited (fails={failures})"),
            daemon=True,
        ).start()

    def _maintain_loop(self):
        """Run a health pass over all instances every 180-300 seconds until stopped."""
        self._log("Maintain loop started.")
        rng = random.Random()
        while not self.m_stop_event.is_set():
            # Event.wait returns True as soon as stop() is called.
            if self.m_stop_event.wait(rng.randint(180, 300)):
                return
            self._run_health_pass()

    def _run_health_pass(self):
        """
        Keep every instance alive once and evict those that are unhealthy.

        Only tasks proven dead are removed; tasks appended by spawn jobs
        while the checks were running are left untouched.
        """
        with self.m_lock:
            snapshot = list(self.m_tasks)

        dead_tasks = []
        for t in snapshot:
            try:
                t.instance.keep_alive()
                if not t.instance.health_check():
                    dead_tasks.append(t)
                    self._log(f"♻️ Instance for task={t.task_ref} unhealthy, marking for removal.")
            except Exception as e:
                dead_tasks.append(t)
                self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}, marking for removal.")

        if not dead_tasks:
            return

        # Match by identity so that equal-looking but distinct tasks survive.
        dead_ids = {id(t) for t in dead_tasks}
        with self.m_lock:
            self.m_tasks = [t for t in self.m_tasks if id(t) not in dead_ids]

        # Instance died: hand the order back through the safe-return helper
        # (network I/O, so done outside the lock).
        for t in dead_tasks:
            if t.task_ref is not None:
                self._safe_return_task(t.task_ref, reason="Instance died during maintain_loop")

    def _booking_trigger_loop(self):
        """
        Poll the Redis availability signals once per second and fire bookings.

        For each appointment type with a signal present, every task that is
        off cooldown, allowed to book and accepts that routing key is booked
        in its own thread; the loop waits for all of them before moving on.
        """
        self._log("Trigger loop started.")
        while not self.m_stop_event.is_set():
            try:
                time.sleep(1.0)
                now = time.time()
                for apt_type in self.m_cfg.appointment_types:
                    redis_key = self._get_redis_key(apt_type.routing_key)
                    raw_data = self.redis_client.get(redis_key)
                    if not raw_data:
                        continue
                    try:
                        data = json.loads(raw_data)
                        query_result = VSQueryResult.model_validate(data['query_result'])
                        query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
                    except Exception as parse_err:
                        self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
                        self.redis_client.delete(redis_key)
                        continue

                    matching_tasks = []
                    with self.m_lock:
                        for task in self.m_tasks:
                            if now < task.next_run or not task.book_allowed:
                                continue
                            if apt_type.routing_key not in task.acceptable_routing_keys:
                                continue

                            # Push the next allowed run forward before booking starts.
                            task.next_run = now + self.m_cfg.booker.booking_cooldown
                            matching_tasks.append(task)

                    if matching_tasks:
                        threads = []
                        for task in matching_tasks:
                            self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
                            t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
                            threads.append(t)
                            t.start()

                        for t in threads:
                            t.join()

            except Exception as e:
                self._log(f"Trigger loop error: {e}")
                time.sleep(2)

    def _execute_book_job(self, task: "Task", query_result: "VSQueryResult"):
        """
        Attempt one booking for the order bound to ``task``.

        On success the cloud task is marked ``grabbed`` and the task is
        dropped. On a rate-limit error the task is dropped and its order is
        returned to the queue after a backoff delay. Other failures leave the
        task in place for the next signal.
        """
        task_id = task.task_ref
        task_data = None
        try:
            task_data = VSCloudApi.Instance().get_vas_task(task_id)
            if not task_data or task_data.get('status') in ['grabbed', 'cancelled']:
                self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
                self._drop_task(task)
                return

            order_id = task_data.get('order_id')
            user_input = task_data.get('user_inputs', {})
            self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
            book_res = task.instance.book(query_result, user_input)
            if book_res.success:
                self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
                grab_info = {
                    "account": book_res.account,
                    "session_id": book_res.session_id,
                    "urn": book_res.urn,
                    "slot_date": book_res.book_date,
                    "slot_time": book_res.book_time,
                    "timestamp": int(time.time()),
                    "payment_link": book_res.payment_link
                }
                VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
                self.redis_client.zrem(self.m_tracker_key, task_id)
                self._drop_task(task)
            else:
                self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
        except Exception as e:
            err_str = str(e)
            self._log(f"Exception during booking: {err_str}")
            if self._is_rate_limited(err_str):
                self._drop_task(task)
                # NOTE(review): when the initial fetch itself is rate limited,
                # task_data is None and no delayed return is scheduled -
                # confirm the tracker record is enough to recover the order.
                if task_data and task_id is not None:
                    self._schedule_penalized_return(
                        task_id, task_data, 'booking_failures', "Booking Attempt", "Booking failed"
                    )

    def _creator_loop(self):
        """
        Every 5 seconds, top up each queue that is below its instance quota.

        A queue is skipped while its Redis cooldown key exists. At most one
        worker per queue is spawned per pass.
        """
        self._log("Creator loop started.")
        while not self.m_stop_event.is_set():
            if self.m_stop_event.wait(5.0):
                break
            try:
                for apt in self.m_cfg.appointment_types:
                    r_key = apt.routing_key

                    queue_cd_key = f"vs:queue:cooldown:{r_key}"
                    if self.redis_client.exists(queue_cd_key):
                        continue  # still cooling down; skip fetching orders

                    # Hold the lock only for the count so that Redis I/O and
                    # the pause below never stall the trigger loop.
                    with self.m_lock:
                        active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
                        pending = self.m_pending_order_by_queue.get(r_key, 0)

                    if (active + pending) < self.m_cfg.booker.target_instances:
                        self._spawn_worker(r_key)
                        time.sleep(0.5)
            except Exception as e:
                # Keep the loop alive across transient Redis / pool errors.
                self._log(f"Creator loop error: {e}")

    def _spawn_worker(self, target_routing_key: str):
        """
        Queue a job that pops one order from ``auto.<routing_key>`` and warms an instance for it.

        The pending counter for the queue is incremented here and released
        when the job finishes (or when it cannot be enqueued at all).
        """
        with self.m_lock:
            self.m_pending_order_by_queue[target_routing_key] = (
                self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
            )

        def _release_pending_slot():
            with self.m_lock:
                remaining = self.m_pending_order_by_queue.get(target_routing_key, 0) - 1
                self.m_pending_order_by_queue[target_routing_key] = max(0, remaining)

        def _job():
            success = False
            task_id = None
            task_data = None
            try:
                queue_name = f"auto.{target_routing_key}"
                task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
                if not task_data:
                    return

                task_id = task_data['id']

                self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
                user_inputs = task_data.get('user_inputs', {})

                plg_cfg = VSPlgConfig()
                plg_cfg.debug = self.m_cfg.debug
                plg_cfg.free_config = self.m_cfg.free_config
                plg_cfg.session_max_life = self.m_cfg.session_max_life

                plg_cfg.account.username = user_inputs.get("username", "")
                plg_cfg.account.password = user_inputs.get("password", "")
                if not plg_cfg.account.username:
                    return

                acceptable_keys = [target_routing_key]
                if self.m_cfg.need_proxy:
                    proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=self.m_cfg.proxy_lock_interval)
                    if not proxy:
                        return
                    plg_cfg.proxy.id = proxy['id']
                    plg_cfg.proxy.ip = proxy['ip']
                    plg_cfg.proxy.port = proxy['port']
                    plg_cfg.proxy.scheme = proxy['scheme']
                    plg_cfg.proxy.username = proxy['username']
                    plg_cfg.proxy.password = proxy['password']
                instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
                instance.set_log(self.m_logger)
                instance.set_config(plg_cfg)
                instance.create_session()

                with self.m_lock:
                    self.m_tasks.append(
                        Task(
                            instance=instance,
                            qw_cfg=self.m_cfg.query_wait,
                            next_run=time.time(),
                            task_ref=task_id,
                            acceptable_routing_keys=acceptable_keys,
                            source_queue=target_routing_key,
                            book_allowed=True
                        )
                    )
                # The order is now owned by a live task; from here on it must
                # not be handed back by the cleanup below.
                success = True
                try:
                    self.redis_client.delete(f"vs:queue:failures:{target_routing_key}")
                except Exception as reset_err:
                    self._log(f"Failed to reset queue failure counter for '{target_routing_key}': {reset_err}")
                self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
            except Exception as e:
                err_str = str(e)
                if "40401" in err_str or "Account not found" in err_str:
                    return

                self._log(f"Order Booker spawn failed: {e}")

                if self._is_rate_limited(err_str):
                    queue_fail_key = f"vs:queue:failures:{target_routing_key}"
                    queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
                    q_fails = self.redis_client.incr(queue_fail_key)
                    q_cd = self.queue_backoff.calculate(q_fails)
                    self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
                    self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
                    if task_id is not None:
                        self._schedule_penalized_return(
                            task_id, task_data, 'spawn_failures', "Attempt", "Spawn failed"
                        )
                        # Cleared so the cleanup below does not return the
                        # order immediately, ahead of the delayed return.
                        task_id = None

            finally:
                _release_pending_slot()

                # Creation / login failed: hand the order back safely.
                if not success and task_id is not None:
                    self._safe_return_task(task_id, reason="Instance spawn/login failed")

        try:
            ThreadPool.getInstance().enqueue(_job)
        except Exception:
            # The job will never run, so its pending slot must not leak.
            _release_pending_slot()
            raise
|