booker_order.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. import os
  2. import time
  3. import json
  4. import threading
  5. import random
  6. import redis
  7. from typing import List, Dict, Callable
  8. from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  9. from vs_plg_factory import VSPlgFactory
  10. from toolkit.thread_pool import ThreadPool
  11. from toolkit.vs_cloud_api import VSCloudApi
  12. from toolkit.backoff import ExponentialBackoff
class OrderBookerGCO:
    """
    Bound mode (each order carries its own account):
    - Maintains a warm-instance quota per city queue.
    - Strict 1-to-1 relationship: one instance is bound to one cloud order.
    - After a successful booking the instance is destroyed immediately.
    """
  20. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  21. self.m_cfg = cfg
  22. self.m_factory = VSPlgFactory()
  23. self.m_logger = logger
  24. self.m_tasks: List[Task] = []
  25. self.m_lock = threading.RLock()
  26. self.m_stop_event = threading.Event()
  27. self.redis_client = redis.Redis(**redis_conf)
  28. self.m_pending_order_by_queue: Dict[str, int] = {}
  29. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  30. self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
  31. self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
  32. self.m_last_spawn_time = 0.0
  33. self.heartbeat_ttl = 300
  34. def _log(self, message):
  35. if self.m_logger:
  36. self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')
  37. def start(self):
  38. if not self.m_cfg.enable:
  39. return
  40. self._log("Starting Order Booker...")
  41. plugin_name = self.m_cfg.plugin_config.plugin_name
  42. class_name = "".join(part.title() for part in plugin_name.split('_'))
  43. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  44. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  45. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  46. threading.Thread(target=self._creator_loop, daemon=True).start()
  47. threading.Thread(target=self._maintain_loop, daemon=True).start()
  48. def stop(self):
  49. self._log("Stopping Booker...")
  50. self.m_stop_event.set()
  51. self._cleanup_all_tasks("booker stop")
  52. def _cleanup_task(self, task: Task, reason: str = ""):
  53. try:
  54. instance = getattr(task, 'instance', None)
  55. if instance and hasattr(instance, 'cleanup'):
  56. instance.cleanup()
  57. self._log(f"🧹 Cleaned up instance for task={getattr(task, 'task_ref', None)}. Reason: {reason}")
  58. except Exception as e:
  59. self._log(f"Cleanup failed for task={getattr(task, 'task_ref', None)}. Reason: {reason}. Error: {e}")
  60. def _remove_task(self, task: Task, reason: str = "", cleanup: bool = True):
  61. removed = False
  62. with self.m_lock:
  63. if task in self.m_tasks:
  64. self.m_tasks.remove(task)
  65. removed = True
  66. if cleanup and removed:
  67. self._cleanup_task(task, reason)
  68. return removed
  69. def _cleanup_all_tasks(self, reason: str = ""):
  70. with self.m_lock:
  71. tasks = list(self.m_tasks)
  72. self.m_tasks.clear()
  73. for task in tasks:
  74. self._cleanup_task(task, reason)
  75. def _get_redis_key(self, routing_key: str) -> str:
  76. return f"vs:signal:{routing_key}"
  77. def _maintain_loop(self):
  78. self._log("Maintain loop started.")
  79. heartbeat_interval = 60
  80. while not self.m_stop_event.is_set():
  81. for _ in range(heartbeat_interval):
  82. if self.m_stop_event.is_set():
  83. return
  84. time.sleep(1.0)
  85. with self.m_lock:
  86. tasks_to_check = list(self.m_tasks)
  87. if not tasks_to_check:
  88. continue
  89. healthy_tasks = []
  90. dead_tasks = []
  91. now = time.time()
  92. for t in tasks_to_check:
  93. if now >= t.next_remote_ping:
  94. try:
  95. t.instance.keep_alive()
  96. if t.instance.health_check():
  97. healthy_tasks.append(t)
  98. next_delay = random.randint(180, 300)
  99. t.next_remote_ping = now + next_delay
  100. self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.")
  101. else:
  102. dead_tasks.append(t)
  103. self._log(f"♻️ Instance for task={t.task_ref} unhealthy.")
  104. except Exception as e:
  105. dead_tasks.append(t)
  106. self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.")
  107. else:
  108. healthy_tasks.append(t)
  109. if healthy_tasks:
  110. try:
  111. pipeline = self.redis_client.pipeline()
  112. new_deadline = time.time() + self.heartbeat_ttl
  113. for t in healthy_tasks:
  114. if t.task_ref is not None:
  115. pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline})
  116. pipeline.execute()
  117. self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.")
  118. except Exception as e:
  119. self._log(f"Redis Heartbeat update failed: {e}")
  120. if dead_tasks:
  121. try:
  122. pipeline = self.redis_client.pipeline()
  123. for t in dead_tasks:
  124. if t.task_ref is not None:
  125. pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
  126. pipeline.execute()
  127. self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.")
  128. except Exception as e:
  129. pass
  130. if dead_tasks:
  131. with self.m_lock:
  132. current_tasks = list(self.m_tasks)
  133. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  134. for t in dead_tasks:
  135. if t in current_tasks:
  136. self._cleanup_task(t, "unhealthy or keep-alive failed")
  137. else:
  138. with self.m_lock:
  139. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  140. def _booking_trigger_loop(self):
  141. self._log("Trigger loop started.")
  142. while not self.m_stop_event.is_set():
  143. try:
  144. time.sleep(1.0)
  145. now = time.time()
  146. for apt_type in self.m_cfg.appointment_types:
  147. redis_key = self._get_redis_key(apt_type.routing_key)
  148. raw_data = self.redis_client.get(redis_key)
  149. if not raw_data:
  150. continue
  151. try:
  152. data = json.loads(raw_data)
  153. query_result = VSQueryResult.model_validate(data['query_result'])
  154. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  155. except Exception as parse_err:
  156. self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
  157. self.redis_client.delete(redis_key)
  158. continue
  159. matching_tasks = []
  160. with self.m_lock:
  161. for task in self.m_tasks:
  162. if now < task.next_run or not task.book_allowed:
  163. continue
  164. if apt_type.routing_key not in task.acceptable_routing_keys:
  165. continue
  166. task.next_run = now + self.m_cfg.booker.booking_cooldown
  167. matching_tasks.append(task)
  168. if matching_tasks:
  169. threads = []
  170. for task in matching_tasks:
  171. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
  172. t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
  173. threads.append(t)
  174. t.start()
  175. for t in threads:
  176. t.join()
  177. except Exception as e:
  178. self._log(f"Trigger loop error: {e}")
  179. time.sleep(2)
  180. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  181. task_id = task.task_ref
  182. task_data = None
  183. try:
  184. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  185. if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
  186. self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
  187. self._remove_task(task, "bound task no longer valid")
  188. self.redis_client.zrem(self.m_tracker_key, task_id)
  189. return
  190. order_id = task_data.get('order_id')
  191. user_input = task_data.get('user_inputs', {})
  192. book_res = task.instance.book(query_result, user_input)
  193. if book_res.success:
  194. self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
  195. grab_info = {
  196. "account": book_res.account,
  197. "session_id": book_res.session_id,
  198. "urn": book_res.urn,
  199. "slot_date": book_res.book_date,
  200. "slot_time": book_res.book_time,
  201. "timestamp": int(time.time()),
  202. "payment_link": book_res.payment_link
  203. }
  204. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  205. push_content = (
  206. f"🎉 【预定成功通知】\n"
  207. f"━━━━━━━━━━━━━━━\n"
  208. f"订单编号: {order_id}\n"
  209. f"预约账号: {book_res.account}\n"
  210. f"预约日期: {book_res.book_date}\n"
  211. f"预约时间: {book_res.book_time}\n"
  212. f"预约编号: {book_res.urn}\n"
  213. f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
  214. f"━━━━━━━━━━━━━━━\n"
  215. )
  216. VSCloudApi.Instance().push_weixin_text(push_content)
  217. self.redis_client.zrem(self.m_tracker_key, task_id)
  218. self._remove_task(task, "booking success")
  219. else:
  220. self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
  221. except Exception as e:
  222. err_str = str(e)
  223. self._log(f"Exception during booking: {err_str}")
  224. rate_limited_indicators = [
  225. "42901" in err_str,
  226. "Rate limited" in err_str
  227. ]
  228. if any(rate_limited_indicators):
  229. self._remove_task(task, "booking rate limited")
  230. if task_data and task_id is not None:
  231. task_meta = task_data.get('meta', {})
  232. t_fails = task_meta.get('booking_failures', 0) + 1
  233. task_meta['booking_failures'] = t_fails
  234. try:
  235. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  236. except Exception as cloud_err:
  237. self._log(f"Failed to update task meta: {cloud_err}")
  238. t_cd = self.task_backoff.calculate(t_fails)
  239. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  240. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  241. def _creator_loop(self):
  242. self._log("Creator loop started.")
  243. spawn_interval = 10.0
  244. while not self.m_stop_event.is_set():
  245. time.sleep(2.0)
  246. for apt in self.m_cfg.appointment_types:
  247. r_key = apt.routing_key
  248. queue_cd_key = f"vs:queue:cooldown:{r_key}"
  249. if self.redis_client.exists(queue_cd_key):
  250. continue
  251. with self.m_lock:
  252. active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
  253. pending = self.m_pending_order_by_queue.get(r_key, 0)
  254. target = self.m_cfg.booker.target_instances
  255. if (active + pending) < target:
  256. now = time.time()
  257. if now - self.m_last_spawn_time >= spawn_interval:
  258. self.m_last_spawn_time = now
  259. self._spawn_worker(r_key)
  260. break
  261. def _spawn_worker(self, target_routing_key: str):
  262. with self.m_lock:
  263. self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
  264. def _job():
  265. success = False
  266. task_id = None
  267. is_rate_limited = False
  268. try:
  269. queue_name = f"auto.{target_routing_key}"
  270. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  271. if not task_data:
  272. return
  273. task_id = task_data['id']
  274. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
  275. user_inputs = task_data.get('user_inputs', {})
  276. plg_cfg = VSPlgConfig()
  277. plg_cfg.debug = self.m_cfg.debug
  278. plg_cfg.free_config = self.m_cfg.free_config
  279. plg_cfg.session_max_life = self.m_cfg.session_max_life
  280. plg_cfg.account.username = user_inputs.get("username", "")
  281. plg_cfg.account.password = user_inputs.get("password", "")
  282. if not plg_cfg.account.username:
  283. return
  284. acceptable_keys = [target_routing_key]
  285. if self.m_cfg.need_proxy:
  286. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
  287. plg_cfg.proxy.id = proxy['id']
  288. plg_cfg.proxy.ip = proxy['ip']
  289. plg_cfg.proxy.port = proxy['port']
  290. plg_cfg.proxy.proto = proxy['proto']
  291. plg_cfg.proxy.username = proxy['username']
  292. plg_cfg.proxy.password = proxy['password']
  293. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  294. instance.set_log(self.m_logger)
  295. instance.set_config(plg_cfg)
  296. instance.create_session()
  297. with self.m_lock:
  298. self.m_tasks.append(
  299. Task(
  300. instance=instance,
  301. qw_cfg=self.m_cfg.query_wait,
  302. next_run=time.time(),
  303. task_ref=task_id,
  304. acceptable_routing_keys=acceptable_keys,
  305. source_queue=target_routing_key,
  306. book_allowed=True,
  307. next_remote_ping=time.time() + random.randint(180, 300)
  308. )
  309. )
  310. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  311. self.redis_client.delete(queue_fail_key)
  312. success = True
  313. self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
  314. except Exception as e:
  315. err_str = str(e)
  316. resource_not_found_indicators = [
  317. "40401" in err_str,
  318. "Account not found" in err_str,
  319. "Proxy not found" in err_str
  320. ]
  321. if any(resource_not_found_indicators):
  322. return
  323. self._log(f"Order Booker spawn failed: {e}")
  324. rate_limited_indicators = [
  325. "42901" in err_str,
  326. "Rate limited" in err_str
  327. ]
  328. if any(rate_limited_indicators):
  329. is_rate_limited = True
  330. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  331. queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
  332. q_fails = self.redis_client.incr(queue_fail_key)
  333. q_cd = self.queue_backoff.calculate(q_fails)
  334. self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
  335. self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
  336. if task_id is not None:
  337. task_meta = task_data.get('meta') or {}
  338. t_fails = task_meta.get('spawn_failures', 0) + 1
  339. task_meta['spawn_failures'] = t_fails
  340. try:
  341. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  342. except Exception as cloud_err:
  343. self._log(f"Failed to update task meta: {cloud_err}")
  344. t_cd = self.account_backoff.calculate(t_fails)
  345. self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  346. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  347. finally:
  348. with self.m_lock:
  349. self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
  350. # 创建/登录失败,调用安全归还函数
  351. if not success and task_id is not None and not is_rate_limited:
  352. self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
  353. self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
  354. ThreadPool.getInstance().enqueue(_job)