# booker_builtin.py
import json
import os
import random
import threading
import time
import traceback
from typing import Callable, Dict, List, Optional

import redis

from toolkit.proxy_manager import ProxyManager
from toolkit.thread_pool import ThreadPool
from toolkit.vs_cloud_api import VSCloudApi
from vs_plg_factory import VSPlgFactory
from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
  14. class BuiltinBookerGCO:
  15. """
  16. 非绑定模式 (公共内置账号池):
  17. - 只维护全局 target_instances 数量的实例。
  18. - 所有实例热机等待,发现信号后临时去云端 Pop 订单。
  19. """
  20. def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
  21. self.m_cfg = cfg
  22. self.m_factory = VSPlgFactory()
  23. self.m_logger = logger
  24. self.m_tasks: List[Task] = []
  25. self.m_lock = threading.RLock()
  26. self.m_stop_event = threading.Event()
  27. self.redis_client = redis.Redis(**redis_conf)
  28. self.m_pending_builtin = 0
  29. def _log(self, message):
  30. if self.m_logger:
  31. self.m_logger(f'[BUILTIN-BOOKER] [{self.m_cfg.identifier}] {message}')
  32. def start(self):
  33. if not self.m_cfg.enable:
  34. return
  35. self._log("Starting Built-in Booker...")
  36. plugin_name = self.m_cfg.plugin_config.plugin_name
  37. class_name = "".join(part.title() for part in plugin_name.split('_'))
  38. plugin_path = os.path.join(self.m_cfg.plugin_config.lib_path, self.m_cfg.plugin_config.plugin_bin)
  39. self.m_factory.register_plugin(plugin_name, plugin_path, class_name)
  40. threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
  41. threading.Thread(target=self._creator_loop, daemon=True).start()
  42. threading.Thread(target=self._maintain_loop, daemon=True).start()
  43. def stop(self):
  44. self._log("Stopping Booker...")
  45. self.m_stop_event.set()
  46. def _get_redis_key(self, routing_key: str) -> str:
  47. return f"vs:signal:{routing_key}"
  48. def _maintain_loop(self):
  49. self._log("Maintain loop started.")
  50. rng = random.Random()
  51. while not self.m_stop_event.is_set():
  52. wait_seconds = rng.randint(180, 300)
  53. for _ in range(wait_seconds):
  54. if self.m_stop_event.is_set():
  55. return
  56. time.sleep(1.0)
  57. with self.m_lock:
  58. tasks_to_check = list(self.m_tasks)
  59. healthy_tasks = []
  60. for t in tasks_to_check:
  61. try:
  62. t.instance.keep_alive()
  63. if t.instance.health_check():
  64. healthy_tasks.append(t)
  65. except Exception as e:
  66. self._log(f"Instance keep-alive failed: {e}")
  67. with self.m_lock:
  68. self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
  69. def _booking_trigger_loop(self):
  70. self._log("Trigger loop started.")
  71. while not self.m_stop_event.is_set():
  72. try:
  73. time.sleep(1.0)
  74. now = time.time()
  75. for apt_type in self.m_cfg.appointment_types:
  76. redis_key = self._get_redis_key(apt_type.routing_key)
  77. raw_data = self.redis_client.get(redis_key)
  78. if not raw_data:
  79. continue
  80. try:
  81. data = json.loads(raw_data)
  82. query_result = VSQueryResult.model_validate(data['query_result'])
  83. query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
  84. except Exception as parse_err:
  85. self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
  86. self.redis_client.delete(redis_key)
  87. continue
  88. matching_tasks = []
  89. with self.m_lock:
  90. for task in self.m_tasks:
  91. if now < task.next_run or not task.book_allowed:
  92. continue
  93. if apt_type.routing_key not in task.acceptable_routing_keys:
  94. continue
  95. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
  96. task.next_run = now + self.m_cfg.booker.booking_cooldown
  97. matching_tasks.append(task)
  98. if matching_tasks:
  99. threads = []
  100. for task in matching_tasks:
  101. self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
  102. t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
  103. threads.append(t)
  104. t.start()
  105. for t in threads:
  106. t.join()
  107. except Exception as e:
  108. self._log(f"Trigger loop error: {e}")
  109. time.sleep(2)
  110. def _execute_book_job(self, task: Task, query_result: VSQueryResult):
  111. queue_name = f"auto.{query_result.apt_type.routing_key}"
  112. task_id = None
  113. booking_success = False
  114. try:
  115. task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
  116. if not task_data:
  117. return
  118. task_id = task_data['id']
  119. order_id = task_data.get('order_id')
  120. user_input = task_data.get('user_inputs', {})
  121. book_res = task.instance.book(query_result, user_input)
  122. if book_res.success:
  123. booking_success = True
  124. self._log(f"✅ BOOK SUCCESS! Order: {order_id}")
  125. grab_info = {
  126. "account": book_res.account,
  127. "session_id": book_res.session_id,
  128. "urn": book_res.urn,
  129. "slot_date": book_res.book_date,
  130. "slot_time": book_res.book_time,
  131. "timestamp": int(time.time()),
  132. "payment_link": book_res.payment_link
  133. }
  134. VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
  135. # === 核心:成功次数判断 ===
  136. task.successful_bookings += 1
  137. max_b = self.m_cfg.booker.max_bookings_per_account
  138. if max_b > 0 and task.successful_bookings >= max_b:
  139. self._log(f"Account reached max bookings ({max_b}). Destroying instance.")
  140. with self.m_lock:
  141. if task in self.m_tasks:
  142. self.m_tasks.remove(task)
  143. else:
  144. self._log(f"❌ BOOK FAILED for Order: {order_id}")
  145. except Exception as e:
  146. self._log(f"Exception during booking: {e}")
  147. finally:
  148. if not booking_success and task_id:
  149. try:
  150. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  151. except Exception as ex:
  152. self._log(f"Failed to return task: {ex}")
  153. def _creator_loop(self):
  154. self._log("Creator loop started.")
  155. while not self.m_stop_event.is_set():
  156. time.sleep(5.0)
  157. with self.m_lock:
  158. current = len(self.m_tasks)
  159. if (current + self.m_pending_builtin) < self.m_cfg.booker.target_instances:
  160. self._spawn_worker()
  161. def _spawn_worker(self):
  162. with self.m_lock:
  163. self.m_pending_builtin += 1
  164. def _job():
  165. try:
  166. plg_cfg = VSPlgConfig()
  167. plg_cfg.debug = self.m_cfg.debug
  168. plg_cfg.free_config = self.m_cfg.free_config
  169. plg_cfg.session_max_life = self.m_cfg.session_max_life
  170. if self.m_cfg.need_account:
  171. acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd * 60)
  172. if not acc:
  173. return
  174. plg_cfg.account.id = acc['id']
  175. plg_cfg.account.username = acc['username']
  176. plg_cfg.account.password = acc['password']
  177. if self.m_cfg.need_proxy:
  178. proxy = ProxyManager.Instance().next(self.m_cfg.proxy_pool, lock_duration=self.m_cfg.proxy_lock_interval)
  179. if not proxy:
  180. return
  181. plg_cfg.proxy.id = proxy['id']
  182. plg_cfg.proxy.ip = proxy['ip']
  183. plg_cfg.proxy.port = proxy['port']
  184. plg_cfg.proxy.scheme = proxy['scheme']
  185. plg_cfg.proxy.username = proxy['username']
  186. plg_cfg.proxy.password = proxy['password']
  187. instance = self.m_factory.create(self.m_cfg.identifier, self.m_cfg.plugin_config.plugin_name)
  188. instance.set_log(self.m_logger)
  189. instance.set_config(plg_cfg)
  190. instance.create_session()
  191. with self.m_lock:
  192. all_keys = [apt.routing_key for apt in self.m_cfg.appointment_types]
  193. self.m_tasks.append(
  194. Task(
  195. instance=instance,
  196. qw_cfg=self.m_cfg.query_wait,
  197. next_run=time.time(),
  198. task_ref=None,
  199. acceptable_routing_keys=all_keys,
  200. source_queue="built-in",
  201. book_allowed=True
  202. )
  203. )
  204. self._log(f"+++ Built-in Booker spawned: {plg_cfg.account.username}")
  205. except Exception as e:
  206. err_str = str(e)
  207. if "40401" in err_str or "Account not found" in err_str:
  208. return
  209. self._log(f"Spawn failed: {e}")
  210. finally:
  211. with self.m_lock:
  212. self.m_pending_builtin = max(0, self.m_pending_builtin - 1)
  213. ThreadPool.getInstance().enqueue(_job)