booker_order.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. import os
  2. import time
  3. import json
  4. import threading
  5. import random
  6. import traceback
  7. import redis
  8. from typing import List, Dict, Callable
  9. from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  10. from vs_plg_factory import VSPlgFactory
  11. from toolkit.thread_pool import ThreadPool
  12. from toolkit.vs_cloud_api import VSCloudApi
  13. from toolkit.backoff import ExponentialBackoff
  14. class OrderBookerGCO:
  15. """
  16. 绑定模式 (订单自带账号):
  17. - 按城市队列维护热机配额。
  18. - 绝对的 1 对 1 关系:一个实例绑定一个云端订单。
  19. - 预订成功后,实例立即销毁。
  20. """
  21. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  22. self.m_cfg = cfg
  23. self.m_factory = VSPlgFactory()
  24. self.m_logger = logger
  25. self.m_tasks: List[Task] = []
  26. self.m_lock = threading.RLock()
  27. self.m_stop_event = threading.Event()
  28. self.redis_client = redis.Redis(**redis_conf)
  29. self.m_pending_order_by_queue: Dict[str, int] = {}
  30. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  31. self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
  32. self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
  33. self.m_last_spawn_time = 0.0
  34. self.heartbeat_ttl = 300
  35. def _log(self, message):
  36. if self.m_logger:
  37. self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')
  38. def start(self):
  39. if not self.m_cfg.enable:
  40. return
  41. self._log("Starting Order Booker...")
  42. plugin_name = self.m_cfg.plugin_config.plugin_name
  43. class_name = "".join(part.title() for part in plugin_name.split('_'))
  44. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  45. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  46. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  47. threading.Thread(target=self._creator_loop, daemon=True).start()
  48. threading.Thread(target=self._maintain_loop, daemon=True).start()
  49. def stop(self):
  50. self._log("Stopping Booker...")
  51. self.m_stop_event.set()
  52. def _get_redis_key(self, routing_key: str) -> str:
  53. return f"vs:signal:{routing_key}"
  54. def _maintain_loop(self):
  55. self._log("Maintain loop started.")
  56. heartbeat_interval = 60
  57. while not self.m_stop_event.is_set():
  58. for _ in range(heartbeat_interval):
  59. if self.m_stop_event.is_set():
  60. return
  61. time.sleep(1.0)
  62. with self.m_lock:
  63. tasks_to_check = list(self.m_tasks)
  64. if not tasks_to_check:
  65. continue
  66. healthy_tasks = []
  67. dead_tasks = []
  68. now = time.time()
  69. for t in tasks_to_check:
  70. if now >= t.next_remote_ping:
  71. try:
  72. t.instance.keep_alive()
  73. if t.instance.health_check():
  74. healthy_tasks.append(t)
  75. next_delay = random.randint(180, 300)
  76. t.next_remote_ping = now + next_delay
  77. self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.")
  78. else:
  79. dead_tasks.append(t)
  80. self._log(f"♻️ Instance for task={t.task_ref} unhealthy.")
  81. except Exception as e:
  82. dead_tasks.append(t)
  83. self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.")
  84. else:
  85. healthy_tasks.append(t)
  86. if healthy_tasks:
  87. try:
  88. pipeline = self.redis_client.pipeline()
  89. new_deadline = time.time() + self.heartbeat_ttl
  90. for t in healthy_tasks:
  91. if t.task_ref is not None:
  92. pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline})
  93. pipeline.execute()
  94. self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.")
  95. except Exception as e:
  96. self._log(f"Redis Heartbeat update failed: {e}")
  97. if dead_tasks:
  98. try:
  99. pipeline = self.redis_client.pipeline()
  100. for t in dead_tasks:
  101. if t.task_ref is not None:
  102. pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
  103. pipeline.execute()
  104. self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.")
  105. except Exception as e:
  106. pass
  107. with self.m_lock:
  108. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  109. def _booking_trigger_loop(self):
  110. self._log("Trigger loop started.")
  111. while not self.m_stop_event.is_set():
  112. try:
  113. time.sleep(1.0)
  114. now = time.time()
  115. for apt_type in self.m_cfg.appointment_types:
  116. redis_key = self._get_redis_key(apt_type.routing_key)
  117. raw_data = self.redis_client.get(redis_key)
  118. if not raw_data:
  119. continue
  120. try:
  121. data = json.loads(raw_data)
  122. query_result = VSQueryResult.model_validate(data['query_result'])
  123. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  124. except Exception as parse_err:
  125. self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
  126. self.redis_client.delete(redis_key)
  127. continue
  128. matching_tasks = []
  129. with self.m_lock:
  130. for task in self.m_tasks:
  131. if now < task.next_run or not task.book_allowed:
  132. continue
  133. if apt_type.routing_key not in task.acceptable_routing_keys:
  134. continue
  135. task.next_run = now + self.m_cfg.booker.booking_cooldown
  136. matching_tasks.append(task)
  137. if matching_tasks:
  138. threads = []
  139. for task in matching_tasks:
  140. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
  141. t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
  142. threads.append(t)
  143. t.start()
  144. for t in threads:
  145. t.join()
  146. except Exception as e:
  147. self._log(f"Trigger loop error: {e}")
  148. time.sleep(2)
  149. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  150. task_id = task.task_ref
  151. task_data = None
  152. try:
  153. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  154. if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
  155. self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
  156. with self.m_lock:
  157. if task in self.m_tasks:
  158. self.m_tasks.remove(task)
  159. self.redis_client.zrem(self.m_tracker_key, task_id)
  160. return
  161. order_id = task_data.get('order_id')
  162. user_input = task_data.get('user_inputs', {})
  163. book_res = task.instance.book(query_result, user_input)
  164. if book_res.success:
  165. self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
  166. grab_info = {
  167. "account": book_res.account,
  168. "session_id": book_res.session_id,
  169. "urn": book_res.urn,
  170. "slot_date": book_res.book_date,
  171. "slot_time": book_res.book_time,
  172. "timestamp": int(time.time()),
  173. "payment_link": book_res.payment_link
  174. }
  175. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  176. push_content = (
  177. f"🎉 【预定成功通知】\n"
  178. f"━━━━━━━━━━━━━━━\n"
  179. f"订单编号: {order_id}\n"
  180. f"预约账号: {book_res.account}\n"
  181. f"预约日期: {book_res.book_date}\n"
  182. f"预约时间: {book_res.book_time}\n"
  183. f"预约编号: {book_res.urn}\n"
  184. f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
  185. f"━━━━━━━━━━━━━━━\n"
  186. )
  187. VSCloudApi.Instance().push_weixin_text(push_content)
  188. self.redis_client.zrem(self.m_tracker_key, task_id)
  189. with self.m_lock:
  190. if task in self.m_tasks:
  191. self.m_tasks.remove(task)
  192. else:
  193. self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
  194. except Exception as e:
  195. err_str = str(e)
  196. self._log(f"Exception during booking: {err_str}")
  197. rate_limited_indicators = [
  198. "42901" in err_str,
  199. "Rate limited" in err_str
  200. ]
  201. if any(rate_limited_indicators):
  202. with self.m_lock:
  203. if task in self.m_tasks:
  204. self.m_tasks.remove(task)
  205. if task_data and task_id is not None:
  206. task_meta = task_data.get('meta', {})
  207. t_fails = task_meta.get('booking_failures', 0) + 1
  208. task_meta['booking_failures'] = t_fails
  209. try:
  210. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  211. except Exception as cloud_err:
  212. self._log(f"Failed to update task meta: {cloud_err}")
  213. t_cd = self.task_backoff.calculate(t_fails)
  214. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  215. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  216. def _creator_loop(self):
  217. self._log("Creator loop started.")
  218. spawn_interval = 10.0
  219. while not self.m_stop_event.is_set():
  220. time.sleep(2.0)
  221. for apt in self.m_cfg.appointment_types:
  222. r_key = apt.routing_key
  223. queue_cd_key = f"vs:queue:cooldown:{r_key}"
  224. if self.redis_client.exists(queue_cd_key):
  225. continue
  226. with self.m_lock:
  227. active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
  228. pending = self.m_pending_order_by_queue.get(r_key, 0)
  229. target = self.m_cfg.booker.target_instances
  230. if (active + pending) < target:
  231. now = time.time()
  232. if now - self.m_last_spawn_time >= spawn_interval:
  233. self.m_last_spawn_time = now
  234. self._spawn_worker(r_key)
  235. break
  236. def _spawn_worker(self, target_routing_key: str):
  237. with self.m_lock:
  238. self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
  239. def _job():
  240. success = False
  241. task_id = None
  242. is_rate_limited = False
  243. try:
  244. queue_name = f"auto.{target_routing_key}"
  245. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  246. if not task_data:
  247. return
  248. task_id = task_data['id']
  249. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
  250. user_inputs = task_data.get('user_inputs', {})
  251. plg_cfg = VSPlgConfig()
  252. plg_cfg.debug = self.m_cfg.debug
  253. plg_cfg.free_config = self.m_cfg.free_config
  254. plg_cfg.session_max_life = self.m_cfg.session_max_life
  255. plg_cfg.account.username = user_inputs.get("username", "")
  256. plg_cfg.account.password = user_inputs.get("password", "")
  257. if not plg_cfg.account.username:
  258. return
  259. acceptable_keys = [target_routing_key]
  260. if self.m_cfg.need_proxy:
  261. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
  262. plg_cfg.proxy.id = proxy['id']
  263. plg_cfg.proxy.ip = proxy['ip']
  264. plg_cfg.proxy.port = proxy['port']
  265. plg_cfg.proxy.proto = proxy['proto']
  266. plg_cfg.proxy.username = proxy['username']
  267. plg_cfg.proxy.password = proxy['password']
  268. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  269. instance.set_log(self.m_logger)
  270. instance.set_config(plg_cfg)
  271. instance.create_session()
  272. with self.m_lock:
  273. self.m_tasks.append(
  274. Task(
  275. instance=instance,
  276. qw_cfg=self.m_cfg.query_wait,
  277. next_run=time.time(),
  278. task_ref=task_id,
  279. acceptable_routing_keys=acceptable_keys,
  280. source_queue=target_routing_key,
  281. book_allowed=True,
  282. next_remote_ping=time.time() + random.randint(180, 300)
  283. )
  284. )
  285. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  286. self.redis_client.delete(queue_fail_key)
  287. success = True
  288. self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
  289. except Exception as e:
  290. err_str = str(e)
  291. resource_not_found_indicators = [
  292. "40401" in err_str,
  293. "Account not found" in err_str,
  294. "Proxy not found" in err_str
  295. ]
  296. if any(resource_not_found_indicators):
  297. return
  298. self._log(f"Order Booker spawn failed: {e}")
  299. rate_limited_indicators = [
  300. "42901" in err_str,
  301. "Rate limited" in err_str
  302. ]
  303. if any(rate_limited_indicators):
  304. is_rate_limited = True
  305. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  306. queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
  307. q_fails = self.redis_client.incr(queue_fail_key)
  308. q_cd = self.queue_backoff.calculate(q_fails)
  309. self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
  310. self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
  311. if task_id is not None:
  312. task_meta = task_data.get('meta') or {}
  313. t_fails = task_meta.get('spawn_failures', 0) + 1
  314. task_meta['spawn_failures'] = t_fails
  315. try:
  316. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  317. except Exception as cloud_err:
  318. self._log(f"Failed to update task meta: {cloud_err}")
  319. t_cd = self.account_backoff.calculate(t_fails)
  320. self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  321. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  322. finally:
  323. with self.m_lock:
  324. self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
  325. # 创建/登录失败,调用安全归还函数
  326. if not success and task_id is not None and not is_rate_limited:
  327. self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
  328. self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
  329. ThreadPool.getInstance().enqueue(_job)