booker_builtin.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. import os
  2. import time
  3. import json
  4. import threading
  5. import random
  6. import traceback
  7. import redis
  8. from typing import List, Dict, Callable
  9. from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  10. from vs_plg_factory import VSPlgFactory
  11. from toolkit.thread_pool import ThreadPool
  12. from toolkit.vs_cloud_api import VSCloudApi
  13. from toolkit.backoff import ExponentialBackoff
  14. class BuiltinBookerGCO:
  15. """
  16. 非绑定模式 (公共内置账号池):
  17. - 只维护全局 target_instances 数量的实例。
  18. - 所有实例热机等待,发现信号后临时去云端 Pop 订单。
  19. """
  20. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  21. self.m_cfg = cfg
  22. self.m_factory = VSPlgFactory()
  23. self.m_logger = logger
  24. self.m_tasks: List[Task] = []
  25. self.m_lock = threading.RLock()
  26. self.m_stop_event = threading.Event()
  27. self.redis_client = redis.Redis(**redis_conf)
  28. self.m_pending_builtin = 0
  29. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  30. self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=10*60.0, factor=2.0)
  31. self.task_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
  32. self.m_last_spawn_time = 0.0
  33. def _log(self, message):
  34. if self.m_logger:
  35. self.m_logger(f'[BUILTIN-BOOKER] [{self.m_cfg.identifier}] {message}')
  36. def start(self):
  37. if not self.m_cfg.enable:
  38. return
  39. self._log("Starting Built-in Booker...")
  40. plugin_name = self.m_cfg.plugin_config.plugin_name
  41. class_name = "".join(part.title() for part in plugin_name.split('_'))
  42. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  43. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  44. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  45. threading.Thread(target=self._creator_loop, daemon=True).start()
  46. threading.Thread(target=self._maintain_loop, daemon=True).start()
  47. def stop(self):
  48. self._log("Stopping Booker...")
  49. self.m_stop_event.set()
  50. def _get_redis_key(self, routing_key: str) -> str:
  51. return f"vs:signal:{routing_key}"
  52. def _safe_return_task(self, task_id: int, reason: str = ""):
  53. if not task_id:
  54. return
  55. try:
  56. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  57. if not task_data:
  58. self.redis_client.zrem(self.m_tracker_key, task_id)
  59. return
  60. current_status = task_data.get('status', '')
  61. if current_status in['pending', 'grabbed', 'cancelled', 'success']:
  62. self.redis_client.zrem(self.m_tracker_key, task_id)
  63. return
  64. self._log(f"Returning task={task_id} to queue. Reason: {reason}")
  65. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  66. # 归还成功,核销防丢记录
  67. self.redis_client.zrem(self.m_tracker_key, task_id)
  68. except Exception as ex:
  69. self._log(f"Failed to safely return task for task_id {task_id}: {ex}")
  70. def _maintain_loop(self):
  71. self._log("Maintain loop started.")
  72. rng = random.Random()
  73. while not self.m_stop_event.is_set():
  74. wait_seconds = rng.randint(180, 300)
  75. for _ in range(wait_seconds):
  76. if self.m_stop_event.is_set():
  77. return
  78. time.sleep(1.0)
  79. with self.m_lock:
  80. tasks_to_check = list(self.m_tasks)
  81. healthy_tasks = []
  82. for t in tasks_to_check:
  83. try:
  84. t.instance.keep_alive()
  85. if t.instance.health_check():
  86. healthy_tasks.append(t)
  87. except Exception as e:
  88. self._log(f"Instance keep-alive failed: {e}")
  89. with self.m_lock:
  90. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  91. def _booking_trigger_loop(self):
  92. self._log("Trigger loop started.")
  93. while not self.m_stop_event.is_set():
  94. try:
  95. time.sleep(1.0)
  96. now = time.time()
  97. for apt_type in self.m_cfg.appointment_types:
  98. redis_key = self._get_redis_key(apt_type.routing_key)
  99. raw_data = self.redis_client.get(redis_key)
  100. if not raw_data:
  101. continue
  102. try:
  103. data = json.loads(raw_data)
  104. query_result = VSQueryResult.model_validate(data['query_result'])
  105. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  106. except Exception as parse_err:
  107. self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
  108. self.redis_client.delete(redis_key)
  109. continue
  110. matching_tasks = []
  111. with self.m_lock:
  112. for task in self.m_tasks:
  113. if now < task.next_run or not task.book_allowed:
  114. continue
  115. if apt_type.routing_key not in task.acceptable_routing_keys:
  116. continue
  117. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
  118. task.next_run = now + self.m_cfg.booker.booking_cooldown
  119. matching_tasks.append(task)
  120. if matching_tasks:
  121. threads = []
  122. for task in matching_tasks:
  123. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
  124. t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
  125. threads.append(t)
  126. t.start()
  127. for t in threads:
  128. t.join()
  129. except Exception as e:
  130. self._log(f"Trigger loop error: {e}")
  131. time.sleep(2)
  132. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  133. queue_name = f"auto.{query_result.apt_type.routing_key}"
  134. task_id = None
  135. task_data = None
  136. booking_success = False
  137. try:
  138. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  139. if not task_data:
  140. return
  141. task_id = task_data['id']
  142. order_id = task_data.get('order_id')
  143. self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
  144. user_input = task_data.get('user_inputs', {})
  145. book_res = task.instance.book(query_result, user_input)
  146. if book_res.success:
  147. booking_success = True
  148. self._log(f"✅ BOOK SUCCESS! Order: {order_id}")
  149. grab_info = {
  150. "account": book_res.account,
  151. "session_id": book_res.session_id,
  152. "urn": book_res.urn,
  153. "slot_date": book_res.book_date,
  154. "slot_time": book_res.book_time,
  155. "timestamp": int(time.time()),
  156. "payment_link": book_res.payment_link
  157. }
  158. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  159. push_content = (
  160. f"🎉 【预定成功通知】\n"
  161. f"━━━━━━━━━━━━━━━\n"
  162. f"订单编号: {order_id}\n"
  163. f"预约账号: {book_res.account}\n"
  164. f"预约日期: {book_res.book_date}\n"
  165. f"预约时间: {book_res.book_time}\n"
  166. f"预约编号: {book_res.urn}\n"
  167. f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
  168. f"━━━━━━━━━━━━━━━\n"
  169. )
  170. VSCloudApi.Instance().push_weixin_text(push_content)
  171. self.redis_client.zrem(self.m_tracker_key, task_id)
  172. # === 核心:成功次数判断 ===
  173. task.successful_bookings += 1
  174. max_b = self.m_cfg.booker.max_bookings_per_account
  175. if max_b > 0 and task.successful_bookings >= max_b:
  176. self._log(f"Account reached max bookings ({max_b}). Destroying instance.")
  177. with self.m_lock:
  178. if task in self.m_tasks:
  179. self.m_tasks.remove(task)
  180. else:
  181. self._log(f"❌ BOOK FAILED for Order: {order_id}")
  182. except Exception as e:
  183. err_str = str(e)
  184. self._log(f"Exception during booking: {err_str}")
  185. rate_limited_indicators = [
  186. "42901" in err_str,
  187. "Rate limited" in err_str
  188. ]
  189. if any(rate_limited_indicators):
  190. with self.m_lock:
  191. if task in self.m_tasks:
  192. self.m_tasks.remove(task)
  193. if task_data and task_id is not None:
  194. task_meta = task_data.get('meta') or {}
  195. t_fails = task_meta.get('booking_failures', 0) + 1
  196. task_meta['booking_failures'] = t_fails
  197. try:
  198. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  199. except Exception as cloud_err:
  200. self._log(f"Failed to update task meta: {cloud_err}")
  201. t_cd = self.task_backoff.calculate(t_fails)
  202. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  203. def delayed_return(tid, wait_sec, reason):
  204. self.m_stop_event.wait(wait_sec)
  205. self._safe_return_task(tid, reason=reason)
  206. t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Booking failed: rate limited (fails={t_fails})"), daemon=True)
  207. t.start()
  208. task_id = None
  209. finally:
  210. if not booking_success and task_id is not None:
  211. self._safe_return_task(task_id, reason="Booking failed or error occurred")
  212. def _creator_loop(self):
  213. self._log("Creator loop started.")
  214. spawn_interval = 10.0
  215. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  216. while not self.m_stop_event.is_set():
  217. time.sleep(2.0)
  218. if self.redis_client.exists(group_cd_key):
  219. continue
  220. with self.m_lock:
  221. current = len(self.m_tasks)
  222. pending = self.m_pending_builtin
  223. target = self.m_cfg.booker.target_instances
  224. if (current + pending) < target:
  225. now = time.time()
  226. if now - self.m_last_spawn_time >= spawn_interval:
  227. self.m_last_spawn_time = now
  228. self._spawn_worker()
  229. def _spawn_worker(self):
  230. with self.m_lock:
  231. self.m_pending_builtin += 1
  232. def _job():
  233. try:
  234. plg_cfg = VSPlgConfig()
  235. plg_cfg.debug = self.m_cfg.debug
  236. plg_cfg.free_config = self.m_cfg.free_config
  237. plg_cfg.session_max_life = self.m_cfg.session_max_life
  238. if self.m_cfg.need_account:
  239. acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd)
  240. plg_cfg.account.id = acc['id']
  241. plg_cfg.account.username = acc['username']
  242. plg_cfg.account.password = acc['password']
  243. if self.m_cfg.need_proxy:
  244. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
  245. plg_cfg.proxy.id = proxy['id']
  246. plg_cfg.proxy.ip = proxy['ip']
  247. plg_cfg.proxy.port = proxy['port']
  248. plg_cfg.proxy.proto = proxy['proto']
  249. plg_cfg.proxy.username = proxy['username']
  250. plg_cfg.proxy.password = proxy['password']
  251. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  252. instance.set_log(self.m_logger)
  253. instance.set_config(plg_cfg)
  254. instance.create_session()
  255. with self.m_lock:
  256. all_keys = [apt.routing_key for apt in self.m_cfg.appointment_types]
  257. self.m_tasks.append(
  258. Task(
  259. instance=instance,
  260. qw_cfg=self.m_cfg.query_wait,
  261. next_run=time.time(),
  262. task_ref=None,
  263. acceptable_routing_keys=all_keys,
  264. source_queue="built-in",
  265. book_allowed=True
  266. )
  267. )
  268. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  269. self.redis_client.delete(group_fail_key)
  270. self._log(f"+++ Built-in Booker spawned: {plg_cfg.account.username}")
  271. except Exception as e:
  272. err_str = str(e)
  273. resource_not_found_indicators = [
  274. "40401" in err_str,
  275. "Account not found" in err_str,
  276. "Proxy not found" in err_str,
  277. ]
  278. if any(resource_not_found_indicators):
  279. return
  280. self._log(f"Spawn failed: {e}")
  281. rate_limited_indicators = [
  282. "42901" in err_str,
  283. "Rate limited" in err_str
  284. ]
  285. if any(rate_limited_indicators):
  286. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  287. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  288. # 更新全局(机器组)失败次数
  289. g_fails = self.redis_client.incr(group_fail_key)
  290. # 计算退避时间
  291. g_cd = self.group_backoff.calculate(g_fails)
  292. # 设置 Redis 全局冷却保护阀
  293. self.redis_client.set(group_cd_key, "1", ex=int(g_cd))
  294. self._log(f"📉 [Rate Limited] Group '{self.m_cfg.identifier}' failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
  295. finally:
  296. with self.m_lock:
  297. self.m_pending_builtin = max(0, self.m_pending_builtin - 1)
  298. ThreadPool.getInstance().enqueue(_job)