jerry 3 mesi fa
parent
commit
322e14e58c
1 ha cambiato i file con 176 aggiunte e 219 eliminazioni
  1. 176 219
      gco.py

+ 176 - 219
gco.py

@@ -103,252 +103,201 @@ class GCO:
         self._log("[START] monitor loop starting...")
         rng = random.Random()
         
-        # 创建一个临时线程池用于并发抢票,避免阻塞监控循环太久
-        # max_workers 根据你的最大并发账号数调整,或者设为 None (默认 CPU核心数 * 5)
-        
         while not self.m_stop_event.is_set():
-            sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
-            time.sleep(sleep_ms)
-            
-            now = time.time()
-            
-            # 1. 拷贝任务列表 (Snapshot)
-            tasks_to_process = []
-            with self.m_lock:
-                tasks_to_process = list(self.m_tasks)
-            
-            # 标记本轮循环是否触发了批量抢票
-            batch_booking_triggered = False
-
-            for task in tasks_to_process:
-                # 二次检查任务是否还在列表中(防止在遍历过程中被移除)
-                with self.m_lock:
-                    if task not in self.m_tasks:
-                        continue 
-
-                # 健康检查
-                if not task.instance.health_check():
-                    continue 
+            try:
+                sleep_ms = 0.1 + rng.randint(0, 20) / 1000.0
+                time.sleep(sleep_ms)
                 
-                # 检查时间窗口
-                if now < task.next_run:
-                    continue
+                now = time.time()
                 
-                # === 执行查询 ===
-                apt_type = None
-                try:
-                    apt_types = self.m_cfg.appointment_types
-                    if not apt_types:
-                        self._log(f"No matching appointment configuration found.")
-                        continue
+                tasks_to_process = []
+                with self.m_lock:
+                    tasks_to_process = list(self.m_tasks)
+                
+                batch_booking_triggered = False
+
+                for task in tasks_to_process:
+                    with self.m_lock:
+                        if task not in self.m_tasks:
+                            continue 
+
+                    if not task.instance.health_check():
+                        continue 
                     
-                    weights = [float(item.weight) for item in apt_types]
-                    apt_type = random.choices(apt_types, weights=weights, k=1)[0]
+                    if now < task.next_run:
+                        continue
                     
-                    VSCloudApi.Instance().slot_refresh_start(
-                        apt_type.routing_key,
-                        country=apt_type.country,
-                        city=apt_type.city,
-                        visa_type=apt_type.visa_type
-                    )
-                    # 这里的 task 充当了“哨兵”的角色
-                    result = task.instance.query(apt_type)
-                    result.apt_type = apt_type 
-                    VSCloudApi.Instance().slot_refresh_success(
-                        apt_type.routing_key
-                    )
-                    if result.success:
-                        self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING for {len(tasks_to_process)} workers.")
-                       
-                        # 上报Slot Snapshot
-                        query_payload = result.to_snapshot_payload()
-                        query_payload["website"] = self.m_cfg.website
-                        query_payload["snapshot_source"] = 'worker'
-                        query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat()
-                        VSCloudApi.Instance().slot_snapshot_report(query_payload)
+                    apt_type = None
+                    try:
+                        apt_types = self.m_cfg.appointment_types
+                        if not apt_types:
+                            self._log(f"No matching appointment configuration found.")
+                            continue
                         
-                        # === [核心修改]:一人发现,全员出击 ===
-                        # 1. 准备并发任务
-                        # 我们使用刚刚快照的 tasks_to_process,或者重新获取一次全量列表
-                        # 重点:所有实例 (w.instance) 都使用同一份查询结果 (result) 去抢
-                        futures = []
-                        for worker in tasks_to_process:
-                            # 提交到线程池并发执行,极大减少时间差
-                            f = ThreadPool.getInstance().enqueue(
-                                self._on_query_result, 
-                                worker, 
-                                result
-                            )
-                            futures.append(f)
+                        weights = [float(item.weight) for item in apt_types]
+                        apt_type = random.choices(apt_types, weights=weights, k=1)[0]
                         
-                        # 2. 等待所有抢票任务结束 (可选,根据业务需求决定是否阻塞监控)
-                        # 如果不wait,监控线程会立刻继续,可能导致重复触发
-                        # 建议 wait,确保这一波抢票彻底结束
-                        for f in futures:
-                            try:
-                                f.result() # 获取结果,捕获异常
-                            except Exception as e:
-                                self._log(f"Batch booking exception: {e}")
-
-                        # 3. 标记触发状态
-                        batch_booking_triggered = True
-                        break 
-                    
-                    else:
-                        # 没查到,仅当前 task 记录日志
-                        self._log(f"Query done by {task.instance.get_group_id()}, No availability")
-
-                except Exception as e:
-                    traceback.print_exc()
-                    self._log(f"Exception during query: {e}")
-                    if apt_type:
-                        VSCloudApi.Instance().slot_refresh_fail(
+                        VSCloudApi.Instance().slot_refresh_start(
                             apt_type.routing_key,
-                            error=str(e)
+                            country=apt_type.country,
+                            city=apt_type.city,
+                            visa_type=apt_type.visa_type
+                        )
+                        
+                        result = task.instance.query(apt_type)
+                        result.apt_type = apt_type 
+                        
+                        VSCloudApi.Instance().slot_refresh_success(
+                            apt_type.routing_key
                         )
 
-                # === 计算下次运行时间 (仅针对当前 query 的 task,除非触发了 batch) ===
-                if not batch_booking_triggered:
-                    interval = 30
-                    mode = task.qw_cfg.mode
-                    if mode == QueryWaitMode.Loop:
-                        interval = 0
-                    elif mode == QueryWaitMode.Fixed:
-                        interval = task.qw_cfg.fixed_wait
-                    elif mode == QueryWaitMode.Random:
-                        interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
-                    
-                    task.next_run = time.time() + interval
-
-            # === [修正后的批量冷却逻辑] ===
-            # 抢票结束后,必须严格按照每个账号配置的频率进入冷却,防止集体429
-            if batch_booking_triggered:
-                self._log(f"Batch booking finished. Resetting wait times based on configurations.")
-                
-                # 重新获取当前时间(因为抢票过程消耗了时间)
-                now_ts = time.time()
-                
-                with self.m_lock:
-                    for t in self.m_tasks:
-                        # 重新读取该任务的配置
-                        interval = 30 # 默认兜底
-                        mode = t.qw_cfg.mode
+                        if result.success:
+                            self._log(f"🔥 Slot Found by [{task.instance.get_group_id()}]! Triggering BATCH BOOKING.")
+                           
+                            query_payload = result.to_snapshot_payload()
+                            query_payload["website"] = self.m_cfg.website
+                            query_payload["snapshot_source"] = 'worker'
+                            query_payload["snapshot_at"] = datetime.now(timezone.utc).isoformat()
+                            VSCloudApi.Instance().slot_snapshot_report(query_payload)
+
+                            futures = []
+                            for worker in tasks_to_process:
+                                f = ThreadPool.getInstance().enqueue(
+                                    self._on_query_result, 
+                                    worker, 
+                                    result
+                                )
+                                futures.append(f)
+                            
+                            for f in futures:
+                                try:
+                                    f.result() 
+                                except Exception as e:
+                                    pass
+
+                            batch_booking_triggered = True
+                            break 
                         
+                        else:
+                            self._log(f"Query done by {task.instance.get_group_id()}, No availability")
+
+                    except Exception as e:
+                        traceback.print_exc()
+                        self._log(f"Exception during query: {e}")
+                        if apt_type:
+                            VSCloudApi.Instance().slot_refresh_fail(
+                                apt_type.routing_key,
+                                error=str(e)
+                            )
+
+                    if not batch_booking_triggered:
+                        interval = 30
+                        mode = task.qw_cfg.mode
                         if mode == QueryWaitMode.Loop:
-                            # 即使是 Loop 模式,在大规模抢票后建议给一个微小的缓冲(如1秒),避免死循环导致 CPU 飙升
-                            # 如果你的逻辑允许立刻重试,这里可以是 0
-                            interval = 1 
+                            interval = 0
                         elif mode == QueryWaitMode.Fixed:
-                            interval = t.qw_cfg.fixed_wait
+                            interval = task.qw_cfg.fixed_wait
                         elif mode == QueryWaitMode.Random:
-                            interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
+                            interval = rng.randint(task.qw_cfg.random_min, task.qw_cfg.random_max)
                         
-                        # 设置下次运行时间
-                        t.next_run = now_ts + interval
-                
-                self._log(f"All workers cooldown reset. Resuming monitor loop.")
+                        task.next_run = time.time() + interval
 
-            # 清理不健康实例
-            with self.m_lock:
-                initial_size = len(self.m_tasks)
-                self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
-                if len(self.m_tasks) < initial_size:
-                    self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
+                if batch_booking_triggered:
+                    self._log(f"Batch booking finished. Resetting wait times.")
+                    now_ts = time.time()
+                    
+                    with self.m_lock:
+                        for t in self.m_tasks:
+                            interval = 30
+                            mode = t.qw_cfg.mode
+                            if mode == QueryWaitMode.Loop:
+                                interval = 1 # 稍微给点缓冲
+                            elif mode == QueryWaitMode.Fixed:
+                                interval = t.qw_cfg.fixed_wait
+                            elif mode == QueryWaitMode.Random:
+                                interval = rng.randint(t.qw_cfg.random_min, t.qw_cfg.random_max)
+                            
+                            t.next_run = now_ts + interval
+                    
+                    self._log(f"All workers cooldown reset. Resuming monitor loop.")
+
+                with self.m_lock:
+                    initial_size = len(self.m_tasks)
+                    self.m_tasks[:] = [t for t in self.m_tasks if t.instance.health_check()]
+                    if len(self.m_tasks) < initial_size:
+                        self._log(f"Removed {initial_size - len(self.m_tasks)} unhealthy instance(s). Remaining: {len(self.m_tasks)}")
+
+            except Exception as outer_e:
+                traceback.print_exc()
+                self._log(f"🔥 CRITICAL ERROR in monitor loop (recovered): {outer_e}")
+                # 遇到严重错误休眠 1 秒,防止死循环刷屏占满 CPU
+                time.sleep(1.0)
 
         self._log("[STOP] monitor loop exiting...")
 
     def _creator_loop(self):
         """
-        @brief 创建者循环:双轨制并发控制 + Pending 计数防超发
+        @brief 创建者循环:双轨制并发控制 (精简高可用版)
         """
         self._log("[START] creator loop starting...")
         
         while not self.m_stop_event.is_set():
-            time.sleep(1.0) # 保持 1秒间隔,给资源管理器缓冲时间
-
-            # -------------------------------------------------------------
-            # 1. 统计当前状态 (Active + Pending)
-            # -------------------------------------------------------------
-            current_builtin = 0
-            current_orders = 0
-            
-            with self.m_lock:
-                for t in self.m_tasks:
-                    if t.task_ref:
-                        current_orders += 1
-                    else:
-                        current_builtin += 1
-                
-                # 读取正在创建中的计数 (快照)
-                pending_builtin = self.m_pending_builtin
-                pending_orders = self.m_pending_orders
+            try:
+                time.sleep(1.0) 
 
-            # -------------------------------------------------------------
-            # 2. 计算缺口
-            # -------------------------------------------------------------
-            
-            # === [内置账号缺口] ===
-            needed_builtin = 0
-            # 逻辑:只要目标数量 > 0,就尝试补充 (兼容需要账号和不需要账号的模式)
-            if self.m_cfg.target_instances > 0:
-                total_builtin_proj = current_builtin + pending_builtin
-                needed_builtin = self.m_cfg.target_instances - total_builtin_proj
-
-            # === [订单账号缺口] ===
-            needed_order = 0
-            # 逻辑:必须需要账号(need_account=True) 且 限制数量大于 0
-            # (order_account_enable 已移除,直接看 limit)
-            if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
-                total_order_proj = current_orders + pending_orders
-                needed_order = self.m_cfg.order_account_online_limit - total_order_proj
-
-            # 如果两边都满了,跳过本轮
-            if needed_builtin <= 0 and needed_order <= 0:
-                continue
+                current_builtin = 0
+                current_orders = 0
+                with self.m_lock:
+                    for t in self.m_tasks:
+                        if t.task_ref: current_orders += 1
+                        else: current_builtin += 1
+                    
+                    pending_builtin = self.m_pending_builtin
+                    pending_orders = self.m_pending_orders
 
-            # -------------------------------------------------------------
-            # 3. 准备配置
-            # -------------------------------------------------------------
-            config_data = self._prepare_next_config(
-                need_builtin=(needed_builtin > 0),
-                need_order=(needed_order > 0),
-            )
+                needed_builtin = 0
+                if self.m_cfg.target_instances > 0:
+                    needed_builtin = self.m_cfg.target_instances - (current_builtin + pending_builtin)
 
-            if not config_data:
-                time.sleep(10.0)
-                continue
+                needed_order = 0
+                if self.m_cfg.need_account and self.m_cfg.order_account_online_limit > 0:
+                    needed_order = self.m_cfg.order_account_online_limit - (current_orders + pending_orders)
 
-            plg_cfg, task_ref = config_data
-            
-            # -------------------------------------------------------------
-            # 4. [绝对核心] 提交前立即增加 Pending 计数
-            # -------------------------------------------------------------
-            # 必须在这里加!防止下一秒循环重复创建!
-            with self.m_lock:
-                if task_ref:
-                    self.m_pending_orders += 1
-                else:
-                    self.m_pending_builtin += 1
-            
-            p_type = "Order" if task_ref else "Built-in"
-            self._log(f"+++ Spawning {p_type} (Pending: {self.m_pending_builtin}/{self.m_pending_orders})...")
+                if needed_builtin <= 0 and needed_order <= 0:
+                    continue
 
-            # 5. 异步提交
-            try:
-                ThreadPool.getInstance().enqueue(
-                    self._create_and_register_plg_worker, 
-                    plg_cfg, 
-                    task_ref
+                config_data = self._prepare_next_config(
+                    need_builtin=(needed_builtin > 0),
+                    need_order=(needed_order > 0),
                 )
-            except Exception as e:
-                # 提交失败回滚计数
-                self._log(f"Failed to enqueue task: {e}")
+                
+                if not config_data:
+                    time.sleep(5.0)
+                    continue
+
+                plg_cfg, task_ref = config_data
+                
                 with self.m_lock:
-                    if task_ref: self.m_pending_orders -= 1
-                    else: self.m_pending_builtin -= 1
-            
-            # 错开并发
-            time.sleep(random.uniform(0.5, 1.0))
+                    if task_ref: self.m_pending_orders += 1
+                    else: self.m_pending_builtin += 1
+                
+                try:
+                    ThreadPool.getInstance().enqueue(
+                        self._create_and_register_plg_worker, plg_cfg, task_ref
+                    )
+                    self._log(f"+++ Spawning {'Order' if task_ref else 'Builtin'}...")
+                except Exception as e:
+                    self._log(f"Enqueue failed, rolling back: {e}")
+                    with self.m_lock:
+                        if task_ref: self.m_pending_orders -= 1
+                        else: self.m_pending_builtin -= 1
+                
+                time.sleep(random.uniform(0.5, 1.0))
+
+            except Exception as outer_e:
+                traceback.print_exc()
+                self._log(f"🔥 Creator loop error: {outer_e}")
+                time.sleep(5.0)
 
         self._log("[STOP] creator loop exiting...")
 
@@ -380,11 +329,16 @@ class GCO:
             
             # A. 优先补充内置账号 (只要 target_instances 还有缺口)
             if need_builtin:
-                # 获取并锁定账号
-                account = VSCloudApi.Instance().get_next_account(
-                    pool_name, 
-                    lock_duration=self.m_cfg.account_login_interval * 60
-                )
+                try:
+                    # 获取并锁定账号
+                    account = VSCloudApi.Instance().get_next_account(
+                        pool_name,
+                        lock_duration=self.m_cfg.account_login_interval * 60
+                    )
+                except Exception as e:
+                    self._log(f"Get built-in account failed: {e}")
+                    account = None
+
                 if account:
                     plg_cfg.account.id = account["id"]
                     plg_cfg.account.username = account["username"]
@@ -393,6 +347,9 @@ class GCO:
                     config_ready = True
                     task_ref = None
                     self._log(f"Selected Built-in: {plg_cfg.account.username}")
+                else:
+                    self._log("No available built-in account")
+
 
             # B. 次选补充订单账号 (如果内置不需要 或 池子空了)
             # 只有当 limit > 0 时才尝试