|
@@ -1,377 +1,404 @@
|
|
|
-import os
|
|
|
|
|
-import time
|
|
|
|
|
-import json
|
|
|
|
|
-import threading
|
|
|
|
|
-import random
|
|
|
|
|
-import traceback
|
|
|
|
|
-import redis
|
|
|
|
|
-from typing import List, Dict, Callable
|
|
|
|
|
-
|
|
|
|
|
-from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
|
|
|
|
|
-from vs_plg_factory import VSPlgFactory
|
|
|
|
|
-from toolkit.thread_pool import ThreadPool
|
|
|
|
|
-from toolkit.vs_cloud_api import VSCloudApi
|
|
|
|
|
-from toolkit.backoff import ExponentialBackoff
|
|
|
|
|
-
|
|
|
|
|
-class OrderBookerGCO:
|
|
|
|
|
- """
|
|
|
|
|
- 绑定模式 (订单自带账号):
|
|
|
|
|
- - 按城市队列维护热机配额。
|
|
|
|
|
- - 绝对的 1 对 1 关系:一个实例绑定一个云端订单。
|
|
|
|
|
- - 预订成功后,实例立即销毁。
|
|
|
|
|
- """
|
|
|
|
|
- def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
|
|
|
|
|
- self.m_cfg = cfg
|
|
|
|
|
- self.m_factory = VSPlgFactory()
|
|
|
|
|
- self.m_logger = logger
|
|
|
|
|
- self.m_tasks: List[Task] = []
|
|
|
|
|
- self.m_lock = threading.RLock()
|
|
|
|
|
- self.m_stop_event = threading.Event()
|
|
|
|
|
- self.redis_client = redis.Redis(**redis_conf)
|
|
|
|
|
- self.m_pending_order_by_queue: Dict[str, int] = {}
|
|
|
|
|
-
|
|
|
|
|
- self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
|
|
|
|
|
- self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
|
|
|
|
|
- self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
|
|
|
|
|
- self.m_last_spawn_time = 0.0
|
|
|
|
|
- self.heartbeat_ttl = 300
|
|
|
|
|
-
|
|
|
|
|
- def _log(self, message):
|
|
|
|
|
- if self.m_logger:
|
|
|
|
|
- self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')
|
|
|
|
|
-
|
|
|
|
|
- def start(self):
|
|
|
|
|
- if not self.m_cfg.enable:
|
|
|
|
|
- return
|
|
|
|
|
- self._log("Starting Order Booker...")
|
|
|
|
|
- plugin_name = self.m_cfg.plugin_config.plugin_name
|
|
|
|
|
- class_name = "".join(part.title() for part in plugin_name.split('_'))
|
|
|
|
|
- plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
|
|
|
|
|
- self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
|
|
|
|
|
-
|
|
|
|
|
- threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
|
|
|
|
|
- threading.Thread(target=self._creator_loop, daemon=True).start()
|
|
|
|
|
- threading.Thread(target=self._maintain_loop, daemon=True).start()
|
|
|
|
|
-
|
|
|
|
|
- def stop(self):
|
|
|
|
|
- self._log("Stopping Booker...")
|
|
|
|
|
- self.m_stop_event.set()
|
|
|
|
|
-
|
|
|
|
|
- def _get_redis_key(self, routing_key: str) -> str:
|
|
|
|
|
- return f"vs:signal:{routing_key}"
|
|
|
|
|
-
|
|
|
|
|
- def _maintain_loop(self):
|
|
|
|
|
- self._log("Maintain loop started.")
|
|
|
|
|
- heartbeat_interval = 60
|
|
|
|
|
- while not self.m_stop_event.is_set():
|
|
|
|
|
- for _ in range(heartbeat_interval):
|
|
|
|
|
- if self.m_stop_event.is_set():
|
|
|
|
|
- return
|
|
|
|
|
- time.sleep(1.0)
|
|
|
|
|
-
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- tasks_to_check = list(self.m_tasks)
|
|
|
|
|
-
|
|
|
|
|
- if not tasks_to_check:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- healthy_tasks = []
|
|
|
|
|
- dead_tasks = []
|
|
|
|
|
- now = time.time()
|
|
|
|
|
-
|
|
|
|
|
- for t in tasks_to_check:
|
|
|
|
|
- if now >= t.next_remote_ping:
|
|
|
|
|
- try:
|
|
|
|
|
- t.instance.keep_alive()
|
|
|
|
|
- if t.instance.health_check():
|
|
|
|
|
- healthy_tasks.append(t)
|
|
|
|
|
- next_delay = random.randint(180, 300)
|
|
|
|
|
- t.next_remote_ping = now + next_delay
|
|
|
|
|
- self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.")
|
|
|
|
|
- else:
|
|
|
|
|
- dead_tasks.append(t)
|
|
|
|
|
- self._log(f"♻️ Instance for task={t.task_ref} unhealthy.")
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- dead_tasks.append(t)
|
|
|
|
|
- self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.")
|
|
|
|
|
- else:
|
|
|
|
|
- healthy_tasks.append(t)
|
|
|
|
|
-
|
|
|
|
|
- if healthy_tasks:
|
|
|
|
|
- try:
|
|
|
|
|
- pipeline = self.redis_client.pipeline()
|
|
|
|
|
- new_deadline = time.time() + self.heartbeat_ttl
|
|
|
|
|
- for t in healthy_tasks:
|
|
|
|
|
- if t.task_ref is not None:
|
|
|
|
|
- pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline})
|
|
|
|
|
- pipeline.execute()
|
|
|
|
|
- self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.")
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- self._log(f"Redis Heartbeat update failed: {e}")
|
|
|
|
|
-
|
|
|
|
|
- if dead_tasks:
|
|
|
|
|
- try:
|
|
|
|
|
- pipeline = self.redis_client.pipeline()
|
|
|
|
|
- for t in dead_tasks:
|
|
|
|
|
- if t.task_ref is not None:
|
|
|
|
|
- pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
|
|
|
|
|
- pipeline.execute()
|
|
|
|
|
- self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.")
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
|
|
|
|
|
-
|
|
|
|
|
- def _booking_trigger_loop(self):
|
|
|
|
|
- self._log("Trigger loop started.")
|
|
|
|
|
- while not self.m_stop_event.is_set():
|
|
|
|
|
- try:
|
|
|
|
|
- time.sleep(1.0)
|
|
|
|
|
- now = time.time()
|
|
|
|
|
- for apt_type in self.m_cfg.appointment_types:
|
|
|
|
|
- redis_key = self._get_redis_key(apt_type.routing_key)
|
|
|
|
|
- raw_data = self.redis_client.get(redis_key)
|
|
|
|
|
- if not raw_data:
|
|
|
|
|
- continue
|
|
|
|
|
- try:
|
|
|
|
|
- data = json.loads(raw_data)
|
|
|
|
|
- query_result = VSQueryResult.model_validate(data['query_result'])
|
|
|
|
|
- query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
|
|
|
|
|
- except Exception as parse_err:
|
|
|
|
|
- self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
|
|
|
|
|
- self.redis_client.delete(redis_key)
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- matching_tasks = []
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- for task in self.m_tasks:
|
|
|
|
|
- if now < task.next_run or not task.book_allowed:
|
|
|
|
|
- continue
|
|
|
|
|
- if apt_type.routing_key not in task.acceptable_routing_keys:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- task.next_run = now + self.m_cfg.booker.booking_cooldown
|
|
|
|
|
- matching_tasks.append(task)
|
|
|
|
|
-
|
|
|
|
|
- if matching_tasks:
|
|
|
|
|
- threads = []
|
|
|
|
|
- for task in matching_tasks:
|
|
|
|
|
- self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
|
|
|
|
|
- t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
|
|
|
|
|
- threads.append(t)
|
|
|
|
|
- t.start()
|
|
|
|
|
-
|
|
|
|
|
- for t in threads:
|
|
|
|
|
- t.join()
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- self._log(f"Trigger loop error: {e}")
|
|
|
|
|
- time.sleep(2)
|
|
|
|
|
-
|
|
|
|
|
- def _execute_book_job(self, task: Task, query_result: VSQueryResult):
|
|
|
|
|
- task_id = task.task_ref
|
|
|
|
|
- task_data = None
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- task_data = VSCloudApi.Instance().get_vas_task(task_id)
|
|
|
|
|
- if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
|
|
|
|
|
- self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- if task in self.m_tasks:
|
|
|
|
|
- self.m_tasks.remove(task)
|
|
|
|
|
- self.redis_client.zrem(self.m_tracker_key, task_id)
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- order_id = task_data.get('order_id')
|
|
|
|
|
- user_input = task_data.get('user_inputs', {})
|
|
|
|
|
- book_res = task.instance.book(query_result, user_input)
|
|
|
|
|
-
|
|
|
|
|
- if book_res.success:
|
|
|
|
|
- self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
|
|
|
|
|
- grab_info = {
|
|
|
|
|
- "account": book_res.account,
|
|
|
|
|
- "session_id": book_res.session_id,
|
|
|
|
|
- "urn": book_res.urn,
|
|
|
|
|
- "slot_date": book_res.book_date,
|
|
|
|
|
- "slot_time": book_res.book_time,
|
|
|
|
|
- "timestamp": int(time.time()),
|
|
|
|
|
- "payment_link": book_res.payment_link
|
|
|
|
|
- }
|
|
|
|
|
- VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
|
|
|
|
|
- push_content = (
|
|
|
|
|
- f"🎉 【预定成功通知】\n"
|
|
|
|
|
- f"━━━━━━━━━━━━━━━\n"
|
|
|
|
|
- f"订单编号: {order_id}\n"
|
|
|
|
|
- f"预约账号: {book_res.account}\n"
|
|
|
|
|
- f"预约日期: {book_res.book_date}\n"
|
|
|
|
|
- f"预约时间: {book_res.book_time}\n"
|
|
|
|
|
- f"预约编号: {book_res.urn}\n"
|
|
|
|
|
- f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
|
|
|
|
|
- f"━━━━━━━━━━━━━━━\n"
|
|
|
|
|
- )
|
|
|
|
|
- VSCloudApi.Instance().push_weixin_text(push_content)
|
|
|
|
|
- self.redis_client.zrem(self.m_tracker_key, task_id)
|
|
|
|
|
-
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- if task in self.m_tasks:
|
|
|
|
|
- self.m_tasks.remove(task)
|
|
|
|
|
- else:
|
|
|
|
|
- self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- err_str = str(e)
|
|
|
|
|
- self._log(f"Exception during booking: {err_str}")
|
|
|
|
|
- rate_limited_indicators = [
|
|
|
|
|
- "42901" in err_str,
|
|
|
|
|
- "Rate limited" in err_str
|
|
|
|
|
- ]
|
|
|
|
|
- if any(rate_limited_indicators):
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- if task in self.m_tasks:
|
|
|
|
|
- self.m_tasks.remove(task)
|
|
|
|
|
- if task_data and task_id is not None:
|
|
|
|
|
- task_meta = task_data.get('meta', {})
|
|
|
|
|
- t_fails = task_meta.get('booking_failures', 0) + 1
|
|
|
|
|
- task_meta['booking_failures'] = t_fails
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
|
|
|
|
|
- except Exception as cloud_err:
|
|
|
|
|
- self._log(f"Failed to update task meta: {cloud_err}")
|
|
|
|
|
-
|
|
|
|
|
- t_cd = self.task_backoff.calculate(t_fails)
|
|
|
|
|
- self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
|
|
-
|
|
|
|
|
- def _creator_loop(self):
|
|
|
|
|
- self._log("Creator loop started.")
|
|
|
|
|
- spawn_interval = 10.0
|
|
|
|
|
- while not self.m_stop_event.is_set():
|
|
|
|
|
- time.sleep(2.0)
|
|
|
|
|
- for apt in self.m_cfg.appointment_types:
|
|
|
|
|
- r_key = apt.routing_key
|
|
|
|
|
-
|
|
|
|
|
- queue_cd_key = f"vs:queue:cooldown:{r_key}"
|
|
|
|
|
- if self.redis_client.exists(queue_cd_key):
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
|
|
|
|
|
- pending = self.m_pending_order_by_queue.get(r_key, 0)
|
|
|
|
|
- target = self.m_cfg.booker.target_instances
|
|
|
|
|
-
|
|
|
|
|
- if (active + pending) < target:
|
|
|
|
|
- now = time.time()
|
|
|
|
|
- if now - self.m_last_spawn_time >= spawn_interval:
|
|
|
|
|
- self.m_last_spawn_time = now
|
|
|
|
|
- self._spawn_worker(r_key)
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- def _spawn_worker(self, target_routing_key: str):
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
|
|
|
|
|
-
|
|
|
|
|
- def _job():
|
|
|
|
|
- success = False
|
|
|
|
|
- task_id = None
|
|
|
|
|
- is_rate_limited = False
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- queue_name = f"auto.{target_routing_key}"
|
|
|
|
|
- task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
|
|
|
|
|
- if not task_data:
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- task_id = task_data['id']
|
|
|
|
|
-
|
|
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
|
|
|
|
|
- user_inputs = task_data.get('user_inputs', {})
|
|
|
|
|
-
|
|
|
|
|
- plg_cfg = VSPlgConfig()
|
|
|
|
|
- plg_cfg.debug = self.m_cfg.debug
|
|
|
|
|
- plg_cfg.free_config = self.m_cfg.free_config
|
|
|
|
|
- plg_cfg.session_max_life = self.m_cfg.session_max_life
|
|
|
|
|
- plg_cfg.account.username = user_inputs.get("username", "")
|
|
|
|
|
- plg_cfg.account.password = user_inputs.get("password", "")
|
|
|
|
|
- if not plg_cfg.account.username:
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- acceptable_keys = [target_routing_key]
|
|
|
|
|
- if self.m_cfg.need_proxy:
|
|
|
|
|
- proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
|
|
|
|
|
- plg_cfg.proxy.id = proxy['id']
|
|
|
|
|
- plg_cfg.proxy.ip = proxy['ip']
|
|
|
|
|
- plg_cfg.proxy.port = proxy['port']
|
|
|
|
|
- plg_cfg.proxy.proto = proxy['proto']
|
|
|
|
|
- plg_cfg.proxy.username = proxy['username']
|
|
|
|
|
- plg_cfg.proxy.password = proxy['password']
|
|
|
|
|
-
|
|
|
|
|
- instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
|
|
|
|
|
- instance.set_log(self.m_logger)
|
|
|
|
|
- instance.set_config(plg_cfg)
|
|
|
|
|
- instance.create_session()
|
|
|
|
|
-
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- self.m_tasks.append(
|
|
|
|
|
- Task(
|
|
|
|
|
- instance=instance,
|
|
|
|
|
- qw_cfg=self.m_cfg.query_wait,
|
|
|
|
|
- next_run=time.time(),
|
|
|
|
|
- task_ref=task_id,
|
|
|
|
|
- acceptable_routing_keys=acceptable_keys,
|
|
|
|
|
- source_queue=target_routing_key,
|
|
|
|
|
- book_allowed=True,
|
|
|
|
|
- next_remote_ping=time.time() + random.randint(180, 300)
|
|
|
|
|
- )
|
|
|
|
|
- )
|
|
|
|
|
- queue_fail_key = f"vs:queue:failures:{target_routing_key}"
|
|
|
|
|
- self.redis_client.delete(queue_fail_key)
|
|
|
|
|
- success = True
|
|
|
|
|
- self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- err_str = str(e)
|
|
|
|
|
- resource_not_found_indicators = [
|
|
|
|
|
- "40401" in err_str,
|
|
|
|
|
- "Account not found" in err_str,
|
|
|
|
|
- "Proxy not found" in err_str
|
|
|
|
|
- ]
|
|
|
|
|
- if any(resource_not_found_indicators):
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- self._log(f"Order Booker spawn failed: {e}")
|
|
|
|
|
-
|
|
|
|
|
- rate_limited_indicators = [
|
|
|
|
|
- "42901" in err_str,
|
|
|
|
|
- "Rate limited" in err_str
|
|
|
|
|
- ]
|
|
|
|
|
- if any(rate_limited_indicators):
|
|
|
|
|
- is_rate_limited = True
|
|
|
|
|
- queue_fail_key = f"vs:queue:failures:{target_routing_key}"
|
|
|
|
|
- queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
|
|
|
|
|
- q_fails = self.redis_client.incr(queue_fail_key)
|
|
|
|
|
- q_cd = self.queue_backoff.calculate(q_fails)
|
|
|
|
|
- self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
|
|
|
|
|
- self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
|
|
|
|
|
- if task_id is not None:
|
|
|
|
|
- task_meta = task_data.get('meta') or {}
|
|
|
|
|
- t_fails = task_meta.get('spawn_failures', 0) + 1
|
|
|
|
|
- task_meta['spawn_failures'] = t_fails
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
|
|
|
|
|
- except Exception as cloud_err:
|
|
|
|
|
- self._log(f"Failed to update task meta: {cloud_err}")
|
|
|
|
|
-
|
|
|
|
|
- t_cd = self.account_backoff.calculate(t_fails)
|
|
|
|
|
- self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
|
|
- finally:
|
|
|
|
|
- with self.m_lock:
|
|
|
|
|
- self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
|
|
|
|
|
-
|
|
|
|
|
- # 创建/登录失败,调用安全归还函数
|
|
|
|
|
- if not success and task_id is not None and not is_rate_limited:
|
|
|
|
|
- self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
|
|
|
|
|
- self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
|
|
|
|
|
|
|
+import os
|
|
|
|
|
+import time
|
|
|
|
|
+import json
|
|
|
|
|
+import threading
|
|
|
|
|
+import random
|
|
|
|
|
+import redis
|
|
|
|
|
+from typing import List, Dict, Callable
|
|
|
|
|
+
|
|
|
|
|
+from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
|
|
|
|
|
+from vs_plg_factory import VSPlgFactory
|
|
|
|
|
+from toolkit.thread_pool import ThreadPool
|
|
|
|
|
+from toolkit.vs_cloud_api import VSCloudApi
|
|
|
|
|
+from toolkit.backoff import ExponentialBackoff
|
|
|
|
|
+
|
|
|
|
|
+class OrderBookerGCO:
|
|
|
|
|
+ """
|
|
|
|
|
+ 绑定模式 (订单自带账号):
|
|
|
|
|
+ - 按城市队列维护热机配额。
|
|
|
|
|
+ - 绝对的 1 对 1 关系:一个实例绑定一个云端订单。
|
|
|
|
|
+ - 预订成功后,实例立即销毁。
|
|
|
|
|
+ """
|
|
|
|
|
+ def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
|
|
|
|
|
+ self.m_cfg = cfg
|
|
|
|
|
+ self.m_factory = VSPlgFactory()
|
|
|
|
|
+ self.m_logger = logger
|
|
|
|
|
+ self.m_tasks: List[Task] = []
|
|
|
|
|
+ self.m_lock = threading.RLock()
|
|
|
|
|
+ self.m_stop_event = threading.Event()
|
|
|
|
|
+ self.redis_client = redis.Redis(**redis_conf)
|
|
|
|
|
+ self.m_pending_order_by_queue: Dict[str, int] = {}
|
|
|
|
|
+
|
|
|
|
|
+ self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
|
|
|
|
|
+ self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
|
|
|
|
|
+ self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
|
|
|
|
|
+ self.m_last_spawn_time = 0.0
|
|
|
|
|
+ self.heartbeat_ttl = 300
|
|
|
|
|
+
|
|
|
|
|
+ def _log(self, message):
|
|
|
|
|
+ if self.m_logger:
|
|
|
|
|
+ self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')
|
|
|
|
|
+
|
|
|
|
|
+ def start(self):
|
|
|
|
|
+ if not self.m_cfg.enable:
|
|
|
|
|
+ return
|
|
|
|
|
+ self._log("Starting Order Booker...")
|
|
|
|
|
+ plugin_name = self.m_cfg.plugin_config.plugin_name
|
|
|
|
|
+ class_name = "".join(part.title() for part in plugin_name.split('_'))
|
|
|
|
|
+ plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
|
|
|
|
|
+ self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
|
|
|
|
|
+
|
|
|
|
|
+ threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
|
|
|
|
|
+ threading.Thread(target=self._creator_loop, daemon=True).start()
|
|
|
|
|
+ threading.Thread(target=self._maintain_loop, daemon=True).start()
|
|
|
|
|
+
|
|
|
|
|
+ def stop(self):
|
|
|
|
|
+ self._log("Stopping Booker...")
|
|
|
|
|
+ self.m_stop_event.set()
|
|
|
|
|
+ self._cleanup_all_tasks("booker stop")
|
|
|
|
|
+
|
|
|
|
|
+ def _cleanup_task(self, task: Task, reason: str = ""):
|
|
|
|
|
+ try:
|
|
|
|
|
+ instance = getattr(task, 'instance', None)
|
|
|
|
|
+ if instance and hasattr(instance, 'cleanup'):
|
|
|
|
|
+ instance.cleanup()
|
|
|
|
|
+ self._log(f"🧹 Cleaned up instance for task={getattr(task, 'task_ref', None)}. Reason: {reason}")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ self._log(f"Cleanup failed for task={getattr(task, 'task_ref', None)}. Reason: {reason}. Error: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ def _remove_task(self, task: Task, reason: str = "", cleanup: bool = True):
|
|
|
|
|
+ removed = False
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ if task in self.m_tasks:
|
|
|
|
|
+ self.m_tasks.remove(task)
|
|
|
|
|
+ removed = True
|
|
|
|
|
+ if cleanup and removed:
|
|
|
|
|
+ self._cleanup_task(task, reason)
|
|
|
|
|
+ return removed
|
|
|
|
|
+
|
|
|
|
|
+ def _cleanup_all_tasks(self, reason: str = ""):
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ tasks = list(self.m_tasks)
|
|
|
|
|
+ self.m_tasks.clear()
|
|
|
|
|
+ for task in tasks:
|
|
|
|
|
+ self._cleanup_task(task, reason)
|
|
|
|
|
+
|
|
|
|
|
+ def _get_redis_key(self, routing_key: str) -> str:
|
|
|
|
|
+ return f"vs:signal:{routing_key}"
|
|
|
|
|
+
|
|
|
|
|
+ def _maintain_loop(self):
|
|
|
|
|
+ self._log("Maintain loop started.")
|
|
|
|
|
+ heartbeat_interval = 60
|
|
|
|
|
+ while not self.m_stop_event.is_set():
|
|
|
|
|
+ for _ in range(heartbeat_interval):
|
|
|
|
|
+ if self.m_stop_event.is_set():
|
|
|
|
|
+ return
|
|
|
|
|
+ time.sleep(1.0)
|
|
|
|
|
+
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ tasks_to_check = list(self.m_tasks)
|
|
|
|
|
+
|
|
|
|
|
+ if not tasks_to_check:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ healthy_tasks = []
|
|
|
|
|
+ dead_tasks = []
|
|
|
|
|
+ now = time.time()
|
|
|
|
|
+
|
|
|
|
|
+ for t in tasks_to_check:
|
|
|
|
|
+ if now >= t.next_remote_ping:
|
|
|
|
|
+ try:
|
|
|
|
|
+ t.instance.keep_alive()
|
|
|
|
|
+ if t.instance.health_check():
|
|
|
|
|
+ healthy_tasks.append(t)
|
|
|
|
|
+ next_delay = random.randint(180, 300)
|
|
|
|
|
+ t.next_remote_ping = now + next_delay
|
|
|
|
|
+ self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.")
|
|
|
|
|
+ else:
|
|
|
|
|
+ dead_tasks.append(t)
|
|
|
|
|
+ self._log(f"♻️ Instance for task={t.task_ref} unhealthy.")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ dead_tasks.append(t)
|
|
|
|
|
+ self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.")
|
|
|
|
|
+ else:
|
|
|
|
|
+ healthy_tasks.append(t)
|
|
|
|
|
+
|
|
|
|
|
+ if healthy_tasks:
|
|
|
|
|
+ try:
|
|
|
|
|
+ pipeline = self.redis_client.pipeline()
|
|
|
|
|
+ new_deadline = time.time() + self.heartbeat_ttl
|
|
|
|
|
+ for t in healthy_tasks:
|
|
|
|
|
+ if t.task_ref is not None:
|
|
|
|
|
+ pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline})
|
|
|
|
|
+ pipeline.execute()
|
|
|
|
|
+ self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ self._log(f"Redis Heartbeat update failed: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ if dead_tasks:
|
|
|
|
|
+ try:
|
|
|
|
|
+ pipeline = self.redis_client.pipeline()
|
|
|
|
|
+ for t in dead_tasks:
|
|
|
|
|
+ if t.task_ref is not None:
|
|
|
|
|
+ pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
|
|
|
|
|
+ pipeline.execute()
|
|
|
|
|
+ self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ if dead_tasks:
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ current_tasks = list(self.m_tasks)
|
|
|
|
|
+ self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
|
|
|
|
|
+ for t in dead_tasks:
|
|
|
|
|
+ if t in current_tasks:
|
|
|
|
|
+ self._cleanup_task(t, "unhealthy or keep-alive failed")
|
|
|
|
|
+ else:
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
|
|
|
|
|
+
|
|
|
|
|
+ def _booking_trigger_loop(self):
|
|
|
|
|
+ self._log("Trigger loop started.")
|
|
|
|
|
+ while not self.m_stop_event.is_set():
|
|
|
|
|
+ try:
|
|
|
|
|
+ time.sleep(1.0)
|
|
|
|
|
+ now = time.time()
|
|
|
|
|
+ for apt_type in self.m_cfg.appointment_types:
|
|
|
|
|
+ redis_key = self._get_redis_key(apt_type.routing_key)
|
|
|
|
|
+ raw_data = self.redis_client.get(redis_key)
|
|
|
|
|
+ if not raw_data:
|
|
|
|
|
+ continue
|
|
|
|
|
+ try:
|
|
|
|
|
+ data = json.loads(raw_data)
|
|
|
|
|
+ query_result = VSQueryResult.model_validate(data['query_result'])
|
|
|
|
|
+ query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
|
|
|
|
|
+ except Exception as parse_err:
|
|
|
|
|
+ self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
|
|
|
|
|
+ self.redis_client.delete(redis_key)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ matching_tasks = []
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ for task in self.m_tasks:
|
|
|
|
|
+ if now < task.next_run or not task.book_allowed:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if apt_type.routing_key not in task.acceptable_routing_keys:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ task.next_run = now + self.m_cfg.booker.booking_cooldown
|
|
|
|
|
+ matching_tasks.append(task)
|
|
|
|
|
+
|
|
|
|
|
+ if matching_tasks:
|
|
|
|
|
+ threads = []
|
|
|
|
|
+ for task in matching_tasks:
|
|
|
|
|
+ self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
|
|
|
|
|
+ t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
|
|
|
|
|
+ threads.append(t)
|
|
|
|
|
+ t.start()
|
|
|
|
|
+
|
|
|
|
|
+ for t in threads:
|
|
|
|
|
+ t.join()
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ self._log(f"Trigger loop error: {e}")
|
|
|
|
|
+ time.sleep(2)
|
|
|
|
|
+
|
|
|
|
|
+ def _execute_book_job(self, task: Task, query_result: VSQueryResult):
|
|
|
|
|
+ task_id = task.task_ref
|
|
|
|
|
+ task_data = None
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ task_data = VSCloudApi.Instance().get_vas_task(task_id)
|
|
|
|
|
+ if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
|
|
|
|
|
+ self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
|
|
|
|
|
+ self._remove_task(task, "bound task no longer valid")
|
|
|
|
|
+ self.redis_client.zrem(self.m_tracker_key, task_id)
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ order_id = task_data.get('order_id')
|
|
|
|
|
+ user_input = task_data.get('user_inputs', {})
|
|
|
|
|
+ book_res = task.instance.book(query_result, user_input)
|
|
|
|
|
+
|
|
|
|
|
+ if book_res.success:
|
|
|
|
|
+ self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
|
|
|
|
|
+ grab_info = {
|
|
|
|
|
+ "account": book_res.account,
|
|
|
|
|
+ "session_id": book_res.session_id,
|
|
|
|
|
+ "urn": book_res.urn,
|
|
|
|
|
+ "slot_date": book_res.book_date,
|
|
|
|
|
+ "slot_time": book_res.book_time,
|
|
|
|
|
+ "timestamp": int(time.time()),
|
|
|
|
|
+ "payment_link": book_res.payment_link
|
|
|
|
|
+ }
|
|
|
|
|
+ VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
|
|
|
|
|
+ push_content = (
|
|
|
|
|
+ f"🎉 【预定成功通知】\n"
|
|
|
|
|
+ f"━━━━━━━━━━━━━━━\n"
|
|
|
|
|
+ f"订单编号: {order_id}\n"
|
|
|
|
|
+ f"预约账号: {book_res.account}\n"
|
|
|
|
|
+ f"预约日期: {book_res.book_date}\n"
|
|
|
|
|
+ f"预约时间: {book_res.book_time}\n"
|
|
|
|
|
+ f"预约编号: {book_res.urn}\n"
|
|
|
|
|
+ f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
|
|
|
|
|
+ f"━━━━━━━━━━━━━━━\n"
|
|
|
|
|
+ )
|
|
|
|
|
+ VSCloudApi.Instance().push_weixin_text(push_content)
|
|
|
|
|
+ self.redis_client.zrem(self.m_tracker_key, task_id)
|
|
|
|
|
+ self._remove_task(task, "booking success")
|
|
|
|
|
+ else:
|
|
|
|
|
+ self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ err_str = str(e)
|
|
|
|
|
+ self._log(f"Exception during booking: {err_str}")
|
|
|
|
|
+ rate_limited_indicators = [
|
|
|
|
|
+ "42901" in err_str,
|
|
|
|
|
+ "Rate limited" in err_str
|
|
|
|
|
+ ]
|
|
|
|
|
+ if any(rate_limited_indicators):
|
|
|
|
|
+ self._remove_task(task, "booking rate limited")
|
|
|
|
|
+ if task_data and task_id is not None:
|
|
|
|
|
+ task_meta = task_data.get('meta', {})
|
|
|
|
|
+ t_fails = task_meta.get('booking_failures', 0) + 1
|
|
|
|
|
+ task_meta['booking_failures'] = t_fails
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
|
|
|
|
|
+ except Exception as cloud_err:
|
|
|
|
|
+ self._log(f"Failed to update task meta: {cloud_err}")
|
|
|
|
|
+
|
|
|
|
|
+ t_cd = self.task_backoff.calculate(t_fails)
|
|
|
|
|
+ self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
|
|
+ self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
|
|
+
|
|
|
|
|
+ def _creator_loop(self):
|
|
|
|
|
+ self._log("Creator loop started.")
|
|
|
|
|
+ spawn_interval = 10.0
|
|
|
|
|
+ while not self.m_stop_event.is_set():
|
|
|
|
|
+ time.sleep(2.0)
|
|
|
|
|
+ for apt in self.m_cfg.appointment_types:
|
|
|
|
|
+ r_key = apt.routing_key
|
|
|
|
|
+
|
|
|
|
|
+ queue_cd_key = f"vs:queue:cooldown:{r_key}"
|
|
|
|
|
+ if self.redis_client.exists(queue_cd_key):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
|
|
|
|
|
+ pending = self.m_pending_order_by_queue.get(r_key, 0)
|
|
|
|
|
+ target = self.m_cfg.booker.target_instances
|
|
|
|
|
+
|
|
|
|
|
+ if (active + pending) < target:
|
|
|
|
|
+ now = time.time()
|
|
|
|
|
+ if now - self.m_last_spawn_time >= spawn_interval:
|
|
|
|
|
+ self.m_last_spawn_time = now
|
|
|
|
|
+ self._spawn_worker(r_key)
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ def _spawn_worker(self, target_routing_key: str):
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
|
|
|
|
|
+
|
|
|
|
|
+ def _job():
|
|
|
|
|
+ success = False
|
|
|
|
|
+ task_id = None
|
|
|
|
|
+ is_rate_limited = False
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ queue_name = f"auto.{target_routing_key}"
|
|
|
|
|
+ task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
|
|
|
|
|
+ if not task_data:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ task_id = task_data['id']
|
|
|
|
|
+
|
|
|
|
|
+ self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
|
|
|
|
|
+ user_inputs = task_data.get('user_inputs', {})
|
|
|
|
|
+
|
|
|
|
|
+ plg_cfg = VSPlgConfig()
|
|
|
|
|
+ plg_cfg.debug = self.m_cfg.debug
|
|
|
|
|
+ plg_cfg.free_config = self.m_cfg.free_config
|
|
|
|
|
+ plg_cfg.session_max_life = self.m_cfg.session_max_life
|
|
|
|
|
+ plg_cfg.account.username = user_inputs.get("username", "")
|
|
|
|
|
+ plg_cfg.account.password = user_inputs.get("password", "")
|
|
|
|
|
+ if not plg_cfg.account.username:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ acceptable_keys = [target_routing_key]
|
|
|
|
|
+ if self.m_cfg.need_proxy:
|
|
|
|
|
+ proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
|
|
|
|
|
+ plg_cfg.proxy.id = proxy['id']
|
|
|
|
|
+ plg_cfg.proxy.ip = proxy['ip']
|
|
|
|
|
+ plg_cfg.proxy.port = proxy['port']
|
|
|
|
|
+ plg_cfg.proxy.proto = proxy['proto']
|
|
|
|
|
+ plg_cfg.proxy.username = proxy['username']
|
|
|
|
|
+ plg_cfg.proxy.password = proxy['password']
|
|
|
|
|
+
|
|
|
|
|
+ instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
|
|
|
|
|
+ instance.set_log(self.m_logger)
|
|
|
|
|
+ instance.set_config(plg_cfg)
|
|
|
|
|
+ instance.create_session()
|
|
|
|
|
+
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ self.m_tasks.append(
|
|
|
|
|
+ Task(
|
|
|
|
|
+ instance=instance,
|
|
|
|
|
+ qw_cfg=self.m_cfg.query_wait,
|
|
|
|
|
+ next_run=time.time(),
|
|
|
|
|
+ task_ref=task_id,
|
|
|
|
|
+ acceptable_routing_keys=acceptable_keys,
|
|
|
|
|
+ source_queue=target_routing_key,
|
|
|
|
|
+ book_allowed=True,
|
|
|
|
|
+ next_remote_ping=time.time() + random.randint(180, 300)
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ queue_fail_key = f"vs:queue:failures:{target_routing_key}"
|
|
|
|
|
+ self.redis_client.delete(queue_fail_key)
|
|
|
|
|
+ success = True
|
|
|
|
|
+ self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ err_str = str(e)
|
|
|
|
|
+ resource_not_found_indicators = [
|
|
|
|
|
+ "40401" in err_str,
|
|
|
|
|
+ "Account not found" in err_str,
|
|
|
|
|
+ "Proxy not found" in err_str
|
|
|
|
|
+ ]
|
|
|
|
|
+ if any(resource_not_found_indicators):
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ self._log(f"Order Booker spawn failed: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ rate_limited_indicators = [
|
|
|
|
|
+ "42901" in err_str,
|
|
|
|
|
+ "Rate limited" in err_str
|
|
|
|
|
+ ]
|
|
|
|
|
+ if any(rate_limited_indicators):
|
|
|
|
|
+ is_rate_limited = True
|
|
|
|
|
+ queue_fail_key = f"vs:queue:failures:{target_routing_key}"
|
|
|
|
|
+ queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
|
|
|
|
|
+ q_fails = self.redis_client.incr(queue_fail_key)
|
|
|
|
|
+ q_cd = self.queue_backoff.calculate(q_fails)
|
|
|
|
|
+ self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
|
|
|
|
|
+ self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
|
|
|
|
|
+ if task_id is not None:
|
|
|
|
|
+ task_meta = task_data.get('meta') or {}
|
|
|
|
|
+ t_fails = task_meta.get('spawn_failures', 0) + 1
|
|
|
|
|
+ task_meta['spawn_failures'] = t_fails
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
|
|
|
|
|
+ except Exception as cloud_err:
|
|
|
|
|
+ self._log(f"Failed to update task meta: {cloud_err}")
|
|
|
|
|
+
|
|
|
|
|
+ t_cd = self.account_backoff.calculate(t_fails)
|
|
|
|
|
+ self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
|
|
|
|
|
+ self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
|
|
|
|
|
+ finally:
|
|
|
|
|
+ with self.m_lock:
|
|
|
|
|
+ self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
|
|
|
|
|
+
|
|
|
|
|
+ # 创建/登录失败,调用安全归还函数
|
|
|
|
|
+ if not success and task_id is not None and not is_rate_limited:
|
|
|
|
|
+ self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
|
|
|
|
|
+ self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
|
|
|
ThreadPool.getInstance().enqueue(_job)
|
|
ThreadPool.getInstance().enqueue(_job)
|