# booker_builtin.py
import json
import os
import random
import threading
import time
from typing import Callable, Dict, List, Optional

import redis

from toolkit.backoff import ExponentialBackoff
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from vs_plg_factory import VSPlgFactory
from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  13. class BuiltinBookerGCO:
  14. """
  15. 非绑定模式 (公共内置账号池):
  16. - 只维护全局 target_instances 数量的实例。
  17. - 所有实例热机等待,发现信号后临时去云端 Pop 订单。
  18. """
  19. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  20. self.m_cfg = cfg
  21. self.m_factory = VSPlgFactory()
  22. self.m_logger = logger
  23. self.m_tasks: List[Task] = []
  24. self.m_lock = threading.RLock()
  25. self.m_stop_event = threading.Event()
  26. self.redis_client = redis.Redis(**redis_conf)
  27. self.m_pending_builtin = 0
  28. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  29. self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=10*60.0, factor=2.0)
  30. self.task_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
  31. self.m_last_spawn_time = 0.0
  32. self.heartbeat_ttl = 300
  33. def _log(self, message):
  34. if self.m_logger:
  35. self.m_logger(f'[BUILTIN-BOOKER] [{self.m_cfg.identifier}] {message}')
  36. def start(self):
  37. if not self.m_cfg.enable:
  38. return
  39. self._log("Starting Built-in Booker...")
  40. plugin_name = self.m_cfg.plugin_config.plugin_name
  41. class_name = "".join(part.title() for part in plugin_name.split('_'))
  42. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  43. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  44. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  45. threading.Thread(target=self._creator_loop, daemon=True).start()
  46. threading.Thread(target=self._maintain_loop, daemon=True).start()
  47. def stop(self):
  48. self._log("Stopping Booker...")
  49. self.m_stop_event.set()
  50. self._cleanup_all_tasks("booker stop")
  51. def _cleanup_task(self, task: Task, reason: str = ""):
  52. try:
  53. instance = getattr(task, 'instance', None)
  54. if instance and hasattr(instance, 'cleanup'):
  55. instance.cleanup()
  56. self._log(f"🧹 Cleaned up built-in instance. Reason: {reason}")
  57. except Exception as e:
  58. self._log(f"Cleanup failed for built-in instance. Reason: {reason}. Error: {e}")
  59. def _remove_task(self, task: Task, reason: str = "", cleanup: bool = True):
  60. removed = False
  61. with self.m_lock:
  62. if task in self.m_tasks:
  63. self.m_tasks.remove(task)
  64. removed = True
  65. if cleanup and removed:
  66. self._cleanup_task(task, reason)
  67. return removed
  68. def _cleanup_all_tasks(self, reason: str = ""):
  69. with self.m_lock:
  70. tasks = list(self.m_tasks)
  71. self.m_tasks.clear()
  72. for task in tasks:
  73. self._cleanup_task(task, reason)
  74. def _get_redis_key(self, routing_key: str) -> str:
  75. return f"vs:signal:{routing_key}"
  76. def _maintain_loop(self):
  77. self._log("Maintain loop started.")
  78. while not self.m_stop_event.is_set():
  79. time.sleep(1.0)
  80. now = time.time()
  81. with self.m_lock:
  82. tasks_to_check = list(self.m_tasks)
  83. if not tasks_to_check:
  84. continue
  85. healthy_tasks = []
  86. dead_tasks = []
  87. for t in tasks_to_check:
  88. if now >= t.next_remote_ping:
  89. try:
  90. t.instance.keep_alive()
  91. if t.instance.health_check():
  92. healthy_tasks.append(t)
  93. next_delay = random.randint(180, 300)
  94. t.next_remote_ping = now + next_delay
  95. else:
  96. dead_tasks.append(t)
  97. self._log(f"♻️ Instance unhealthy. Will be removed.")
  98. except Exception as e:
  99. dead_tasks.append(t)
  100. self._log(f"Instance keep-alive failed: {e}")
  101. else:
  102. healthy_tasks.append(t)
  103. if dead_tasks:
  104. with self.m_lock:
  105. current_tasks = list(self.m_tasks)
  106. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  107. for t in dead_tasks:
  108. if t in current_tasks:
  109. self._cleanup_task(t, "unhealthy or keep-alive failed")
  110. else:
  111. with self.m_lock:
  112. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  113. def _booking_trigger_loop(self):
  114. self._log("Pub/Sub Trigger loop started.")
  115. channel_to_routing_key = {}
  116. for apt in self.m_cfg.appointment_types:
  117. channel = self._get_redis_key(apt.routing_key)
  118. channel_to_routing_key[channel] = apt.routing_key
  119. if not channel_to_routing_key:
  120. self._log("No appointment types configured. Exiting trigger loop.")
  121. return
  122. pubsub = None
  123. while not self.m_stop_event.is_set():
  124. try:
  125. if pubsub is None:
  126. pubsub = self.redis_client.pubsub(ignore_subscribe_messages=False)
  127. channels_to_sub = list(channel_to_routing_key.keys())
  128. self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}")
  129. pubsub.subscribe(*channels_to_sub)
  130. message = pubsub.get_message(timeout=5.0)
  131. if not message:
  132. continue
  133. channel = message['channel']
  134. if isinstance(channel, bytes):
  135. channel = channel.decode('utf-8')
  136. if message['type'] == 'subscribe':
  137. active_subs = message['data']
  138. self._log(f"📡 [Redis ACK] Successfully subscribed to: {channel} (Active connection subs: {active_subs})")
  139. continue
  140. if message['type'] != 'message':
  141. continue
  142. raw_data = message['data']
  143. if isinstance(raw_data, bytes):
  144. raw_data = raw_data.decode('utf-8')
  145. routing_key = channel_to_routing_key.get(channel)
  146. if not routing_key:
  147. continue
  148. try:
  149. data = json.loads(raw_data)
  150. query_result = VSQueryResult.model_validate(data['query_result'])
  151. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  152. except Exception as parse_err:
  153. self._log(f"Data parsing error for channel {channel}: {parse_err}")
  154. continue
  155. now = time.time()
  156. matching_tasks = []
  157. with self.m_lock:
  158. for task in self.m_tasks:
  159. if now < task.next_run or not task.book_allowed:
  160. continue
  161. if routing_key not in task.acceptable_routing_keys:
  162. continue
  163. task.next_run = now + self.m_cfg.booker.booking_cooldown
  164. matching_tasks.append(task)
  165. if matching_tasks:
  166. for task in matching_tasks:
  167. self._log(f"🚀 Triggering BOOK for {routing_key} | Order Ref: {task.task_ref}")
  168. t = threading.Thread(
  169. target=self._execute_book_job,
  170. args=(task, query_result),
  171. daemon=True
  172. )
  173. t.start()
  174. except Exception as e:
  175. self._log(f"Trigger loop pub/sub error: {e}")
  176. if pubsub:
  177. try:
  178. pubsub.close()
  179. except:
  180. pass
  181. pubsub = None
  182. time.sleep(2)
  183. if pubsub:
  184. pubsub.close()
  185. self._log("Pub/Sub connection closed.")
  186. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  187. queue_name = f"auto.{query_result.apt_type.routing_key}"
  188. task_id = None
  189. task_data = None
  190. booking_success = False
  191. is_rate_limited = False
  192. try:
  193. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name, test=False)
  194. if not task_data:
  195. return
  196. task_id = task_data['id']
  197. order_id = task_data.get('order_id')
  198. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
  199. user_input = task_data.get('user_inputs', {})
  200. book_res = task.instance.book(query_result, user_input)
  201. if book_res.success:
  202. booking_success = True
  203. self._log(f"✅ BOOK SUCCESS! Order: {order_id}")
  204. grab_info = {
  205. "account": book_res.account,
  206. "session_id": book_res.session_id,
  207. "urn": book_res.urn,
  208. "slot_date": book_res.book_date,
  209. "slot_time": book_res.book_time,
  210. "timestamp": int(time.time()),
  211. "payment_link": book_res.payment_link
  212. }
  213. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  214. push_content = (
  215. f"🎉 【预定成功通知】\n"
  216. f"━━━━━━━━━━━━━━━\n"
  217. f"订单编号: {order_id}\n"
  218. f"预约账号: {book_res.account}\n"
  219. f"预约日期: {book_res.book_date}\n"
  220. f"预约时间: {book_res.book_time}\n"
  221. f"预约编号: {book_res.urn}\n"
  222. f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
  223. f"━━━━━━━━━━━━━━━\n"
  224. )
  225. VSCloudApi.Instance().push_weixin_text(push_content)
  226. self.redis_client.zrem(self.m_tracker_key, task_id)
  227. # === 核心:成功次数判断 ===
  228. task.successful_bookings += 1
  229. max_b = self.m_cfg.booker.max_bookings_per_account
  230. if max_b > 0 and task.successful_bookings >= max_b:
  231. self._log(f"Account reached max bookings ({max_b}). Destroying instance.")
  232. self._remove_task(task, "max bookings reached")
  233. else:
  234. self._log(f"❌ BOOK FAILED for Order: {order_id}")
  235. except Exception as e:
  236. err_str = str(e)
  237. self._log(f"Exception during booking: {err_str}")
  238. rate_limited_indicators = [
  239. "42901" in err_str,
  240. "Rate limited" in err_str
  241. ]
  242. if any(rate_limited_indicators):
  243. is_rate_limited = True
  244. self._remove_task(task, "booking rate limited")
  245. if task_data and task_id is not None:
  246. task_meta = task_data.get('meta') or {}
  247. t_fails = task_meta.get('booking_failures', 0) + 1
  248. task_meta['booking_failures'] = t_fails
  249. try:
  250. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  251. except Exception as cloud_err:
  252. self._log(f"Failed to update task meta: {cloud_err}")
  253. t_cd = self.task_backoff.calculate(t_fails)
  254. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  255. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  256. finally:
  257. if not booking_success and task_id is not None and not is_rate_limited:
  258. self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
  259. self._log(f"♻️ Task={task_id} normal failure. Instantly handed over to Sweeper.")
  260. def _creator_loop(self):
  261. self._log("Creator loop started.")
  262. spawn_interval = 10.0
  263. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  264. while not self.m_stop_event.is_set():
  265. time.sleep(2.0)
  266. if self.redis_client.exists(group_cd_key):
  267. continue
  268. with self.m_lock:
  269. current = len(self.m_tasks)
  270. pending = self.m_pending_builtin
  271. target = self.m_cfg.booker.target_instances
  272. if (current + pending) < target:
  273. now = time.time()
  274. if now - self.m_last_spawn_time >= spawn_interval:
  275. self.m_last_spawn_time = now
  276. self._spawn_worker()
  277. def _spawn_worker(self):
  278. with self.m_lock:
  279. self.m_pending_builtin += 1
  280. def _job():
  281. try:
  282. plg_cfg = VSPlgConfig()
  283. plg_cfg.debug = self.m_cfg.debug
  284. plg_cfg.free_config = self.m_cfg.free_config
  285. plg_cfg.session_max_life = self.m_cfg.session_max_life
  286. if self.m_cfg.need_account:
  287. acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd, test=False)
  288. plg_cfg.account.id = acc['id']
  289. plg_cfg.account.username = acc['username']
  290. plg_cfg.account.password = acc['password']
  291. if self.m_cfg.need_proxy:
  292. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd, test=False)
  293. plg_cfg.proxy.id = proxy['id']
  294. plg_cfg.proxy.ip = proxy['ip']
  295. plg_cfg.proxy.port = proxy['port']
  296. plg_cfg.proxy.proto = proxy['proto']
  297. plg_cfg.proxy.username = proxy['username']
  298. plg_cfg.proxy.password = proxy['password']
  299. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  300. instance.set_log(self.m_logger)
  301. instance.set_config(plg_cfg)
  302. instance.create_session()
  303. with self.m_lock:
  304. all_keys = [apt.routing_key for apt in self.m_cfg.appointment_types]
  305. self.m_tasks.append(
  306. Task(
  307. instance=instance,
  308. qw_cfg=self.m_cfg.query_wait,
  309. next_run=time.time(),
  310. task_ref=None,
  311. acceptable_routing_keys=all_keys,
  312. source_queue="built-in",
  313. book_allowed=True,
  314. next_remote_ping = time.time() + random.randint(180, 300)
  315. )
  316. )
  317. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  318. self.redis_client.delete(group_fail_key)
  319. self._log(f"+++ Built-in Booker spawned: {plg_cfg.account.username}")
  320. except Exception as e:
  321. err_str = str(e)
  322. resource_not_found_indicators = [
  323. "40401" in err_str,
  324. "Account not found" in err_str,
  325. "Proxy not found" in err_str,
  326. ]
  327. if any(resource_not_found_indicators):
  328. return
  329. self._log(f"Spawn failed: {e}")
  330. rate_limited_indicators = [
  331. "42901" in err_str,
  332. "Rate limited" in err_str
  333. ]
  334. if any(rate_limited_indicators):
  335. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  336. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  337. # 更新全局(机器组)失败次数
  338. g_fails = self.redis_client.incr(group_fail_key)
  339. # 计算退避时间
  340. g_cd = self.group_backoff.calculate(g_fails)
  341. # 设置 Redis 全局冷却保护阀
  342. self.redis_client.set(group_cd_key, "1", ex=int(g_cd))
  343. self._log(f"📉 [Rate Limited] Group '{self.m_cfg.identifier}' failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
  344. finally:
  345. with self.m_lock:
  346. self.m_pending_builtin = max(0, self.m_pending_builtin - 1)
  347. ThreadPool.getInstance().enqueue(_job)