root il y a 1 mois
Parent
commit
d86c81966e
8 fichiers modifiés avec 1216 ajouts et 49 suppressions
  1. 101 7
      booker_builtin.py
  2. 102 14
      booker_order.py
  3. 43 25
      docker-compose.yml
  4. 101 0
      main_sweeper.py
  5. 35 3
      sentinel.py
  6. 102 0
      test/test_capsolver.py
  7. 694 0
      test/tls_standalone.py
  8. 38 0
      toolkit/backoff.py

+ 101 - 7
booker_builtin.py

@@ -12,6 +12,7 @@ from vs_plg_factory import VSPlgFactory
 from toolkit.thread_pool import ThreadPool 
 from toolkit.vs_cloud_api import VSCloudApi
 from toolkit.proxy_manager import ProxyManager
+from toolkit.backoff import ExponentialBackoff
 
 class BuiltinBookerGCO:
     """
@@ -28,6 +29,12 @@ class BuiltinBookerGCO:
         self.m_stop_event = threading.Event()
         self.redis_client = redis.Redis(**redis_conf)
         self.m_pending_builtin = 0
+        
+        self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
+        # 组级别退避 (控制 spawn 频率):起步 1 分钟,封顶 1 小时
+        self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=3600.0, factor=2.0)
+        # 订单级别退避 (控制特定单据被风控的重试频率):起步 5 分钟,封顶 2 小时
+        self.task_backoff = ExponentialBackoff(base_delay=300.0, max_delay=7200.0, factor=2.0)
 
     def _log(self, message):
         if self.m_logger:
@@ -52,6 +59,29 @@ class BuiltinBookerGCO:
 
     def _get_redis_key(self, routing_key: str) -> str:
         return f"vs:signal:{routing_key}"
+    
+    def _safe_return_task(self, task_id: int, reason: str = ""):
+        if not task_id:
+            return
+        try:
+            task_data = VSCloudApi.Instance().get_vas_task(task_id)
+            if not task_data:
+                self.redis_client.zrem(self.m_tracker_key, task_id)
+                return
+            
+            current_status = task_data.get('status', '')
+            if current_status in['pending', 'grabbed', 'cancelled', 'success']:
+                self.redis_client.zrem(self.m_tracker_key, task_id)
+                return
+                
+            self._log(f"Returning task={task_id} to queue. Reason: {reason}")
+            VSCloudApi.Instance().return_vas_task_to_queue(task_id)
+            
+            # 归还成功,核销防丢记录
+            self.redis_client.zrem(self.m_tracker_key, task_id)
+            
+        except Exception as ex:
+            self._log(f"Failed to safely return task for task_id {task_id}: {ex}")
 
     def _maintain_loop(self):
         self._log("Maintain loop started.")
@@ -129,6 +159,7 @@ class BuiltinBookerGCO:
         queue_name = f"auto.{query_result.apt_type.routing_key}"
 
         task_id = None
+        task_data = None
         booking_success = False
         try:
             task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
@@ -136,6 +167,8 @@ class BuiltinBookerGCO:
                 return 
             task_id = task_data['id']
             order_id = task_data.get('order_id')
+            self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
+            
             user_input = task_data.get('user_inputs', {})
             book_res = task.instance.book(query_result, user_input)
             if book_res.success:
@@ -152,6 +185,7 @@ class BuiltinBookerGCO:
                     "payment_link": book_res.payment_link
                 }
                 VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
+                self.redis_client.zrem(self.m_tracker_key, task_id)
                 
                 # === 核心:成功次数判断 ===
                 task.successful_bookings += 1
@@ -165,19 +199,53 @@ class BuiltinBookerGCO:
                 self._log(f"❌ BOOK FAILED for Order: {order_id}")
 
         except Exception as e:
-            self._log(f"Exception during booking: {e}")
+            err_str = str(e)
+            self._log(f"Exception during booking: {err_str}")
+            rate_limited_indicators = [
+                "42901" in err_str,
+                "Rate limited" in err_str
+            ]
+            if any(rate_limited_indicators):
+                with self.m_lock:
+                    if task in self.m_tasks:
+                        self.m_tasks.remove(task)
+                if task_data and task_id is not None:
+                    task_meta = task_data.get('meta', {})
+                    t_fails = task_meta.get('booking_failures', 0) + 1
+                    task_meta['booking_failures'] = t_fails
+                    
+                    try:
+                        VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
+                    except Exception as cloud_err:
+                        self._log(f"Failed to update task meta: {cloud_err}")
+                        
+                    t_cd = self.task_backoff.calculate(t_fails)
+                    self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
+                    
+                    def delayed_return(tid, wait_sec, reason):
+                        self.m_stop_event.wait(wait_sec)
+                        self._safe_return_task(tid, reason=reason)
+                        
+                    t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Booking failed: rate limited (fails={t_fails})"), daemon=True)
+                    t.start()
+                    
+                    task_id = None  # 置空,拦截下方的 finally 直接归还 
+            
         finally:
-            if not booking_success and task_id:
-                try:
-                    VSCloudApi.Instance().return_vas_task_to_queue(task_id)
-                except Exception as ex:
-                    self._log(f"Failed to return task: {ex}")
+           if not booking_success and task_id is not None:
+                self._safe_return_task(task_id, reason="Booking failed or error occurred")
 
     def _creator_loop(self):
         self._log("Creator loop started.")
+        group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
+        
         while not self.m_stop_event.is_set():
             time.sleep(5.0)
             with self.m_lock:
+                
+                if self.redis_client.exists(group_cd_key):
+                    continue
+                
                 current = len(self.m_tasks)
                 if (current + self.m_pending_builtin) < self.m_cfg.booker.target_instances:
                     self._spawn_worker()
@@ -185,6 +253,7 @@ class BuiltinBookerGCO:
     def _spawn_worker(self):
         with self.m_lock:
             self.m_pending_builtin += 1
+            
         def _job():
             try:
                 plg_cfg = VSPlgConfig()
@@ -229,13 +298,38 @@ class BuiltinBookerGCO:
                             book_allowed=True
                         )
                     )
+                    
+                    group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
+                    self.redis_client.delete(group_fail_key)
+                    
                 self._log(f"+++ Built-in Booker spawned: {plg_cfg.account.username}")
             except Exception as e:
                 err_str = str(e)
-                if "40401" in err_str or "Account not found" in err_str:
+                account_not_found_indicators = [
+                    "40401" in err_str,
+                    "Account not found" in err_str
+                ]
+                if any(account_not_found_indicators):
                     return
                 
                 self._log(f"Spawn failed: {e}")
+                
+                rate_limited_indicators = [
+                    "42901" in err_str,
+                    "Rate limited" in err_str
+                ]
+                if any(rate_limited_indicators):
+                    group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
+                    group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
+                    
+                    # 更新全局(机器组)失败次数
+                    g_fails = self.redis_client.incr(group_fail_key)
+                    # 计算退避时间
+                    g_cd = self.group_backoff.calculate(g_fails)
+                    # 设置 Redis 全局冷却保护阀
+                    self.redis_client.set(group_cd_key, "1", ex=int(g_cd))
+                    self._log(f"📉 [Rate Limited] Group '{self.m_cfg.identifier}' failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
+
             finally:
                 with self.m_lock:
                     self.m_pending_builtin = max(0, self.m_pending_builtin - 1)

+ 102 - 14
booker_order.py

@@ -12,6 +12,7 @@ from vs_plg_factory import VSPlgFactory
 from toolkit.thread_pool import ThreadPool 
 from toolkit.vs_cloud_api import VSCloudApi
 from toolkit.proxy_manager import ProxyManager
+from toolkit.backoff import ExponentialBackoff
 
 class OrderBookerGCO:
     """
@@ -29,6 +30,12 @@ class OrderBookerGCO:
         self.m_stop_event = threading.Event()
         self.redis_client = redis.Redis(**redis_conf)
         self.m_pending_order_by_queue: Dict[str, int] = {}
+        
+        self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
+        # 1. 队列级退避:失败起步5分钟,封顶1小时
+        self.queue_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=1*60*60.0, factor=2.0)
+        # 2. 账号级退避:失败起步30分钟,封顶3小时
+        self.account_backoff = ExponentialBackoff(base_delay=30*60.0, max_delay=3*60*60.0, factor=2.0)
 
     def _log(self, message):
         if self.m_logger:
@@ -55,28 +62,28 @@ class OrderBookerGCO:
         return f"vs:signal:{routing_key}"
 
     def _safe_return_task(self, task_id: int, reason: str = ""):
-        """安全地将订单归还给云端队列,防止复活已被取消或已抢成功的订单"""
         if not task_id:
             return
-            
         try:
             task_data = VSCloudApi.Instance().get_vas_task(task_id)
             if not task_data:
-                self._log(f"Task={task_id} not found in cloud, cannot return.")
+                self.redis_client.zrem(self.m_tracker_key, task_id)
                 return
-            order_id = task_data['order_id']
+            
             current_status = task_data.get('status', '')
-            # 如果订单已经被客户取消,或者已经成功,绝对不能还回队列!
-            if current_status in ['cancelled', 'grabbed', 'success']:
-                self._log(f"Task={task_id} is already '{current_status}'. Skipping return.")
+            if current_status in['pending', 'grabbed', 'cancelled', 'success']:
+                self.redis_client.zrem(self.m_tracker_key, task_id)
                 return
                 
             self._log(f"Returning task={task_id} to queue. Reason: {reason}")
             VSCloudApi.Instance().return_vas_task_to_queue(task_id)
             
+            # 归还成功,核销防丢记录
+            self.redis_client.zrem(self.m_tracker_key, task_id)
+            
         except Exception as ex:
-            self._log(f"Failed to safely return task for order_id {order_id}: {ex}")
-
+            self._log(f"Failed to safely return task for task_id {task_id}: {ex}")
+            
     def _maintain_loop(self):
         self._log("Maintain loop started.")
         rng = random.Random()
@@ -161,23 +168,26 @@ class OrderBookerGCO:
 
     def _execute_book_job(self, task: Task, query_result: VSQueryResult):
         task_id = task.task_ref
-        if not task_id:
-            return
+        task_data = None
+        booking_success = False
 
         try:
             task_data = VSCloudApi.Instance().get_vas_task(task_id)
             if not task_data or task_data.get('status') in ['grabbed', 'cancelled']:
                 self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
                 with self.m_lock:
-                    if task in self.m_tasks: self.m_tasks.remove(task)
+                    if task in self.m_tasks:
+                        self.m_tasks.remove(task)
                 return
             
             order_id = task_data.get('order_id')
             user_input = task_data.get('user_inputs', {})
+            self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
 
             book_res = task.instance.book(query_result, user_input)
 
             if book_res.success:
+                booking_success = True
                 self._log(f"✅ BOOK SUCCESS! Order: {order_id}. Destroying instance.")
                 grab_info = {
                     "account": book_res.account,
@@ -189,6 +199,7 @@ class OrderBookerGCO:
                     "payment_link": book_res.payment_link
                 }
                 VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
+                self.redis_client.zrem(self.m_tracker_key, task_id)
                 
                 with self.m_lock:
                     if task in self.m_tasks:
@@ -197,7 +208,35 @@ class OrderBookerGCO:
                 self._log(f"❌ BOOK FAILED for Order: {order_id}. Will retry on next signal.")
 
         except Exception as e:
-            self._log(f"Exception during booking: {e}")
+            err_str = str(e)
+            self._log(f"Exception during booking: {err_str}")
+            rate_limited_indicators = [
+                "42901" in err_str,
+                "Rate limited" in err_str
+            ]
+            if any(rate_limited_indicators):
+                with self.m_lock:
+                    if task in self.m_tasks:
+                        self.m_tasks.remove(task)
+                if task_data and task_id is not None:
+                    task_meta = task_data.get('meta', {})
+                    t_fails = task_meta.get('booking_failures', 0) + 1
+                    task_meta['booking_failures'] = t_fails
+                    
+                    try:
+                        VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
+                    except Exception as cloud_err:
+                        self._log(f"Failed to update task meta: {cloud_err}")
+                        
+                    t_cd = self.task_backoff.calculate(t_fails)
+                    self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
+                    
+                    def delayed_return(tid, wait_sec, reason):
+                        self.m_stop_event.wait(wait_sec)
+                        self._safe_return_task(tid, reason=reason)
+                        
+                    t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Booking failed: rate limited (fails={t_fails})"), daemon=True)
+                    t.start()
 
     def _creator_loop(self):
         self._log("Creator loop started.")
@@ -206,6 +245,11 @@ class OrderBookerGCO:
             with self.m_lock:
                 for apt in self.m_cfg.appointment_types:
                     r_key = apt.routing_key
+                    
+                    queue_cd_key = f"vs:queue:cooldown:{r_key}"
+                    if self.redis_client.exists(queue_cd_key):
+                        continue # 仍在冷却中,跳过取单
+                    
                     active = sum(1 for t in self.m_tasks if getattr(t, 'source_queue', '') == r_key)
                     pending = self.m_pending_order_by_queue.get(r_key, 0)
                     
@@ -227,6 +271,8 @@ class OrderBookerGCO:
                     return 
                 
                 task_id = task_data['id']
+                
+                self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
                 user_inputs = task_data.get('user_inputs', {})
                 
                 plg_cfg = VSPlgConfig()
@@ -266,14 +312,56 @@ class OrderBookerGCO:
                             book_allowed=True
                         )
                     )
+                    queue_fail_key = f"vs:queue:failures:{target_routing_key}"
+                    self.redis_client.delete(queue_fail_key)                    
                 success = True
                 self._log(f"+++ Order Booker spawned: {plg_cfg.account.username} (Target: {acceptable_keys})")
             except Exception as e:
                 err_str = str(e)
-                if "40401" in err_str or "Account not found" in err_str:
+                order_not_found_indicators = [
+                    "40401" in err_str,
+                    "Account not found" in err_str
+                ]
+                if any(order_not_found_indicators):
                     return
                 
                 self._log(f"Order Booker spawn failed: {e}")
+                
+                rate_limited_indicators = [
+                    "42901" in err_str,
+                    "Rate limited" in err_str
+                ]
+                if any(rate_limited_indicators):
+                    queue_fail_key = f"vs:queue:failures:{target_routing_key}"
+                    queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
+                    q_fails = self.redis_client.incr(queue_fail_key)
+                    q_cd = self.queue_backoff.calculate(q_fails)
+                    self.redis_client.set(queue_cd_key, "1", ex=int(q_cd))
+                    self._log(f"📉 [Rate Limited] Queue '{target_routing_key}' failed {q_fails} times. Global Backoff: {q_cd:.1f}s.")
+                    if task_id is not None:
+                        task_meta = task_data.get('meta', {})
+                        t_fails = task_meta.get('spawn_failures', 0) + 1
+                        task_meta['spawn_failures'] = t_fails
+                        
+                        # 立即持久化到云端
+                        try:
+                            VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
+                        except Exception as cloud_err:
+                            self._log(f"Failed to update task meta: {cloud_err}")
+                        
+                        # 使用工具计算该任务账号应该挂起的惩罚时间
+                        t_cd = self.account_backoff.calculate(t_fails)
+                        self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
+                        
+                        def delayed_return(tid, wait_sec, reason):
+                            self.m_stop_event.wait(wait_sec)
+                            self._safe_return_task(tid, reason=reason)
+                            
+                        t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Spawn failed: rate limited (fails={t_fails})"), daemon=True)
+                        t.start()
+                        
+                        task_id = None  # 置空防秒归还   
+                    
             finally:
                 with self.m_lock: 
                     self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)

+ 43 - 25
docker-compose.yml

@@ -1,11 +1,34 @@
 version: '3.8'
 
 services:
-  visa-sentinel:
+  # visa-sentinel:
+  #   build: .
+  #   image: coordinator:latest
+  #   container_name: coordinator-sentinel
+  #   command: ["python3", "main_sentinel.py"]
+  #   restart: unless-stopped
+  #   shm_size: '2gb'
+  #   volumes:
+  #     - ./config:/app/config
+  #     - ./logs:/app/logs
+  #     - ./data:/app/data
+  #     - ./plugins:/app/plugins
+  #   environment:
+  #     - TZ=Asia/Shanghai
+  #     - DISPLAY=:99
+  #     - CHROME_BIN=/usr/bin/chromium
+  #   # 资源限制
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: '2.0'
+  #         memory: 4G
+
+  visa-booker:
     build: .
     image: coordinator:latest
-    container_name: coordinator-sentinel
-    command: ["python3", "main_sentinel.py"]
+    container_name: coordinator-booker
+    command: ["python3", "main_booker.py"]
     restart: unless-stopped
     shm_size: '2gb'
     volumes:
@@ -24,25 +47,20 @@ services:
           cpus: '2.0'
           memory: 4G
 
-  # visa-booker:
-  #   build: .
-  #   image: coordinator:latest
-  #   container_name: coordinator-booker
-  #   command: ["python3", "main_booker.py"]
-  #   restart: unless-stopped
-  #   shm_size: '2gb'
-  #   volumes:
-  #     - ./config:/app/config
-  #     - ./logs:/app/logs
-  #     - ./data:/app/data
-  #     - ./plugins:/app/plugins
-  #   environment:
-  #     - TZ=Asia/Shanghai
-  #     - DISPLAY=:99
-  #     - CHROME_BIN=/usr/bin/chromium
-  #   # 资源限制
-  #   deploy:
-  #     resources:
-  #       limits:
-  #         cpus: '2.0'
-  #         memory: 4G
+  visa-sweeper:
+    build: .
+    image: coordinator:latest
+    container_name: coordinator-sweeper
+    command: ["python3", "main_sweeper.py"]
+    restart: unless-stopped
+    volumes:
+      - ./config:/app/config
+      - ./logs:/app/logs
+    environment:
+      - TZ=Asia/Shanghai
+    # 资源限制极低,因为它只是个网络请求脚本,不运行浏览器
+    deploy:
+      resources:
+        limits:
+          cpus: '0.2'
+          memory: 256M

+ 101 - 0
main_sweeper.py

@@ -0,0 +1,101 @@
+import time
+import redis
+import logging
+import json
+import os
+from typing import Dict
+from toolkit.vs_cloud_api import VSCloudApi
+
+# 设置纯净的日志输出格式
+logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s')
+
+class OrphanTaskSweeper:
+    """
+    孤儿任务兜底回收程序 (Orphan Task Sweeper)
+    - 独立运行,不干扰主业务逻辑。
+    - 扫描 Redis 中所有超过指定时间没有被核销的 task_id,将其强制归还给云端队列。
+    """
+    def __init__(self, redis_conf: Dict, timeout_seconds: int = 900):
+        self.redis_client = redis.Redis(**redis_conf)
+        self.timeout_seconds = timeout_seconds  # 默认 900 秒 (15分钟)
+
+    def run_forever(self):
+        logging.info(f"🛡️ Task Sweeper started. Monitoring unreturned tasks (Timeout: {self.timeout_seconds}s)...")
+        while True:
+            try:
+                self._sweep()
+            except Exception as e:
+                logging.error(f"Sweeper loop error: {e}")
+            
+            # 每隔 60 秒巡检一次全局 Redis
+            time.sleep(60)
+
+    def _sweep(self):
+        # 查找所有节点的 tracking key (支持多机器集群部署)
+        tracker_keys = self.redis_client.keys("vs:worker:tasks_tracker:*")
+        if not tracker_keys:
+            return
+
+        current_time = time.time()
+        # 凡是 score (入栈时间戳) 小于 cutoff_time 的,都是超时未归还的“死单”
+        cutoff_time = current_time - self.timeout_seconds
+
+        for key in tracker_keys:
+            key_str = key.decode('utf-8') if isinstance(key, bytes) else key
+            orphans = self.redis_client.zrangebyscore(key_str, 0, cutoff_time)
+            
+            if not orphans:
+                continue
+
+            for task_id_bytes in orphans:
+                task_id = int(task_id_bytes.decode('utf-8'))
+                self._rescue_task(key_str, task_id)
+
+    def _rescue_task(self, tracker_key: str, task_id: int):
+        logging.warning(f"⚠️ Found orphan task={task_id} in {tracker_key}. Attempting rescue...")
+        try:
+            task_data = VSCloudApi.Instance().get_vas_task(task_id)
+            
+            # 1. 云端彻底查不到这个订单了,说明已被物理删除,清理本地记录
+            if not task_data:
+                logging.info(f"Task={task_id} does not exist in cloud. Removing from local tracker.")
+                self.redis_client.zrem(tracker_key, task_id)
+                return
+
+            current_status = task_data.get('status', '')
+            
+            # 2. 订单状态已经是终态,不需要归还,直接核销本地记录
+            if current_status in['cancelled', 'grabbed', 'success']:
+                logging.info(f"Task={task_id} status is already '{current_status}'. Removing from tracker.")
+                self.redis_client.zrem(tracker_key, task_id)
+                return
+
+            # 3. 订单依然卡在 processing 状态,强制还给队列
+            logging.info(f"Returning orphan task={task_id} to queue.")
+            VSCloudApi.Instance().return_vas_task_to_queue(task_id)
+            
+            # 归还成功后,从 Redis 中移除追踪
+            self.redis_client.zrem(tracker_key, task_id)
+            logging.info(f"✅ Orphan task={task_id} successfully rescued and returned.")
+
+        except Exception as e:
+            # 遇到网络错误,不删 Redis 记录,让它在下一个 60 秒循环里继续被重试归还
+            logging.error(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")
+
+
+if __name__ == "__main__":
+    # 【注意】这里请替换为你实际项目中加载 config 和 初始化 Cloud API 的代码
+    # 逻辑通常与你的 main_booker.py 顶部的初始化代码一模一样
+    config_path = os.path.join(os.path.dirname(__file__), "config", "config.json") # 根据你的实际格式 (yaml/json)
+    
+    with open(config_path, 'r') as f:
+        config_data = json.load(f)
+        
+    redis_conf = config_data.get('redis', {})
+    
+    # 如果你的框架需要显式初始化 VSCloudApi,请在这里初始化
+    # VSCloudApi.Instance().init(...) 
+
+    # 启动兜底程序,设置超时阈值为 15 分钟 (900秒)
+    sweeper = OrphanTaskSweeper(redis_conf, timeout_seconds=900)
+    sweeper.run_forever()

+ 35 - 3
sentinel.py

@@ -11,6 +11,7 @@ from vs_plg_factory import VSPlgFactory
 from toolkit.thread_pool import ThreadPool 
 from toolkit.vs_cloud_api import VSCloudApi
 from toolkit.proxy_manager import ProxyManager
+from toolkit.backoff import ExponentialBackoff
 
 class SentinelGCO:
     def __init__(self, cfg: GroupConfig, redis_conf: Dict, logger: Callable[[str], None] = None):
@@ -23,6 +24,9 @@ class SentinelGCO:
         
         self.redis_client = redis.Redis(**redis_conf)
         self.m_pending_builtin = 0
+        
+        # 1. 全局建连退避:起步 1 分钟,封顶 1 小时 (保护登录接口)
+        self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=3600.0, factor=2.0)
 
     def _log(self, message):
         if self.m_logger:
@@ -114,6 +118,10 @@ class SentinelGCO:
         while not self.m_stop_event.is_set():
             time.sleep(2)
             with self.m_lock:
+                
+                if self.redis_client.exists(group_cd_key):
+                    continue
+                
                 current = len(self.m_tasks)
                 pending = self.m_pending_builtin
             
@@ -121,7 +129,8 @@ class SentinelGCO:
                 self._spawn_sentinel_worker()
 
     def _spawn_sentinel_worker(self):
-        with self.m_lock: self.m_pending_builtin += 1
+        with self.m_lock:
+            self.m_pending_builtin += 1
             
         def _job():
             try:
@@ -158,13 +167,36 @@ class SentinelGCO:
                 
                 with self.m_lock:
                     self.m_tasks.append(Task(instance=instance, qw_cfg=self.m_cfg.query_wait, next_run=time.time()))
+                
+                    group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
+                    self.redis_client.delete(group_fail_key)
+                
                 self._log(f"+++ Sentinel spawned: {plg_cfg.account.username}")
 
             except Exception as e:
                 err_str = str(e)
-                if "40401" in err_str or "Account not found" in err_str:
+                account_not_found_indicators = [
+                    "40401" in err_str,
+                    "Account not found" in err_str
+                ]
+                if any(account_not_found_indicators):
                     return
-                self._log(f"Sentinel spawn failed: {e}")
+                
+                self._log(f"Spawn failed: {e}")
+                
+                rate_limited_indicators = [
+                    "42901" in err_str,
+                    "Rate limited" in err_str
+                ]
+                if any(rate_limited_indicators):
+                    group_fail_key = f"vs:group:failures:{self.m_cfg.identifier}"
+                    group_cd_key = f"vs:group:cooldown:{self.m_cfg.identifier}"
+                    
+                    g_fails = self.redis_client.incr(group_fail_key)
+                    g_cd = self.group_backoff.calculate(g_fails)
+                    self.redis_client.set(group_cd_key, "1", ex=int(g_cd))
+                    self._log(f"📉 [Rate Limited] Sentinel Spawn failed {g_fails} times. Global Backoff: {g_cd:.1f}s.")
+                    
             finally:
                 with self.m_lock:
                     self.m_pending_builtin = max(0, self.m_pending_builtin - 1)

+ 102 - 0
test/test_capsolver.py

@@ -0,0 +1,102 @@
+import requests
+import time
+import json
+
+class CaptchaTester:
+    def __init__(self, api_key):
+        self.config = {
+            'capsolver_key': api_key
+        }
+
+    def _log(self, message):
+        print(f"[{time.strftime('%H:%M:%S')}] {message}")
+
+    def solve_captcha(self, page_url: str, task_type: str, site_key: str, use_proxy=False, action: str = None) -> str:
+        """通用解决验证码"""
+        capsolver_key = self.config.get('capsolver_key')
+        if not capsolver_key:
+            raise ValueError("Capsolver API key missing")
+
+        task = {
+            "type": task_type,
+            "websiteURL": page_url,
+            "websiteKey": site_key,
+        }
+        
+        if action:
+            task["pageAction"] = action
+
+        payload = {"clientKey": capsolver_key, "task": task}
+        
+        # 创建任务
+        res = requests.post("https://api.capsolver.com/createTask", json=payload, timeout=20)
+        resp_json = res.json()
+        if resp_json.get("errorId") != 0:
+            raise Exception(f"创建任务失败: {res.text}")
+
+        task_id = resp_json.get("taskId")
+        self._log(f"任务已创建: {task_id}. 正在等待结果...")
+
+        # 轮询结果
+        for i in range(20):
+            r = requests.post(
+                "https://api.capsolver.com/getTaskResult", 
+                json={"clientKey": capsolver_key, "taskId": task_id}, 
+                timeout=20
+            )
+            data = r.json()
+            if data.get("status") == "ready":
+                self._log("验证码解决成功!")
+                return data["solution"].get("gRecaptchaResponse") or data["solution"].get("token")
+            
+            self._log(f"等待中... ({i+1}次)")
+            time.sleep(3)
+        raise Exception("Capsolver 任务超时")
+
+    def test_score(self):
+        # 1. 参数配置
+        test_page = "https://antcpt.com"
+        site_key = "6LcR_okUAAAAAPYrPe-HK_0RULO1aZM15ENyM-Mf"
+        action = "homepage"
+        
+        try:
+            # 2. 获取 Token
+            token = self.solve_captcha(
+                page_url=test_page,
+                task_type="ReCaptchaV3TaskProxyLess", # 或者使用 ReCaptchaV3Task
+                site_key=site_key,
+                action=action
+            )
+
+            # 3. 发送到评分网站
+            self._log("正在提交 Token 到 ar1n.xyz 进行评分...")
+            score_url = 'https://ar1n.xyz/recaptcha3ScoreTest'
+            headers = {
+                'Accept': 'application/json, text/javascript, */*; q=0.01',
+                'Content-Type': 'application/json; charset=UTF-8',
+                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36',
+                'Origin': 'https://antcpt.com',
+                'Referer': 'https://antcpt.com/'
+            }
+            # 注意:这里的 key 必须是 g-recaptcha-reponse (网站特定的拼写错误)
+            post_data = {"g-recaptcha-reponse": token}
+            
+            response = requests.post(score_url, headers=headers, json=post_data)
+            
+            if response.status_code == 200:
+                result = response.json()
+                print("\n" + "="*30)
+                print("【评分结果】")
+                print(json.dumps(result, indent=4))
+                print("="*30)
+            else:
+                print(f"评分请求失败: {response.status_code}, {response.text}")
+
+        except Exception as e:
+            print(f"发生错误: {e}")
+
+# --- 运行测试 ---
+if __name__ == "__main__":
+    MY_CAPSOLVER_KEY = "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A"
+    tester = CaptchaTester(MY_CAPSOLVER_KEY)
+    tester.test_score()

+ 694 - 0
test/tls_standalone.py

@@ -0,0 +1,694 @@
+import time
+import json
+import random
+import re
+import os
+import uuid
+import socket
+import shutil
+import requests
+import threading
+import select
+import base64
+from datetime import datetime
+from urllib.parse import urlencode
+
+# DrissionPage 核心
+from DrissionPage import ChromiumPage, ChromiumOptions
+
+
+class ProxyTunnel:
+    """
+    【修复优化版】管理本地代理隧道
+    1. 启用 TCP_NODELAY 消除握手延迟 (关键修复)
+    2. 开启 KeepAlive 防止链路中断
+    3. 修复非阻塞模式下 sendall 导致的数据丢失问题
+    """
+    def __init__(self, upstream_ip, upstream_port, username, password):
+        self.upstream_ip = upstream_ip
+        self.upstream_port = int(upstream_port)
+        self.username = username
+        self.password = password
+        
+        # 预先计算 Proxy-Authorization 头
+        auth_str = f"{username}:{password}"
+        b64_auth = base64.b64encode(auth_str.encode()).decode()
+        self.auth_header = f"Proxy-Authorization: Basic {b64_auth}\r\n"
+        
+        self.server_socket = None
+        self.local_port = 0
+        self.running = False
+        self.listen_thread = None
+
+    def start(self):
+        try:
+            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            
+            self.server_socket.bind(('127.0.0.1', 0))
+            self.local_port = self.server_socket.getsockname()[1]
+            self.server_socket.listen(128) # 增加连接队列长度
+            
+            self.running = True
+            
+            self.listen_thread = threading.Thread(target=self._accept_loop, daemon=True)
+            self.listen_thread.start()
+            
+            return f"127.0.0.1:{self.local_port}"
+        except Exception as e:
+            self.stop()
+            raise RuntimeError(f"Failed to start tunnel: {e}")
+
+    def stop(self):
+        self.running = False
+        if self.server_socket:
+            try:
+                self.server_socket.close()
+            except Exception:
+                pass
+        self.server_socket = None
+
+    def _accept_loop(self):
+        while self.running:
+            try:
+                if self.server_socket:
+                    # 使用 select 替代 settimeout,减少 CPU 空转
+                    r, _, _ = select.select([self.server_socket], [], [], 1.0)
+                    if r:
+                        try:
+                            client_sock, _ = self.server_socket.accept()
+                            t = threading.Thread(target=self._handle_client, args=(client_sock,), daemon=True)
+                            t.start()
+                        except OSError:
+                            break
+            except Exception:
+                continue
+
+    def _optimize_socket(self, sock):
+        """核心优化:设置 Socket 选项"""
+        try:
+            # 1. 禁用 Nagle 算法:数据包立即发送,不等待填满缓冲区
+            # 这是解决 "HttpClient Timeout" 的关键
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+            
+            # 2. 开启 KeepAlive:防止防火墙切断空闲连接
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            
+            # 3. 增大缓冲区 (Linux/Mac 可选,Windows 一般自动管理)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 32*1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32*1024)
+        except Exception:
+            pass
+
+    def _handle_client(self, client_sock):
+        upstream_sock = None
+        try:
+            self._optimize_socket(client_sock)
+            client_sock.settimeout(30) 
+            
+            # 1. 读取首包 (32KB 缓冲区)
+            try:
+                first_packet = client_sock.recv(32768)
+            except socket.timeout:
+                return # 客户端连上但不发数据
+                
+            if not first_packet:
+                return
+
+            # 2. 连接上游
+            upstream_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self._optimize_socket(upstream_sock) # 同样优化上游 Socket
+            
+            upstream_sock.settimeout(10) # 连接超时 10s
+            upstream_sock.connect((self.upstream_ip, self.upstream_port))
+            
+            # 连接建立后,将超时设为 None (阻塞模式),交由 select 控制
+            upstream_sock.settimeout(None)
+            client_sock.settimeout(None)
+            
+            # 3. 注入 Header
+            sep = b'\r\n'
+            idx = first_packet.find(sep)
+            if idx != -1:
+                new_packet = first_packet[:idx+2] + self.auth_header.encode() + first_packet[idx+2:]
+            else:
+                new_packet = first_packet
+
+            # 4. 发送首包 (阻塞式发送,保证数据完整)
+            upstream_sock.sendall(new_packet)
+            
+            # 5. 双向转发
+            self._pipe_sockets(client_sock, upstream_sock)
+
+        except Exception:
+            pass
+        finally:
+            self._close_socket(client_sock)
+            self._close_socket(upstream_sock)
+
+    def _pipe_sockets(self, sock1, sock2):
+        """
+        修复后的转发逻辑:
+        保持 Socket 为阻塞模式,利用 select 监听可读状态。
+        """
+        sockets = [sock1, sock2]
+        last_activity = time.time()
+        IDLE_TIMEOUT = 120 # 延长空闲超时
+        
+        while self.running:
+            try:
+                # 监听可读事件
+                r, _, x = select.select(sockets, [], sockets, 1.0)
+                
+                if x: break # Socket 异常
+                
+                if not r:
+                    if time.time() - last_activity > IDLE_TIMEOUT:
+                        break
+                    continue
+                
+                for s in r:
+                    try:
+                        # 尝试读取
+                        data = s.recv(32768)
+                    except ConnectionResetError:
+                        data = None
+                    
+                    if not data:
+                        return # 连接关闭
+                    
+                    # 确定发送目标
+                    target = sock2 if s is sock1 else sock1
+                    
+                    # 关键修改:使用阻塞式 sendall
+                    # 如果网络卡顿,线程会在这里暂停等待,而不是抛出错误或丢包
+                    try:
+                        target.sendall(data)
+                    except BrokenPipeError:
+                        return
+                        
+                    last_activity = time.time()
+                    
+            except Exception:
+                break
+
+    def _close_socket(self, sock):
+        """优雅关闭 Socket"""
+        if sock:
+            try:
+                # 发送 FIN 包,通知对端数据发送完毕
+                sock.shutdown(socket.SHUT_RDWR)
+            except Exception:
+                pass
+            try:
+                sock.close()
+            except Exception:
+                pass
+
+    def __del__(self):
+        self.stop()
+
+class BrowserResponse:
+    """模拟 requests.Response 的轻量级对象"""
+    def __init__(self, result_dict):
+        result_dict = result_dict or {}
+        self.status_code = result_dict.get('status', 0)
+        self.text = result_dict.get('body', '')
+        self.headers = result_dict.get('headers', {})
+        self.url = result_dict.get('url', '')
+
+class TlsAutoBot:
+    def __init__(self, config: dict):
+        """
+        config 包含: proxy, account, capsolver_key, apt_config (code, country, city), target_dates 等
+        """
+        self.config = config
+        self.instance_id = uuid.uuid4().hex[:8]
+        self.workspace = os.path.abspath(os.path.join("data", f"tls_session_{self.instance_id}"))
+        self.page = None
+        self.travel_group = None
+        self.tunnel = None
+
+    def _log(self, msg):
+        print(f"[TLS-Bot-{self.instance_id}] {msg}")
+
+    def _get_free_port(self):
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.bind(('', 0))
+            return s.getsockname()[1]
+
+    def init_browser(self):
+        self._log("Initializing browser...")
+        co = ChromiumOptions()
+        
+        # 1. 端口与路径隔离
+        port = self._get_free_port()
+        co.set_local_port(port)
+        co.set_user_data_path(self.workspace)
+        
+        # 2. 代理配置
+        proxy_cfg = self.config.get('proxy', {})
+
+            
+        if proxy_cfg.get("username") and proxy_cfg.get("password"):
+            self._log(f"Starting Proxy Tunnel for {proxy_cfg.get('ip')}...")
+            
+            # 1. 启动本地隧道
+            self.tunnel = ProxyTunnel(proxy_cfg.get('ip'), proxy_cfg.get('port'), proxy_cfg.get('username'), proxy_cfg.get('password'))
+            local_proxy = self.tunnel.start()
+            
+            self._log(f"Tunnel started at {local_proxy}")
+            
+            # 2. Chrome 连接本地免密端口
+            # 必须使用 --proxy-server 强制指定,绝对稳健
+            co.set_argument(f'--proxy-server={local_proxy}')
+            
+        else:
+            # 无密码代理,直接用
+            proxy_str = f"{proxy_cfg.get('schema')}://{proxy_cfg.get('ip')}:{proxy_cfg.get('port')}"
+            co.set_argument(f'--proxy-server={proxy_str}')
+
+        # 3. 反爬配置
+        co.headless(False)
+        co.set_argument('--no-sandbox')
+        co.set_argument('--disable-gpu')
+        co.set_argument('--disable-dev-shm-usage')
+        co.set_argument('--window-size=1920,1080')
+        co.set_argument('--disable-blink-features=AutomationControlled')
+
+        self.page = ChromiumPage(co)
+
+    def solve_captcha(self, page_url: str, task_type: str, site_key: str, use_proxy = False, action: str = None) -> str:
+        """通用解决验证码 (同步 User-Agent 防止被盾识别为高风险)"""
+        capsolver_key = self.config.get('capsolver_key')
+        if not capsolver_key:
+            raise ValueError("Capsolver API key missing")
+
+        task = {
+            "type": task_type,
+            "websiteURL": page_url,
+            "websiteKey": site_key,
+        }
+        if use_proxy:
+            proxy = self.config['proxy']
+            task["proxyType"] = proxy.get('scheme', 'http')
+            task["proxyAddress"] = proxy.get('ip')
+            task["proxyPort"] = int(proxy.get('port'))
+            if proxy.get('username'):
+                task["proxyLogin"] = proxy.get('username')
+                task["proxyPassword"] = proxy.get('password')
+        if action:
+            task["pageAction"] = action
+
+        payload = {"clientKey": capsolver_key, "task": task}
+        res = requests.post("https://api.capsolver.com/createTask", json=payload, timeout=20)
+        if res.status_code != 200 or res.json().get("errorId") != 0:
+            raise Exception(f"Failed to create capsolver task: {res.text}")
+
+        task_id = res.json().get("taskId")
+        self._log(f"Task created: {task_id}. Waiting for solution...")
+
+        for _ in range(30):
+            r = requests.post(
+                "https://api.capsolver.com/getTaskResult", 
+                json={"clientKey": capsolver_key, "taskId": task_id}, 
+                timeout=20
+            )
+            data = r.json()
+            if data.get("status") == "ready":
+                self._log("Captcha solved successfully!")
+                return data["solution"].get("gRecaptchaResponse") or data["solution"].get("token")
+            time.sleep(3)
+        raise Exception("Capsolver task timeout")
+
+    def login(self):
+        """执行自动登录流程并提取 Group ID"""
+        self.init_browser()
+        
+        apt_config = self.config['apt_config']
+        login_url = "https://visas-fr.tlscontact.com/en-us/login"
+        params = {
+            "issuerId": apt_config["code"],
+            "country": apt_config["country"],
+            "vac": apt_config["code"],
+            "redirect": f"/en-us/country/{apt_config['country']}/vac/{apt_config['code']}"
+        }
+        full_login_url = f"{login_url}?{urlencode(params)}"
+        
+        self._log(f"Navigating to login: {full_login_url}")
+        self.page.get(full_login_url)
+        time.sleep(3)
+        wait_start = time.time()
+        while True:
+            # 获取页面 HTML,转小写
+            # 注意:如果此处报错 "页面被刷新",是 DrissionPage 的机制问题,
+            # 但你要求先不处理复杂错误,所以这里保持最简单的写法。
+            html = self.page.html.lower()
+            
+            # 检查是否在排队室 (法语或英语)
+            if "file d'attente" in html or "waiting room" in html:
+                # 如果等太久(比如1小时),就强制停止
+                if time.time() - wait_start > 6 * 60:
+                    self._log("Waiting room timeout (1h).")
+                    break
+                    
+                self._log("In Waiting Room... Waiting for auto-refresh.")
+                time.sleep(30) # 截图说页面会自动刷新,所以这里只sleep,不动浏览器
+            else:
+                # 页面里没有“等候室”的字了,说明出来了
+                break
+
+        if not self.page.ele('#email-input-field'):
+            self._log("Form not found, reloading...")
+            self.page.get(full_login_url)
+            self.page.wait.ele_displayed('#email-input-field', timeout=15)
+
+        self._log("Waiting 3 seconds for Captcha scripts to load...")
+        time.sleep(3)
+
+        g_token = ""
+        # 判断登录页是否有 ReCaptcha
+        if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
+            self._log("Login ReCaptcha detected, solving...")
+            # 登录页通常是 V2
+            g_token = self.solve_captcha(
+                page_url=self.page.url,
+                task_type="ReCaptchaV2TaskProxyLess",
+                site_key="6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0"
+            )
+
+        account = self.config['account']
+        js_login = f"""
+        var u = document.getElementById('email-input-field');
+        if(u) {{ u.value = "{account['username']}"; u.dispatchEvent(new Event('input', {{bubbles:true}})); }}
+        
+        var p = document.getElementById('password-input-field');
+        if(p) {{ p.value = "{account['password']}"; p.dispatchEvent(new Event('input', {{bubbles:true}})); }}
+        
+        var g = document.getElementById('g-recaptcha-response');
+        if(g) {{ g.value = "{g_token}"; }}
+        
+        var btn = document.getElementById('btn-login');
+        if(btn) {{ btn.click(); return true; }} else {{ return false; }}
+        """
+        
+        self._log("Submitting Login via JS...")
+        self.page.run_js(js_login)
+
+        self._log("Waiting for dashboard redirect...")
+        self.page.wait.url_change('login-actions', exclude=True, timeout=45)
+        time.sleep(4)
+
+        if "login-actions" in self.page.url or "auth" in self.page.url:
+            raise Exception("Login Failed! Invalid credentials or Captcha rejected.")
+
+        self._log("Waiting for dashboard...")
+        self.page.wait.load_start()
+        time.sleep(5)
+
+        # 解析 Dashboard 提取 Group ID
+        self._log("Parsing Dashboard for Travel Group...")
+        html = self.page.html
+        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
+        js_match = re.search(js_pattern, html, re.DOTALL)
+        
+        groups = []
+        if js_match:
+            json_str = js_match.group(1).replace(r'\"', '"')
+            groups = json.loads(json_str)
+
+        target_city = apt_config['city'].lower()
+        for g in groups:
+            if g.get('vacName', '').lower() == target_city:
+                self.travel_group = g
+                break
+        
+        if not self.travel_group:
+            raise Exception(f"Travel Group not found for city: {target_city}")
+
+        self._log(f"Login Success! Target Group ID: {self.travel_group['formGroupId']}")
+
+    def query_slots(self) -> list:
+        """通过 JS Fetch 获取可用日期并解析"""
+        self._log("Querying available slots...")
+        group_num = self.travel_group['formGroupId']
+        apt_config = self.config['apt_config']
+        interest_month = self.config.get("interest_month", time.strftime("%m-%Y"))
+        
+        url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
+        params = {'location': apt_config["code"], 'month': interest_month}
+        
+        # 组装完整的 query url
+        query_url = f"{url}?{urlencode(params)}"
+        
+        js_script = f"""
+        return fetch("{query_url}", {{ credentials: "include" }})
+            .then(async r => {{ return {{ status: r.status, body: await r.text() }}; }})
+            .catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
+        """
+        res_dict = self.page.run_js(js_script)
+        resp = BrowserResponse(res_dict)
+
+        if resp.status_code != 200:
+            raise Exception(f"Query Failed: {resp.status_code} - {resp.text[:100]}")
+
+        # 解析正则 (保持原样)
+        slots = []
+        pattern = r'"availableAppointments\\":\s*(\[.*?\]),\\"showFlexiAppointment'
+        match = re.search(pattern, resp.text, re.DOTALL)
+        if match:
+            json_str = match.group(1).replace(r'\"', '"')
+            data = json.loads(json_str)
+            for day in data:
+                d_str = day.get('day')
+                for s in day.get('slots', []):
+                    labels = s.get('labels', [])
+                    if not labels:
+                        continue # 空数组说明 unavailable
+                    
+                    lbl = ""
+                    if 'pta' in labels: lbl = 'pta'
+                    elif 'ptaw' in labels: lbl = 'ptaw'
+                    elif '' in labels: lbl = ''
+                    
+                    slots.append({
+                        'date': d_str,
+                        'time': s.get('time'),
+                        'label': lbl
+                    })
+        return slots
+
+    def _filter_dates(self, available_dates: list, start_str: str, end_str: str) -> list:
+        if not start_str or not end_str:
+            return available_dates
+        valid = []
+        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
+        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
+        for d in available_dates:
+            curr = datetime.strptime(d, "%Y-%m-%d")
+            if s_date <= curr <= e_date:
+                valid.append(d)
+        return valid
+
+    def book(self, all_slots: list) -> bool:
+        """执行预定流程"""
+        if not all_slots:
+            self._log("No slots provided to book.")
+            return False
+
+        # 1. 过滤日期 & 筛选标签
+        target_labels = self.config.get('target_labels', [''])
+        exp_start = self.config.get('expected_start_date', '')
+        exp_end = self.config.get('expected_end_date', '')
+
+        # 提取唯一的可用日期列表
+        unique_dates = list(set([s['date'] for s in all_slots]))
+        valid_dates = self._filter_dates(unique_dates, exp_start, exp_end)
+        
+        possible_slots = [
+            s for s in all_slots 
+            if s['date'] in valid_dates and s['label'] in target_labels
+        ]
+
+        if not possible_slots:
+            self._log("No slots match target dates and labels.")
+            return False
+
+        # 2. 随机选择一个 Slot
+        selected = random.choice(possible_slots)
+        sel_date = selected['date']
+        sel_time = selected['time']
+        sel_label = selected['label']
+        
+        self._log(f"Selected Slot -> Date: {sel_date}, Time: {sel_time}, Label: {sel_label or 'standard'}")
+
+        group_num = self.travel_group['formGroupId']
+        apt_config = self.config['apt_config']
+        
+        base_url = f'https://visas-fr.tlscontact.com/en-us/{group_num}/workflow/appointment-booking'
+        # [关键修复] Next.js Action 必须带上正确的 Query 参数
+        year, month, day = sel_date.split('-')
+        formatted_month = f"{month}-{year}"
+        
+        full_url = f'{base_url}?location={apt_config["code"]}&month={formatted_month}'
+        
+        router_state = f'%5B%22%22%2C%7B%22children%22%3A%5B%5B%22lang%22%2C%22en-us%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%5B%22groupId%22%2C%22{group_num}%22%2C%22d%22%5D%2C%7B%22children%22%3A%5B%22workflow%22%2C%7B%22children%22%3A%5B%22appointment-booking%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D%7D%2Cnull%2Cnull%5D'
+
+        # 3. 获取金额 (Basket Cost)
+        self._log("Fetching basket cost...")
+        cost_payload = [{"groupId": str(group_num), "lang": "en-us", "labels": [sel_label]}]
+        cost_body_json = json.dumps(cost_payload)
+        
+        js_cost = f"""
+        return fetch("{full_url}", {{
+            method: 'POST',
+            headers: {{
+                'Next-Action': '40124cc90acef520d4fd2daf60ad3c8e21fc2c11d8',
+                'Next-Router-State-Tree': '{router_state}',
+                'Accept': 'text/x-component',
+                'Content-Type': 'text/plain;charset=UTF-8'
+            }},
+            body: `{cost_body_json}`
+        }}).then(async r => {{ return {{ status: r.status, body: await r.text() }}; }})
+          .catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
+        """
+        cost_res = BrowserResponse(self.page.run_js(js_cost))
+        if cost_res.status_code != 200:
+            self._log(f"Basket cost check failed: {cost_res.status_code}")
+            return False
+
+        # 4. 解决 ReCaptcha V3
+        self._log("Solving Booking ReCaptcha V3...")
+        g_token = self.solve_captcha(
+            page_url=full_url,
+            task_type="ReCaptchaV3Task",
+            site_key="6LcTpXcfAAAAAM3VojNhyV-F1z92ADJIvcSZ39Y9",
+            use_proxy=True,
+            action="book"
+        )
+
+        # 5. 提交 Booking
+        self._log("Submitting final booking request...")
+        js_book = f"""
+        const formData = new FormData();
+        formData.append('1_formGroupId', '{group_num}');
+        formData.append('1_lang', 'en-us');
+        formData.append('1_process', 'APPOINTMENT');
+        formData.append('1_location', '{apt_config["code"]}');
+        formData.append('1_date', '{sel_date}');
+        formData.append('1_time', '{sel_time}');
+        formData.append('1_appointmentLabel', '{sel_label}');
+        formData.append('1_captchaToken', '{g_token}');
+        formData.append('0', '[{{"status":"IDLE"}},"$K1"]');
+        
+        return fetch("{full_url}", {{
+            method: 'POST',
+            headers: {{
+                'Next-Action': '6043cfd107081bc817cbb11a8c0db17d3a063401be',
+                'Next-Router-State-Tree': '{router_state}',
+                'Accept': 'text/x-component'
+            }},
+            body: formData
+        }}).then(async r => {{
+            const hdrs = {{}};
+            r.headers.forEach((v, k) => hdrs[k] = v);
+            return {{ status: r.status, body: await r.text(), headers: hdrs, url: r.url }};
+        }}).catch(e => {{ return {{ status: 0, body: e.toString() }}; }});
+        """
+        
+        book_res_dict = self.page.run_js(js_book)
+        book_resp = BrowserResponse(book_res_dict)
+        
+        # 6. 解析结果 (判定 Next.js 跳转)
+        headers_lower = {str(k).lower(): v for k, v in book_resp.headers.items()}
+        action_redirect = headers_lower.get('x-action-redirect', '')
+
+        is_success = (
+            book_resp.status_code == 303 or 
+            (book_resp.status_code == 200 and ("appointment-confirmation" in action_redirect or "appointment-confirmation" in book_resp.url))
+        )
+
+        if is_success:
+            self._log(f"✅ BOOKING SUCCESS! Redirected to: {action_redirect or book_resp.url}")
+            return True
+        else:
+            self._log(f"❌ BOOKING FAILED! Status: {book_resp.status_code}")
+            if "APPOINTMENT_LIMIT_REACHED" in book_resp.text:
+                self._log("-> Reason: Appointment Limit Reached")
+            else:
+                self._log(f"-> Response Body: {book_resp.text[:300]}")
+            return False
+
+    def cleanup(self):
+        self._log("Cleaning up resources...")
+        if self.page:
+            try: self.page.quit()
+            except: pass
+        if os.path.exists(self.workspace):
+            time.sleep(1)
+            shutil.rmtree(self.workspace, ignore_errors=True)
+
+# =====================================================================
+# 运行主逻辑
+# =====================================================================
+if __name__ == "__main__":
+    
+    # 填写你的账号配置
+    MY_CONFIG = {
+        # 账号信息
+        "account": {
+            "username": "zhangsan06@gmail-app.com",
+            "password": "Visafly@111"
+        },
+        # 目标签证中心信息 (例如广州 TLS: cnCNG2fr)
+        "apt_config": {
+            "country": "cn",
+            "city": "Chengdu",     
+            "code": "cnCNG2fr" 
+        },
+        # 代理配置
+        "proxy": {
+            "scheme": "http",
+            "ip": "95.135.130.10",   
+            "port": "46107",      
+            "username": "Iz1WuKKwt1KUzEe",      
+            "password": "G7syngmdyGURblY"
+        },
+        # Capsolver API Key
+        "capsolver_key": "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A",
+        
+        # 查询的月份 (格式: MM-YYYY)
+        "interest_month": "06-2026",
+        
+        # 期望的日期范围
+        "expected_start_date": "2026-06-01",
+        "expected_end_date": "2026-06-30",
+        
+        # 目标标签: '' 是普通号, 'pta' 是 Prime 黄金时间号
+        "target_labels": ["", "pta"] 
+    }
+
+    bot = TlsAutoBot(config=MY_CONFIG)
+    
+    try:
+        # 1. 登录
+        bot.login()
+        
+        # 2. 检查是否有号
+        slots = bot.query_slots()
+        
+        if slots:
+            bot._log(f"Found {len(slots)} total available slots in this month.")
+            # 3. 尝试预订
+            success = bot.book(slots)
+            if success:
+                print("\n🎉 Congratulations! Slot booked successfully!")
+            else:
+                print("\n⚠️ Failed to book the slot.")
+        else:
+            bot._log("No available slots found for the requested criteria.")
+        time.sleep(3600)
+    except Exception as e:
+        bot._log(f"Error occurred during execution: {str(e)}")
+    finally:
+        bot.cleanup()

+ 38 - 0
toolkit/backoff.py

@@ -0,0 +1,38 @@
+import random
+
+class ExponentialBackoff:
+    """
+    通用指数退避计算工具
+    """
+    def __init__(self, base_delay: float, max_delay: float, factor: float = 2.0, jitter_range: tuple = (10.0, 45.0)):
+        """
+        :param base_delay: 基础退避时间(秒)
+        :param max_delay: 最大退避时间(秒)
+        :param factor: 指数乘数 (默认 2.0 即翻倍)
+        :param jitter_range: 随机抖动范围(最小秒数,最大秒数)
+        """
+        self.base_delay = base_delay
+        self.max_delay = max_delay
+        self.factor = factor
+        self.jitter_range = jitter_range
+
+    def calculate(self, failure_count: int) -> float:
+        """
+        根据连续失败次数计算下一次的冷却时间
+        :param failure_count: 连续失败次数 (1开始)
+        """
+        if failure_count <= 0:
+            return 0.0
+            
+        # 核心公式: 基础延迟 * (乘数 ^ (失败次数 - 1))
+        multiplier = self.factor ** (failure_count - 1)
+        delay = self.base_delay * multiplier
+        
+        # 限制上限
+        delay = min(delay, self.max_delay)
+        
+        # 增加随机抖动
+        if self.jitter_range:
+            delay += random.uniform(self.jitter_range[0], self.jitter_range[1])
+            
+        return delay