# booker_order.py

import json
import os
import random
import threading
import time
import traceback
from typing import Callable, Dict, List, Optional

import redis

from toolkit.backoff import ExponentialBackoff
from toolkit.proxy_manager import ProxyManager
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from vs_plg_factory import VSPlgFactory
from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType


  15. class OrderBookerGCO:
  16. """
  17. 绑定模式 (订单自带账号):
  18. - 按城市队列维护热机配额。
  19. - 绝对的 1 对 1 关系:一个实例绑定一个云端订单。
  20. - 预订成功后,实例立即销毁。
  21. """
  22. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  23. self.m_cfg = cfg
  24. self.m_factory = VSPlgFactory()
  25. self.m_logger = logger
  26. self.m_tasks: List[Task] = []
  27. self.m_lock = threading.RLock()
  28. self.m_stop_event = threading.Event()
  29. self.redis_client = redis.Redis(**redis_conf)
  30. self.m_pending_order_by_queue: Dict[str, int] = {}
  31. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  32. # 1. 队列级退避:失败起步5分钟,封顶1小时
  33. self.queue_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=1*60*60.0, factor=2.0)
  34. # 2. 账号级退避:失败起步30分钟,封顶3小时
  35. self.account_backoff = ExponentialBackoff(base_delay=30*60.0, max_delay=3*60*60.0, factor=2.0)
  36. def _log(self, message):
  37. if self.m_logger:
  38. self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')
  39. def start(self):
  40. if not self.m_cfg.enable:
  41. return
  42. self._log("Starting Order Booker...")
  43. plugin_name = self.m_cfg.plugin_config.plugin_name
  44. class_name = "".join(part.title() for part in plugin_name.split('_'))
  45. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  46. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  47. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  48. threading.Thread(target=self._creator_loop, daemon=True).start()
  49. threading.Thread(target=self._maintain_loop, daemon=True).start()
  50. def stop(self):
  51. self._log("Stopping Booker...")
  52. self.m_stop_event.set()
  53. def _get_redis_key(self, routing_key: str) -> str:
  54. return f"vs:signal:{routing_key}"
  55. def _safe_return_task(self, task_id: int, reason: str = ""):
  56. if not task_id:
  57. return
  58. try:
  59. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  60. if not task_data:
  61. self.redis_client.zrem(self.m_tracker_key, task_id)
  62. return
  63. current_status = task_data.get('status', '')
  64. if current_status in['pending', 'grabbed', 'cancelled', 'success']:
  65. self.redis_client.zrem(self.m_tracker_key, task_id)
  66. return
  67. self._log(f"Returning task={task_id} to queue. Reason: {reason}")
  68. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  69. # 归还成功,核销防丢记录
  70. self.redis_client.zrem(self.m_tracker_key, task_id)
  71. except Exception as ex:
  72. self._log(f"Failed to safely return task for task_id {task_id}: {ex}")
  73. def _maintain_loop(self):
  74. self._log("Maintain loop started.")
  75. rng = random.Random()
  76. while not self.m_stop_event.is_set():
  77. wait_seconds = rng.randint(180, 300)
  78. for _ in range(wait_seconds):
  79. if self.m_stop_event.is_set():
  80. return
  81. time.sleep(1.0)
  82. with self.m_lock:
  83. tasks_to_check = list(self.m_tasks)
  84. healthy_tasks = []
  85. dead_tasks = []
  86. for t in tasks_to_check:
  87. try:
  88. t.instance.keep_alive()
  89. if t.instance.health_check():
  90. healthy_tasks.append(t)
  91. else:
  92. dead_tasks.append(t)
  93. self._log(f"♻️ Instance for task={t.task_ref} unhealthy, marking for removal.")
  94. except Exception as e:
  95. dead_tasks.append(t)
  96. self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}, marking for removal.")
  97. with self.m_lock:
  98. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  99. # 实例死亡,调用安全归还函数
  100. for t in dead_tasks:
  101. if t.task_ref is not None:
  102. self._safe_return_task(t.task_ref, reason="Instance died during maintain_loop")
  103. def _booking_trigger_loop(self):
  104. self._log("Trigger loop started.")
  105. while not self.m_stop_event.is_set():
  106. try:
  107. time.sleep(1.0)
  108. now = time.time()
  109. for apt_type in self.m_cfg.appointment_types:
  110. redis_key = self._get_redis_key(apt_type.routing_key)
  111. raw_data = self.redis_client.get(redis_key)
  112. if not raw_data:
  113. continue
  114. try:
  115. data = json.loads(raw_data)
  116. query_result = VSQueryResult.model_validate(data['query_result'])
  117. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  118. except Exception as parse_err:
  119. self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
  120. self.redis_client.delete(redis_key)
  121. continue
  122. matching_tasks = []
  123. with self.m_lock:
  124. for task in self.m_tasks:
  125. if now < task.next_run or not task.book_allowed:
  126. continue
  127. if apt_type.routing_key not in task.acceptable_routing_keys:
  128. continue
  129. task.next_run = now + self.m_cfg.booker.booking_cooldown
  130. matching_tasks.append(task)
  131. if matching_tasks:
  132. threads = []
  133. for task in matching_tasks:
  134. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
  135. t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
  136. threads.append(t)
  137. t.start()
  138. for t in threads:
  139. t.join()
  140. except Exception as e:
  141. self._log(f"Trigger loop error: {e}")
  142. time.sleep(2)
  143. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  144. task_id = task.task_ref
  145. task_data = None
  146. booking_success = False
  147. try:
  148. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  149. if not task_data or task_data.get('status') in ['grabbed', 'cancelled']:
  150. self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
  151. with self.m_lock:
  152. if task in self.m_tasks:
  153. self.m_tasks.remove(task)
  154. return
  155. order_id = task_data.get('order_id')
  156. user_input = task_data.get('user_inputs', {})
  157. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
  158. book_res = task.instance.book(query_result, user_input)
  159. if book_res.success:
  160. booking_success = True
  161. self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
  162. grab_info = {
  163. "account": book_res.account,
  164. "session_id": book_res.session_id,
  165. "urn": book_res.urn,
  166. "slot_date": book_res.book_date,
  167. "slot_time": book_res.book_time,
  168. "timestamp": int(time.time()),
  169. "payment_link": book_res.payment_link
  170. }
  171. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  172. self.redis_client.zrem(self.m_tracker_key, task_id)
  173. with self.m_lock:
  174. if task in self.m_tasks:
  175. self.m_tasks.remove(task)
  176. else:
  177. self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
  178. except Exception as e:
  179. err_str = str(e)
  180. self._log(f"Exception during booking: {err_str}")
  181. rate_limited_indicators = [
  182. "42901" in err_str,
  183. "Rate limited" in err_str
  184. ]
  185. if any(rate_limited_indicators):
  186. with self.m_lock:
  187. if task in self.m_tasks:
  188. self.m_tasks.remove(task)
  189. if task_data and task_id is not None:
  190. task_meta = task_data.get('meta', {})
  191. t_fails = task_meta.get('booking_failures', 0) + 1
  192. task_meta['booking_failures'] = t_fails
  193. try:
  194. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  195. except Exception as cloud_err:
  196. self._log(f"Failed to update task meta: {cloud_err}")
  197. t_cd = self.task_backoff.calculate(t_fails)
  198. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  199. def delayed_return(tid, wait_sec, reason):
  200. self.m_stop_event.wait(wait_sec)
  201. self._safe_return_task(tid, reason=reason)
  202. t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Booking failed: rate limited (fails={t_fails})"), daemon=True)
  203. t.start()
  204. def _creator_loop(self):
  205. self._log("Creator loop started.")
  206. while not self.m_stop_event.is_set():
  207. time.sleep(5.0)
  208. with self.m_lock:
  209. for apt in self.m_cfg.appointment_types:
  210. r_key = apt.routing_key
  211. queue_cd_key = f"vs:queue:cooldown:{r_key}"
  212. if self.redis_client.exists(queue_cd_key):
  213. continue # 仍在冷却中,跳过取单
  214. active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
  215. pending = self.m_pending_order_by_queue.get(r_key, 0)
  216. if (active + pending) < self.m_cfg.booker.target_instances:
  217. self._spawn_worker(r_key)
  218. time.sleep(0.5)
  219. def _spawn_worker(self, target_routing_key: str):
  220. with self.m_lock:
  221. self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
  222. def _job():
  223. success = False
  224. task_id = None
  225. try:
  226. queue_name = f"auto.{target_routing_key}"
  227. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  228. if not task_data:
  229. return
  230. task_id = task_data['id']
  231. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
  232. user_inputs = task_data.get('user_inputs', {})
  233. plg_cfg = VSPlgConfig()
  234. plg_cfg.debug = self.m_cfg.debug; plg_cfg.free_config = self.m_cfg.free_config; plg_cfg.session_max_life = self.m_cfg.session_max_life
  235. plg_cfg.account.username = user_inputs.get("username", "")
  236. plg_cfg.account.password = user_inputs.get("password", "")
  237. if not plg_cfg.account.username:
  238. return
  239. acceptable_keys = [target_routing_key]
  240. if self.m_cfg.need_proxy:
  241. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=self.m_cfg.proxy_lock_interval)
  242. if not proxy:
  243. return
  244. plg_cfg.proxy.id = proxy['id']
  245. plg_cfg.proxy.ip = proxy['ip']
  246. plg_cfg.proxy.port = proxy['port']
  247. plg_cfg.proxy.scheme = proxy['scheme']
  248. plg_cfg.proxy.username = proxy['username']
  249. plg_cfg.proxy.password = proxy['password']
  250. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  251. instance.set_log(self.m_logger)
  252. instance.set_config(plg_cfg)
  253. instance.create_session()
  254. with self.m_lock:
  255. self.m_tasks.append(
  256. Task(
  257. instance=instance,
  258. qw_cfg=self.m_cfg.query_wait,
  259. next_run=time.time(),
  260. task_ref=task_id,
  261. acceptable_routing_keys=acceptable_keys,
  262. source_queue=target_routing_key,
  263. book_allowed=True
  264. )
  265. )
  266. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  267. self.redis_client.delete(queue_fail_key)
  268. success = True
  269. self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
  270. except Exception as e:
  271. err_str = str(e)
  272. order_not_found_indicators = [
  273. "40401" in err_str,
  274. "Account not found" in err_str
  275. ]
  276. if any(order_not_found_indicators):
  277. return
  278. self._log(f"Order Booker spawn failed: {e}")
  279. rate_limited_indicators = [
  280. "42901" in err_str,
  281. "Rate limited" in err_str
  282. ]
  283. if any(rate_limited_indicators):
  284. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  285. queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
  286. q_fails = self.redis_client.incr(queue_fail_key)
  287. q_cd = self.queue_backoff.calculate(q_fails)
  288. self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
  289. self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
  290. if task_id is not None:
  291. task_meta = task_data.get('meta', {})
  292. t_fails = task_meta.get('spawn_failures', 0) + 1
  293. task_meta['spawn_failures'] = t_fails
  294. # 立即持久化到云端
  295. try:
  296. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  297. except Exception as cloud_err:
  298. self._log(f"Failed to update task meta: {cloud_err}")
  299. # 使用工具计算该任务账号应该挂起的惩罚时间
  300. t_cd = self.account_backoff.calculate(t_fails)
  301. self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  302. def delayed_return(tid, wait_sec, reason):
  303. self.m_stop_event.wait(wait_sec)
  304. self._safe_return_task(tid, reason=reason)
  305. t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Spawn failed: rate limited (fails={t_fails})"), daemon=True)
  306. t.start()
  307. task_id = None # 置空防秒归还
  308. finally:
  309. with self.m_lock:
  310. self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
  311. # 创建/登录失败,调用安全归还函数
  312. if not success and task_id is not None:
  313. self._safe_return_task(task_id, reason="Instance spawn/login failed")
  314. ThreadPool.getInstance().enqueue(_job)