booker_builtin.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. import os
  2. import time
  3. import json
  4. import threading
  5. import random
  6. import redis
  7. from typing import List, Dict, Callable
  8. import configure
  9. from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  10. from vs_plg_factory import VSPlgFactory
  11. from toolkit.thread_pool import ThreadPool
  12. from toolkit.vs_cloud_api import VSCloudApi
  13. from toolkit.backoff import ExponentialBackoff
  14. class BuiltinBookerGCO:
  15. """
  16. 非绑定模式 (公共内置账号池):
  17. - 只维护全局 target_instances 数量的实例。
  18. - 所有实例热机等待,发现信号后临时去云端 Pop 订单。
  19. """
  20. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  21. self.m_cfg = cfg
  22. self.m_factory = VSPlgFactory()
  23. self.m_logger = logger
  24. self.m_tasks: List[Task] = []
  25. self.m_lock = threading.RLock()
  26. self.m_stop_event = threading.Event()
  27. redis_common_kwargs = {
  28. **redis_conf,
  29. "socket_timeout": 5,
  30. "socket_connect_timeout": 5,
  31. # 会自动发送 PING
  32. "health_check_interval": 15,
  33. # TCP KeepAlive
  34. "socket_keepalive": True,
  35. "retry_on_timeout": True,
  36. "decode_responses": False,
  37. }
  38. self.redis_com = redis.Redis(**redis_common_kwargs)
  39. self.redis_sub = redis.Redis(**redis_common_kwargs)
  40. self.m_pending_builtin = 0
  41. self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
  42. self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=10*60.0, factor=2.0)
  43. self.task_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
  44. self.m_last_spawn_time = 0.0
  45. self.heartbeat_ttl = 300
  46. def _log(self, message):
  47. if self.m_logger:
  48. self.m_logger(f'[BUILTIN-BOOKER] [{self.m_cfg.identifier}] {message}')
  49. def start(self):
  50. if not self.m_cfg.enable:
  51. return
  52. self._log("Starting Built-in Booker...")
  53. plugin_name = self.m_cfg.plugin_config.plugin_name
  54. class_name = "".join(part.title() for part in plugin_name.split('_'))
  55. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  56. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  57. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  58. threading.Thread(target=self._creator_loop, daemon=True).start()
  59. threading.Thread(target=self._maintain_loop, daemon=True).start()
  60. def stop(self):
  61. self._log("Stopping Booker...")
  62. self.m_stop_event.set()
  63. self._cleanup_all_tasks("booker stop")
  64. def _cleanup_task(self, task: Task, reason: str = ""):
  65. try:
  66. instance = getattr(task, 'instance', None)
  67. if instance and hasattr(instance, 'cleanup'):
  68. instance.cleanup()
  69. self._log(f"🧹 Cleaned up built-in instance. Reason: {reason}")
  70. except Exception as e:
  71. self._log(f"Cleanup failed for built-in instance. Reason: {reason}. Error: {e}")
  72. def _remove_task(self, task: Task, reason: str = "", cleanup: bool = True):
  73. removed = False
  74. with self.m_lock:
  75. if task in self.m_tasks:
  76. self.m_tasks.remove(task)
  77. removed = True
  78. if cleanup and removed:
  79. self._cleanup_task(task, reason)
  80. return removed
  81. def _cleanup_all_tasks(self, reason: str = ""):
  82. with self.m_lock:
  83. tasks = list(self.m_tasks)
  84. self.m_tasks.clear()
  85. for task in tasks:
  86. self._cleanup_task(task, reason)
  87. def _get_redis_key(self, routing_key: str) -> str:
  88. return f"vs:signal:{routing_key}"
  89. def _maintain_loop(self):
  90. self._log("Maintain loop started.")
  91. while not self.m_stop_event.is_set():
  92. time.sleep(1.0)
  93. now = time.time()
  94. with self.m_lock:
  95. tasks_to_check = list(self.m_tasks)
  96. if not tasks_to_check:
  97. continue
  98. healthy_tasks = []
  99. dead_tasks = []
  100. for t in tasks_to_check:
  101. if now >= t.next_remote_ping:
  102. try:
  103. t.instance.keep_alive()
  104. if t.instance.health_check():
  105. healthy_tasks.append(t)
  106. next_delay = random.randint(60, 180)
  107. t.next_remote_ping = now + next_delay
  108. else:
  109. dead_tasks.append(t)
  110. self._log(f"♻️ Instance unhealthy. Will be removed.")
  111. except Exception as e:
  112. dead_tasks.append(t)
  113. self._log(f"Instance keep-alive failed: {e}")
  114. else:
  115. healthy_tasks.append(t)
  116. if dead_tasks:
  117. with self.m_lock:
  118. current_tasks = list(self.m_tasks)
  119. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  120. for t in dead_tasks:
  121. if t in current_tasks:
  122. self._cleanup_task(t, "unhealthy or keep-alive failed")
  123. else:
  124. with self.m_lock:
  125. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  126. def _booking_trigger_loop(self):
  127. self._log("Pub/Sub Trigger loop started.")
  128. channel_to_routing_key = {}
  129. for apt in self.m_cfg.appointment_types:
  130. channel = self._get_redis_key(apt.routing_key)
  131. channel_to_routing_key[channel] = apt.routing_key
  132. if not channel_to_routing_key:
  133. self._log("No appointment types configured. Exiting trigger loop.")
  134. return
  135. pubsub = None
  136. while not self.m_stop_event.is_set():
  137. try:
  138. if pubsub is None:
  139. pubsub = self.redis_sub.pubsub(ignore_subscribe_messages=False)
  140. channels_to_sub = list(channel_to_routing_key.keys())
  141. self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}")
  142. pubsub.subscribe(*channels_to_sub)
  143. message = pubsub.get_message(timeout=5.0)
  144. if not message:
  145. continue
  146. channel = message['channel']
  147. if isinstance(channel, bytes):
  148. channel = channel.decode('utf-8')
  149. if message['type'] == 'subscribe':
  150. active_subs = message['data']
  151. self._log(f"📡 [Redis ACK] Successfully subscribed to: {channel} (Active connection subs: {active_subs})")
  152. continue
  153. if message['type'] != 'message':
  154. continue
  155. raw_data = message['data']
  156. if isinstance(raw_data, bytes):
  157. raw_data = raw_data.decode('utf-8')
  158. routing_key = channel_to_routing_key.get(channel)
  159. if not routing_key:
  160. continue
  161. try:
  162. data = json.loads(raw_data)
  163. query_result = VSQueryResult.model_validate(data['query_result'])
  164. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  165. except Exception as parse_err:
  166. self._log(f"Data parsing error for channel {channel}: {parse_err}")
  167. continue
  168. now = time.time()
  169. matching_tasks = []
  170. with self.m_lock:
  171. for task in self.m_tasks:
  172. if now < task.next_run or not task.book_allowed:
  173. continue
  174. if routing_key not in task.acceptable_routing_keys:
  175. continue
  176. task.next_run = now + self.m_cfg.booker.booking_cooldown
  177. matching_tasks.append(task)
  178. if matching_tasks:
  179. for task in matching_tasks:
  180. self._log(f"🚀 Triggering BOOK for {routing_key} | Order Ref: {task.task_ref}")
  181. t = threading.Thread(
  182. target=self._execute_book_job,
  183. args=(task, query_result),
  184. daemon=True
  185. )
  186. t.start()
  187. except Exception as e:
  188. self._log(f"Trigger loop pub/sub error: {e}")
  189. if pubsub:
  190. try:
  191. pubsub.close()
  192. except:
  193. pass
  194. pubsub = None
  195. time.sleep(2)
  196. if pubsub:
  197. pubsub.close()
  198. self._log("Pub/Sub connection closed.")
  199. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  200. queue_name = f"auto.{query_result.apt_type.routing_key}"
  201. task_id = None
  202. task_data = None
  203. booking_success = False
  204. is_rate_limited = False
  205. try:
  206. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  207. if not task_data:
  208. return
  209. task_id = task_data['id']
  210. order_id = task_data.get('order_id')
  211. self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
  212. user_input = task_data.get('user_inputs', {})
  213. book_res = task.instance.book(query_result, user_input)
  214. if book_res.success:
  215. booking_success = True
  216. self._log(f"✅ BOOK SUCCESS! Order: {order_id}")
  217. grab_info = {
  218. "account": book_res.account,
  219. "session_id": book_res.session_id,
  220. "urn": book_res.urn,
  221. "slot_date": book_res.book_date,
  222. "slot_time": book_res.book_time,
  223. "timestamp": int(time.time()),
  224. "payment_link": book_res.payment_link
  225. }
  226. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  227. push_content = (
  228. f"🎉 【预定成功通知】\n"
  229. f"━━━━━━━━━━━━━━━\n"
  230. f"订单编号: {order_id}\n"
  231. f"预约账号: {book_res.account}\n"
  232. f"预约日期: {book_res.book_date}\n"
  233. f"预约时间: {book_res.book_time}\n"
  234. f"预约编号: {book_res.urn}\n"
  235. f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
  236. f"━━━━━━━━━━━━━━━\n"
  237. )
  238. VSCloudApi.Instance().push_weixin_text(push_content)
  239. self.redis_com.zrem(self.m_tracker_key, task_id)
  240. # === 核心:成功次数判断 ===
  241. task.successful_bookings += 1
  242. max_b = self.m_cfg.booker.max_bookings_per_account
  243. if max_b > 0 and task.successful_bookings >= max_b:
  244. self._log(f"Account reached max bookings ({max_b}). Destroying instance.")
  245. self._remove_task(task, "max bookings reached")
  246. else:
  247. self._log(f"❌ BOOK FAILED for Order: {order_id}")
  248. except Exception as e:
  249. err_str = str(e)
  250. self._log(f"Exception during booking: {err_str}")
  251. rate_limited_indicators = [
  252. "42901" in err_str,
  253. "Rate limited" in err_str
  254. ]
  255. if any(rate_limited_indicators):
  256. is_rate_limited = True
  257. self._remove_task(task, "booking rate limited")
  258. if task_data and task_id is not None:
  259. task_meta = task_data.get('meta') or {}
  260. t_fails = task_meta.get('booking_failures', 0) + 1
  261. task_meta['booking_failures'] = t_fails
  262. try:
  263. VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
  264. except Exception as cloud_err:
  265. self._log(f"Failed to update task meta: {cloud_err}")
  266. t_cd = self.task_backoff.calculate(t_fails)
  267. self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
  268. self.redis_com.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
  269. finally:
  270. if not booking_success and task_id is not None and not is_rate_limited:
  271. self.redis_com.zadd(self.m_tracker_key, {str(task_id): 0})
  272. self._log(f"♻️ Task={task_id} normal failure. Instantly handed over to Sweeper.")
  273. def _creator_loop(self):
  274. self._log("Creator loop started.")
  275. spawn_interval = 10.0
  276. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  277. while not self.m_stop_event.is_set():
  278. time.sleep(2.0)
  279. if self.redis_com.exists(group_cd_key):
  280. continue
  281. with self.m_lock:
  282. current = len(self.m_tasks)
  283. pending = self.m_pending_builtin
  284. target = self.m_cfg.booker.target_instances
  285. if (current + pending) < target:
  286. now = time.time()
  287. if now - self.m_last_spawn_time >= spawn_interval:
  288. self.m_last_spawn_time = now
  289. self._spawn_worker()
  290. def _spawn_worker(self):
  291. with self.m_lock:
  292. self.m_pending_builtin += 1
  293. def _job():
  294. try:
  295. plg_cfg = VSPlgConfig()
  296. plg_cfg.debug = self.m_cfg.debug
  297. plg_cfg.free_config = self.m_cfg.free_config
  298. plg_cfg.session_max_life = self.m_cfg.session_max_life
  299. if self.m_cfg.need_account:
  300. acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd)
  301. plg_cfg.account.id = acc['id']
  302. plg_cfg.account.username = acc['username']
  303. plg_cfg.account.password = acc['password']
  304. if self.m_cfg.need_proxy:
  305. proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
  306. plg_cfg.proxy.id = proxy['id']
  307. plg_cfg.proxy.ip = proxy['ip']
  308. plg_cfg.proxy.port = proxy['port']
  309. plg_cfg.proxy.proto = proxy['proto']
  310. plg_cfg.proxy.username = proxy['username']
  311. plg_cfg.proxy.password = proxy['password']
  312. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  313. instance.set_log(self.m_logger)
  314. instance.set_config(plg_cfg)
  315. instance.create_session()
  316. with self.m_lock:
  317. all_keys = [apt.routing_key for apt in self.m_cfg.appointment_types]
  318. self.m_tasks.append(
  319. Task(
  320. instance=instance,
  321. qw_cfg=self.m_cfg.query_wait,
  322. next_run=time.time(),
  323. task_ref=None,
  324. acceptable_routing_keys=all_keys,
  325. source_queue="built-in",
  326. book_allowed=True,
  327. next_remote_ping = time.time() + random.randint(180, 300)
  328. )
  329. )
  330. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  331. self.redis_com.delete(group_fail_key)
  332. self._log(f"+++ Built-in Booker spawned: {plg_cfg.account.username}")
  333. except Exception as e:
  334. err_str = str(e)
  335. resource_not_found_indicators = [
  336. "40401" in err_str,
  337. "Account not found" in err_str,
  338. "Proxy not found" in err_str,
  339. ]
  340. if any(resource_not_found_indicators):
  341. return
  342. self._log(f"Spawn failed: {e}")
  343. rate_limited_indicators = [
  344. "42901" in err_str,
  345. "Rate limited" in err_str
  346. ]
  347. if any(rate_limited_indicators):
  348. group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
  349. group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
  350. # 更新全局(机器组)失败次数
  351. g_fails = self.redis_com.incr(group_fail_key)
  352. # 计算退避时间
  353. g_cd = self.group_backoff.calculate(g_fails)
  354. # 设置 Redis 全局冷却保护阀
  355. self.redis_com.set(group_cd_key, "1", ex=int(g_cd))
  356. self._log(f"📉 [Rate Limited] Group '{self.m_cfg.identifier}' failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
  357. finally:
  358. with self.m_lock:
  359. self.m_pending_builtin = max(0, self.m_pending_builtin - 1)
  360. ThreadPool.getInstance().enqueue(_job)