booker_order.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. import os
  2. import time
  3. import json
  4. import threading
  5. import random
  6. import redis
  7. from typing import List, Dict, Callable, Any, Optional
  8. from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  9. from vs_plg_factory import VSPlgFactory
  10. from toolkit.thread_pool import ThreadPool
  11. from toolkit.vs_cloud_api import VSCloudApi
  12. from toolkit.backoff import ExponentialBackoff
  13. class OrderBookerGCO:
  14. """
  15. 绑定模式 (订单自带账号):
  16. - 按城市队列维护热机配额。
  17. - 绝对的 1 对 1 关系:一个实例绑定一个云端订单。
  18. - 预订成功后,实例立即销毁。
  19. """
  20. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  21. self.m_cfg = cfg
  22. self.m_factory = VSPlgFactory()
  23. self.m_logger = logger
  24. self.m_tasks: List[Task] = []
  25. self.m_lock = threading.RLock()
  26. self.m_stop_event = threading.Event()
  27. redis_common_kwargs = {
  28. **redis_conf,
  29. "socket_timeout": 5,
  30. "socket_connect_timeout": 5,
  31. # 会自动发送 PING
  32. "health_check_interval": 15,
  33. # TCP KeepAlive
  34. "socket_keepalive": True,
  35. "retry_on_timeout": True,
  36. "decode_responses": False,
  37. }
  38. self.redis_com = redis.Redis(**redis_common_kwargs)
  39. self.redis_sub = redis.Redis(**redis_common_kwargs)
  40. self.m_pending_order_by_queue: Dict[str, int] = {}
  41. self.m_last_spawn_times: Dict[str, float] = {}
  42. self.m_task_data_cache: Dict[str, dict] = {}
  43. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  44. self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
  45. self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
  46. self.task_backoff = ExponentialBackoff(base_delay=10, max_delay=30*60.0, factor=2.0)
  47. self.heartbeat_ttl = 2*60.0
  48. def _log(self, message):
  49. if self.m_logger:
  50. self.m_logger(f'[ORDER-BOOKER] [{self.m_cfg.identifier}] {message}')
  51. def start(self):
  52. if not self.m_cfg.enable:
  53. return
  54. self._log("Starting Order Booker...")
  55. plugin_name = self.m_cfg.plugin_config.plugin_name
  56. class_name = "".join(part.title() for part in plugin_name.split('_'))
  57. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  58. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  59. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  60. threading.Thread(target=self._creator_loop, daemon=True).start()
  61. threading.Thread(target=self._maintain_loop, daemon=True).start()
  62. threading.Thread(target=self._cache_refresh_loop, daemon=True).start()
  63. def stop(self):
  64. self._log("Stopping Booker...")
  65. self.m_stop_event.set()
  66. self._cleanup_all_tasks("booker stop")
  67. def _cleanup_task(self, task: Task, reason: str = ""):
  68. try:
  69. instance = getattr(task, 'instance', None)
  70. if instance and hasattr(instance, 'cleanup'):
  71. instance.cleanup()
  72. self._log(f"🧹 Cleaned up instance for task={getattr(task, 'task_ref', None)}. Reason: {reason}")
  73. except Exception as e:
  74. self._log(f"Cleanup failed for task={getattr(task, 'task_ref', None)}. Reason: {reason}. Error: {e}")
  75. def _remove_task(self, task: Task, reason: str = "", cleanup: bool = True):
  76. removed = False
  77. with self.m_lock:
  78. if task in self.m_tasks:
  79. self.m_tasks.remove(task)
  80. removed = True
  81. task_id = str(getattr(task, 'task_ref', ''))
  82. self.m_task_data_cache.pop(task_id, None)
  83. if cleanup and removed:
  84. self._cleanup_task(task, reason)
  85. return removed
  86. def _cleanup_all_tasks(self, reason: str = ""):
  87. with self.m_lock:
  88. tasks = list(self.m_tasks)
  89. self.m_tasks.clear()
  90. self.m_task_data_cache.clear()
  91. for task in tasks:
  92. self._cleanup_task(task, reason)
  93. def _get_redis_key(self, routing_key: str) -> str:
  94. return f"vs:signal:{routing_key}"
  95. def _maintain_loop(self):
  96. self._log("Maintain loop started.")
  97. heartbeat_interval = 60
  98. while not self.m_stop_event.is_set():
  99. for _ in range(heartbeat_interval):
  100. if self.m_stop_event.is_set():
  101. return
  102. time.sleep(1.0)
  103. with self.m_lock:
  104. tasks_to_check = list(self.m_tasks)
  105. if not tasks_to_check:
  106. continue
  107. healthy_tasks = []
  108. dead_tasks = []
  109. now = time.time()
  110. for t in tasks_to_check:
  111. if now >= t.next_remote_ping:
  112. try:
  113. t.instance.keep_alive()
  114. if t.instance.health_check():
  115. healthy_tasks.append(t)
  116. next_delay = random.randint(60, 180)
  117. t.next_remote_ping = now + next_delay
  118. self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.")
  119. else:
  120. dead_tasks.append(t)
  121. self._log(f"♻️ Instance for task={t.task_ref} unhealthy.")
  122. except Exception as e:
  123. dead_tasks.append(t)
  124. self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.")
  125. else:
  126. healthy_tasks.append(t)
  127. if healthy_tasks:
  128. try:
  129. pipeline = self.redis_com.pipeline()
  130. new_deadline = time.time() + self.heartbeat_ttl
  131. for t in healthy_tasks:
  132. if t.task_ref is not None:
  133. pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline})
  134. pipeline.execute()
  135. self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.")
  136. except Exception as e:
  137. self._log(f"Redis Heartbeat update failed: {e}")
  138. if dead_tasks:
  139. try:
  140. pipeline = self.redis_com.pipeline()
  141. for t in dead_tasks:
  142. if t.task_ref is not None:
  143. pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
  144. pipeline.execute()
  145. self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.")
  146. except Exception as e:
  147. pass
  148. if dead_tasks:
  149. with self.m_lock:
  150. current_tasks = list(self.m_tasks)
  151. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  152. for t in dead_tasks:
  153. if t in current_tasks:
  154. self._cleanup_task(t, "unhealthy or keep-alive failed")
  155. else:
  156. with self.m_lock:
  157. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  158. def _cache_refresh_loop(self):
  159. self._log("Cache refresh loop started.")
  160. refresh_interval = 15*60
  161. while not self.m_stop_event.is_set():
  162. for _ in range(refresh_interval):
  163. if self.m_stop_event.is_set():
  164. return
  165. time.sleep(1.0)
  166. with self.m_lock:
  167. task_ids = list(self.m_task_data_cache.keys())
  168. if not task_ids:
  169. continue
  170. for tid in task_ids:
  171. if self.m_stop_event.is_set():
  172. break
  173. try:
  174. fresh_data = VSCloudApi.Instance().get_vas_task(tid)
  175. if fresh_data:
  176. with self.m_lock:
  177. if tid in self.m_task_data_cache:
  178. self.m_task_data_cache[tid] = fresh_data
  179. except Exception:
  180. pass
  181. time.sleep(0.5)
  182. def _booking_trigger_loop(self):
  183. self._log("Pub/Sub Trigger loop started.")
  184. channel_to_routing_key = {}
  185. for apt in self.m_cfg.appointment_types:
  186. channel = self._get_redis_key(apt.routing_key)
  187. channel_to_routing_key[channel] = apt.routing_key
  188. if not channel_to_routing_key:
  189. self._log("No appointment types configured. Exiting trigger loop.")
  190. return
  191. pubsub = None
  192. while not self.m_stop_event.is_set():
  193. try:
  194. if pubsub is None:
  195. pubsub = self.redis_sub.pubsub(ignore_subscribe_messages=False)
  196. channels_to_sub = list(channel_to_routing_key.keys())
  197. self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}")
  198. pubsub.subscribe(*channels_to_sub)
  199. message = pubsub.get_message(timeout=5.0)
  200. if not message:
  201. continue
  202. channel = message['channel']
  203. if isinstance(channel, bytes):
  204. channel = channel.decode('utf-8')
  205. if message['type'] == 'subscribe':
  206. active_subs = message['data']
  207. self._log(f"📡 [Redis ACK] Successfully subscribed to: {channel} (Active connection subs: {active_subs})")
  208. continue
  209. if message['type'] != 'message':
  210. continue
  211. raw_data = message['data']
  212. if isinstance(raw_data, bytes):
  213. raw_data = raw_data.decode('utf-8')
  214. routing_key = channel_to_routing_key.get(channel)
  215. if not routing_key:
  216. continue
  217. try:
  218. data = json.loads(raw_data)
  219. query_result = VSQueryResult.model_validate(data['query_result'])
  220. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  221. except Exception as parse_err:
  222. self._log(f"Data parsing error for channel {channel}: {parse_err}")
  223. continue
  224. now = time.time()
  225. matching_tasks = []
  226. with self.m_lock:
  227. for task in self.m_tasks:
  228. if now < task.next_run or not task.book_allowed:
  229. continue
  230. if routing_key not in task.acceptable_routing_keys:
  231. continue
  232. task.next_run = now + self.m_cfg.booker.booking_cooldown
  233. matching_tasks.append(task)
  234. if matching_tasks:
  235. for task in matching_tasks:
  236. self._log(f"🚀 Triggering BOOK for {routing_key} | Order Ref: {task.task_ref}")
  237. t = threading.Thread(
  238. target=self._execute_book_job,
  239. args=(task, query_result),
  240. daemon=True
  241. )
  242. t.start()
  243. except Exception as e:
  244. self._log(f"Trigger loop pub/sub error: {e}")
  245. if pubsub:
  246. try:
  247. pubsub.close()
  248. except:
  249. pass
  250. pubsub = None
  251. time.sleep(2)
  252. if pubsub:
  253. pubsub.close()
  254. self._log("Pub/Sub connection closed.")
  255. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  256. task_id = task.task_ref
  257. task_data = None
  258. try:
  259. with self.m_lock:
  260. task_data = self.m_task_data_cache.get(str(task_id))
  261. if not task_data:
  262. self._log(f"Cache miss for {task_id}, fetching from cloud...")
  263. task_data = VSCloudApi.Instance().get_vas_task(str(task_id))
  264. if task_data:
  265. with self.m_lock:
  266. self.m_task_data_cache[str(task_id)] = task_data
  267. if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
  268. self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
  269. self._remove_task(task, "bound task no longer valid")
  270. self.redis_com.zrem(self.m_tracker_key, task_id)
  271. return
  272. order_id = task_data.get('order_id')
  273. user_input = task_data.get('user_inputs', {})
  274. book_res = task.instance.book(query_result, user_input)
  275. if book_res.success:
  276. self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
  277. grab_info = {
  278. "account": book_res.account,
  279. "session_id": book_res.session_id,
  280. "urn": book_res.urn,
  281. "slot_date": book_res.book_date,
  282. "slot_time": book_res.book_time,
  283. "timestamp": int(time.time()),
  284. "payment_link": book_res.payment_link
  285. }
  286. def _update_cloud_success():
  287. try:
  288. VSCloudApi.Instance().update_vas_task(str(task_id), {"status": "grabbed", "grabbed_history": grab_info})
  289. push_content = (
  290. f"🎉 【预定成功通知】\n"
  291. f"━━━━━━━━━━━━━━━\n"
  292. f"订单编号: {order_id}\n"
  293. f"预约账号: {book_res.account}\n"
  294. f"预约日期: {book_res.book_date}\n"
  295. f"预约时间: {book_res.book_time}\n"
  296. f"预约编号: {book_res.urn}\n"
  297. f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
  298. f"━━━━━━━━━━━━━━━\n"
  299. )
  300. VSCloudApi.Instance().push_weixin_text(push_content)
  301. except Exception as e:
  302. self._log(f"Failed to update success state to cloud: {e}")
  303. ThreadPool.getInstance().enqueue(_update_cloud_success)
  304. self.redis_com.zrem(self.m_tracker_key, task_id)
  305. self._remove_task(task, "booking success")
  306. else:
  307. self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
  308. except Exception as e:
  309. err_str = str(e)
  310. self._log(f"Exception during booking: {err_str}")
  311. rate_limited_indicators = [
  312. "42901" in err_str,
  313. "Rate limited" in err_str
  314. ]
  315. if any(rate_limited_indicators):
  316. self._remove_task(task, "booking rate limited")
  317. if task_data and task_id is not None:
  318. task_meta = task_data.get('meta', {})
  319. t_fails = task_meta.get('booking_failures', 0) + 1
  320. task_meta['booking_failures'] = t_fails
  321. def _update_cloud_meta():
  322. try:
  323. VSCloudApi.Instance().update_vas_task(str(task_id), {"meta": task_meta})
  324. except Exception as cloud_err:
  325. self._log(f"Failed to update task meta: {cloud_err}")
  326. ThreadPool.getInstance().enqueue(_update_cloud_meta)
  327. t_cd = self.task_backoff.calculate(t_fails)
  328. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  329. self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  330. def _creator_loop(self):
  331. self._log("Creator loop started.")
  332. spawn_interval = 10.0
  333. while not self.m_stop_event.is_set():
  334. time.sleep(2.0)
  335. now = time.time()
  336. for apt in self.m_cfg.appointment_types:
  337. r_key = apt.routing_key
  338. queue_cd_key = f"vs:queue:cooldown:{r_key}"
  339. if self.redis_com.exists(queue_cd_key):
  340. continue
  341. with self.m_lock:
  342. active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
  343. pending = self.m_pending_order_by_queue.get(r_key, 0)
  344. target = self.m_cfg.booker.target_instances
  345. if (active + pending) < target:
  346. last_spawn = self.m_last_spawn_times.get(r_key, 0.0)
  347. if now - last_spawn >= spawn_interval:
  348. self.m_last_spawn_times[r_key] = now
  349. self._spawn_worker(r_key)
  350. def _spawn_worker(self, target_routing_key: str):
  351. with self.m_lock:
  352. self.m_pending_order_by_queue[target_routing_key] = self.m_pending_order_by_queue.get(target_routing_key, 0) + 1
  353. def _job():
  354. success = False
  355. task_id = None
  356. is_rate_limited = False
  357. try:
  358. queue_name = f"auto.{target_routing_key}"
  359. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  360. if not task_data:
  361. return
  362. task_id = task_data['id']
  363. with self.m_lock:
  364. self.m_task_data_cache[str(task_id)] = task_data
  365. self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + 5*60.0})
  366. user_inputs = task_data.get('user_inputs', {})
  367. plg_cfg = VSPlgConfig()
  368. plg_cfg.debug = self.m_cfg.debug
  369. plg_cfg.free_config = self.m_cfg.free_config
  370. plg_cfg.session_max_life = self.m_cfg.session_max_life
  371. plg_cfg.account.username = user_inputs.get("username", "")
  372. plg_cfg.account.password = user_inputs.get("password", "")
  373. if not plg_cfg.account.username:
  374. return
  375. acceptable_keys = [target_routing_key]
  376. if self.m_cfg.need_proxy:
  377. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
  378. plg_cfg.proxy.id = proxy['id']
  379. plg_cfg.proxy.ip = proxy['ip']
  380. plg_cfg.proxy.port = proxy['port']
  381. plg_cfg.proxy.proto = proxy['proto']
  382. plg_cfg.proxy.username = proxy['username']
  383. plg_cfg.proxy.password = proxy['password']
  384. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  385. instance.set_log(self.m_logger)
  386. instance.set_config(plg_cfg)
  387. instance.create_session()
  388. with self.m_lock:
  389. self.m_tasks.append(
  390. Task(
  391. instance=instance,
  392. qw_cfg=self.m_cfg.query_wait,
  393. next_run=time.time(),
  394. task_ref=task_id,
  395. acceptable_routing_keys=acceptable_keys,
  396. source_queue=target_routing_key,
  397. book_allowed=True,
  398. next_remote_ping=time.time() + random.randint(180, 300)
  399. )
  400. )
  401. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  402. self.redis_com.delete(queue_fail_key)
  403. success = True
  404. self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
  405. except Exception as e:
  406. err_str = str(e)
  407. resource_not_found_indicators = [
  408. "40401" in err_str,
  409. "Account not found" in err_str,
  410. "Proxy not found" in err_str
  411. ]
  412. if any(resource_not_found_indicators):
  413. return
  414. self._log(f"Order Booker spawn failed: {e}")
  415. rate_limited_indicators = [
  416. "42901" in err_str,
  417. "Rate limited" in err_str
  418. ]
  419. if any(rate_limited_indicators):
  420. is_rate_limited = True
  421. queue_fail_key = f"vs:queue:failures:{target_routing_key}"
  422. queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
  423. q_fails = self.redis_com.incr(queue_fail_key)
  424. q_cd = self.queue_backoff.calculate(q_fails)
  425. self.redis_com.set(queue_cd_key, "1", ex=int(q_cd))
  426. self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
  427. if task_id is not None:
  428. task_meta = task_data.get('meta') or {}
  429. t_fails = task_meta.get('spawn_failures', 0) + 1
  430. task_meta['spawn_failures'] = t_fails
  431. try:
  432. VSCloudApi.Instance().update_vas_task(str(task_id), {"meta": task_meta})
  433. except Exception as cloud_err:
  434. self._log(f"Failed to update task meta: {cloud_err}")
  435. t_cd = self.account_backoff.calculate(t_fails)
  436. self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  437. self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  438. finally:
  439. with self.m_lock:
  440. self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
  441. if not success and task_id is not None and not is_rate_limited:
  442. self.redis_com.zadd(self.m_tracker_key, {str(task_id): 0})
  443. self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
  444. with self.m_lock:
  445. self.m_task_data_cache.pop(str(task_id), None)
  446. ThreadPool.getInstance().enqueue(_job)