jerry пре 1 недеља
родитељ
комит
312b01aa13

+ 72 - 43
booker_builtin.py

@@ -128,51 +128,80 @@ class BuiltinBookerGCO:
                     self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
 
     def _booking_trigger_loop(self):
-        self._log("Trigger loop started.")
+        self._log("Pub/Sub Trigger loop started.")
+        channel_to_routing_key = {}
+        for apt in self.m_cfg.appointment_types:
+            channel = self._get_redis_key(apt.routing_key)
+            channel_to_routing_key[channel] = apt.routing_key
+            
+        if not channel_to_routing_key:
+            self._log("No appointment types configured. Exiting trigger loop.")
+            return
+            
+        pubsub = None
         while not self.m_stop_event.is_set():
             try:
-                time.sleep(1.0)
+                if pubsub is None:
+                    pubsub = self.redis_client.pubsub(ignore_subscribe_messages=False)
+                    channels_to_sub = list(channel_to_routing_key.keys())
+                    self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}")
+                    pubsub.subscribe(*channels_to_sub)
+                message = pubsub.get_message(timeout=5.0)
+                if not message:
+                    continue
+                channel = message['channel']
+                if isinstance(channel, bytes):
+                    channel = channel.decode('utf-8')
+                if message['type'] == 'subscribe':
+                    active_subs = message['data']
+                    self._log(f"📡 [Redis ACK] Successfully subscribed to: {channel} (Active connection subs: {active_subs})")
+                    continue
+                if message['type'] != 'message':
+                    continue
+                raw_data = message['data']
+                if isinstance(raw_data, bytes):
+                    raw_data = raw_data.decode('utf-8')
+                routing_key = channel_to_routing_key.get(channel)
+                if not routing_key:
+                    continue
+                try:
+                    data = json.loads(raw_data)
+                    query_result = VSQueryResult.model_validate(data['query_result'])
+                    query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
+                except Exception as parse_err:
+                    self._log(f"Data parsing error for channel {channel}: {parse_err}")
+                    continue
                 now = time.time()
-                for apt_type in self.m_cfg.appointment_types:
-                    redis_key = self._get_redis_key(apt_type.routing_key)
-                    raw_data = self.redis_client.get(redis_key)
-                    if not raw_data:
-                        continue
-                    
-                    try:
-                        data = json.loads(raw_data)
-                        query_result = VSQueryResult.model_validate(data['query_result'])
-                        query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
-                    except Exception as parse_err:
-                        self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
-                        self.redis_client.delete(redis_key)
-                        continue
-                    
-                    matching_tasks = []
-                    with self.m_lock:
-                        for task in self.m_tasks:
-                            if now < task.next_run or not task.book_allowed:
-                                continue
-                            if apt_type.routing_key not in task.acceptable_routing_keys:
-                                continue
-                            
-                            self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
-                            task.next_run = now + self.m_cfg.booker.booking_cooldown
-                            matching_tasks.append(task)
-                    
-                    if matching_tasks:
-                        threads = []
-                        for task in matching_tasks:
-                            self._log(f"🚀 Triggering BOOK for {apt_type.routing_key}")
-                            t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
-                            threads.append(t)
-                            t.start()
-                        
-                        for t in threads:
-                            t.join() 
+                matching_tasks = []
+                with self.m_lock:
+                    for task in self.m_tasks:
+                        if now < task.next_run or not task.book_allowed:
+                            continue
+                        if routing_key not in task.acceptable_routing_keys:
+                            continue   
+                        task.next_run = now + self.m_cfg.booker.booking_cooldown
+                        matching_tasks.append(task)
+                if matching_tasks:
+                    for task in matching_tasks:
+                        self._log(f"🚀 Triggering BOOK for {routing_key} | Order Ref: {task.task_ref}")
+                        t = threading.Thread(
+                            target=self._execute_book_job, 
+                            args=(task, query_result), 
+                            daemon=True
+                        )
+                        t.start()
             except Exception as e:
-                self._log(f"Trigger loop error: {e}")
+                self._log(f"Trigger loop pub/sub error: {e}")
+                if pubsub:
+                    try:
+                        pubsub.close()
+                    except:
+                        pass
+                    pubsub = None
                 time.sleep(2)
+        if pubsub:
+            pubsub.close()
+            self._log("Pub/Sub connection closed.")
 
     def _execute_book_job(self, task: Task, query_result: VSQueryResult):
         queue_name = f"auto.{query_result.apt_type.routing_key}"
@@ -183,7 +212,7 @@ class BuiltinBookerGCO:
         is_rate_limited = False
         
         try:
-            task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
+            task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name, test=False)
             if not task_data:
                 return 
             task_id = task_data['id']
@@ -289,13 +318,13 @@ class BuiltinBookerGCO:
                 plg_cfg.session_max_life = self.m_cfg.session_max_life
 
                 if self.m_cfg.need_account:
-                    acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd)
+                    acc = VSCloudApi.Instance().get_next_account(self.m_cfg.booker.account_pool_id, self.m_cfg.booker.account_cd, test=False)
                     plg_cfg.account.id = acc['id']
                     plg_cfg.account.username = acc['username']
                     plg_cfg.account.password = acc['password']
 
                 if self.m_cfg.need_proxy:
-                    proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
+                    proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd, test=False)
                     plg_cfg.proxy.id = proxy['id']
                     plg_cfg.proxy.ip = proxy['ip']
                     plg_cfg.proxy.port = proxy['port']

+ 158 - 71
booker_order.py

@@ -4,7 +4,7 @@ import json
 import threading
 import random
 import redis
-from typing import List, Dict, Callable
+from typing import List, Dict, Callable, Any, Optional
 
 from vs_types import GroupConfig, VSPlgConfig, Task, VSQueryResult, AppointmentType
 from vs_plg_factory import VSPlgFactory 
@@ -27,13 +27,15 @@ class OrderBookerGCO:
         self.m_lock = threading.RLock()
         self.m_stop_event = threading.Event()
         self.redis_client = redis.Redis(**redis_conf)
-        self.m_pending_order_by_queue: Dict[str, int] = {}
+        self.m_pending_order_by_queue: Dict[str, int] = {}        
+        self.m_last_spawn_times: Dict[str, float] = {}
+        self.m_task_data_cache: Dict[str, dict] = {}
         
         self.m_tracker_key = f"vs:worker:tasks_tracker:{self.m_cfg.identifier}"
         self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
         self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
-        self.m_last_spawn_time = 0.0
-        self.heartbeat_ttl = 300
+        self.task_backoff = ExponentialBackoff(base_delay=10, max_delay=30*60.0, factor=2.0)
+        self.heartbeat_ttl = 2*60.0
 
     def _log(self, message):
         if self.m_logger:
@@ -51,6 +53,7 @@ class OrderBookerGCO:
         threading.Thread(target=self._booking_trigger_loop, daemon=True).start()
         threading.Thread(target=self._creator_loop, daemon=True).start()
         threading.Thread(target=self._maintain_loop, daemon=True).start()
+        threading.Thread(target=self._cache_refresh_loop, daemon=True).start()
 
     def stop(self):
         self._log("Stopping Booker...")
@@ -72,6 +75,9 @@ class OrderBookerGCO:
             if task in self.m_tasks:
                 self.m_tasks.remove(task)
                 removed = True
+            task_id = str(getattr(task, 'task_ref', ''))
+            self.m_task_data_cache.pop(task_id, None)
+                
         if cleanup and removed:
             self._cleanup_task(task, reason)
         return removed
@@ -80,6 +86,7 @@ class OrderBookerGCO:
         with self.m_lock:
             tasks = list(self.m_tasks)
             self.m_tasks.clear()
+            self.m_task_data_cache.clear()
         for task in tasks:
             self._cleanup_task(task, reason)
 
@@ -157,58 +164,123 @@ class OrderBookerGCO:
                 with self.m_lock:
                     self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
 
+    def _cache_refresh_loop(self):
+        self._log("Cache refresh loop started.")
+        refresh_interval = 15*60
+        
+        while not self.m_stop_event.is_set():
+            for _ in range(refresh_interval):
+                if self.m_stop_event.is_set():
+                    return
+                time.sleep(1.0)
+            with self.m_lock:
+                task_ids = list(self.m_task_data_cache.keys())
+            if not task_ids:
+                continue
+            for tid in task_ids:
+                if self.m_stop_event.is_set():
+                    break
+                try:
+                    fresh_data = VSCloudApi.Instance().get_vas_task(tid)
+                    if fresh_data:
+                        with self.m_lock:
+                            if tid in self.m_task_data_cache:
+                                self.m_task_data_cache[tid] = fresh_data
+                except Exception:
+                    pass
+                time.sleep(0.5)
+
     def _booking_trigger_loop(self):
-        self._log("Trigger loop started.")
+        self._log("Pub/Sub Trigger loop started.")
+        channel_to_routing_key = {}
+        for apt in self.m_cfg.appointment_types:
+            channel = self._get_redis_key(apt.routing_key)
+            channel_to_routing_key[channel] = apt.routing_key
+            
+        if not channel_to_routing_key:
+            self._log("No appointment types configured. Exiting trigger loop.")
+            return
+            
+        pubsub = None
         while not self.m_stop_event.is_set():
             try:
-                time.sleep(1.0)
+                if pubsub is None:
+                    pubsub = self.redis_client.pubsub(ignore_subscribe_messages=False)
+                    channels_to_sub = list(channel_to_routing_key.keys())
+                    self._log(f"⏳ Sending SUBSCRIBE command to Redis for: {channels_to_sub}")
+                    pubsub.subscribe(*channels_to_sub)
+                message = pubsub.get_message(timeout=5.0)
+                if not message:
+                    continue
+                channel = message['channel']
+                if isinstance(channel, bytes):
+                    channel = channel.decode('utf-8')
+                if message['type'] == 'subscribe':
+                    active_subs = message['data']
+                    self._log(f"📡 [Redis ACK] Successfully subscribed to: {channel} (Active connection subs: {active_subs})")
+                    continue
+                if message['type'] != 'message':
+                    continue
+                raw_data = message['data']
+                if isinstance(raw_data, bytes):
+                    raw_data = raw_data.decode('utf-8')
+                routing_key = channel_to_routing_key.get(channel)
+                if not routing_key:
+                    continue
+                try:
+                    data = json.loads(raw_data)
+                    query_result = VSQueryResult.model_validate(data['query_result'])
+                    query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
+                except Exception as parse_err:
+                    self._log(f"Data parsing error for channel {channel}: {parse_err}")
+                    continue
                 now = time.time()
-                for apt_type in self.m_cfg.appointment_types:
-                    redis_key = self._get_redis_key(apt_type.routing_key)
-                    raw_data = self.redis_client.get(redis_key)
-                    if not raw_data:
-                        continue
-                    try:
-                        data = json.loads(raw_data)
-                        query_result = VSQueryResult.model_validate(data['query_result'])
-                        query_result.apt_type = AppointmentType.model_validate(data['apt_type'])
-                    except Exception as parse_err:
-                        self._log(f"Data parsing error for {redis_key}: {parse_err}. Deleting corrupted signal.")
-                        self.redis_client.delete(redis_key)
-                        continue
-                    
-                    matching_tasks = []
-                    with self.m_lock:
-                        for task in self.m_tasks:
-                            if now < task.next_run or not task.book_allowed:
-                                continue
-                            if apt_type.routing_key not in task.acceptable_routing_keys:
-                                continue
-                            
-                            task.next_run = now + self.m_cfg.booker.booking_cooldown
-                            matching_tasks.append(task)
-                            
-                    if matching_tasks:
-                        threads = []
-                        for task in matching_tasks:
-                            self._log(f"🚀 Triggering BOOK for {apt_type.routing_key} | Order Ref: {task.task_ref}")
-                            t = threading.Thread(target=self._execute_book_job, args=(task, query_result))
-                            threads.append(t)
-                            t.start()
-                        
-                        for t in threads:
-                            t.join() 
-                    
+                matching_tasks = []
+                with self.m_lock:
+                    for task in self.m_tasks:
+                        if now < task.next_run or not task.book_allowed:
+                            continue
+                        if routing_key not in task.acceptable_routing_keys:
+                            continue   
+                        task.next_run = now + self.m_cfg.booker.booking_cooldown
+                        matching_tasks.append(task)
+                if matching_tasks:
+                    for task in matching_tasks:
+                        self._log(f"🚀 Triggering BOOK for {routing_key} | Order Ref: {task.task_ref}")
+                        t = threading.Thread(
+                            target=self._execute_book_job, 
+                            args=(task, query_result), 
+                            daemon=True
+                        )
+                        t.start()
             except Exception as e:
-                self._log(f"Trigger loop error: {e}")
+                self._log(f"Trigger loop pub/sub error: {e}")
+                if pubsub:
+                    try:
+                        pubsub.close()
+                    except:
+                        pass
+                    pubsub = None
                 time.sleep(2)
+        if pubsub:
+            pubsub.close()
+            self._log("Pub/Sub connection closed.")
 
     def _execute_book_job(self, task: Task, query_result: VSQueryResult):
         task_id = task.task_ref
         task_data = None
 
         try:
-            task_data = VSCloudApi.Instance().get_vas_task(task_id)
+            with self.m_lock:
+                task_data = self.m_task_data_cache.get(str(task_id))
+            
+            if not task_data:
+                self._log(f"Cache miss for {task_id}, fetching from cloud...")
+                task_data = VSCloudApi.Instance().get_vas_task(str(task_id))
+                if task_data:
+                    with self.m_lock:
+                        self.m_task_data_cache[str(task_id)] = task_data
+
             if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
                 self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
                 self._remove_task(task, "bound task no longer valid")
@@ -230,19 +302,27 @@ class OrderBookerGCO:
                     "timestamp": int(time.time()),
                     "payment_link": book_res.payment_link
                 }
-                VSCloudApi.Instance().update_vas_task(task_id, {"status": "grabbed", "grabbed_history": grab_info})
-                push_content = (
-                    f"🎉 【预定成功通知】\n"
-                    f"━━━━━━━━━━━━━━━\n"
-                    f"订单编号: {order_id}\n"
-                    f"预约账号: {book_res.account}\n"
-                    f"预约日期: {book_res.book_date}\n"
-                    f"预约时间: {book_res.book_time}\n"
-                    f"预约编号: {book_res.urn}\n"
-                    f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
-                    f"━━━━━━━━━━━━━━━\n"
-                )
-                VSCloudApi.Instance().push_weixin_text(push_content)
+                
+                def _update_cloud_success():
+                    try:
+                        VSCloudApi.Instance().update_vas_task(str(task_id), {"status": "grabbed", "grabbed_history": grab_info})
+                        push_content = (
+                            f"🎉 【预定成功通知】\n"
+                            f"━━━━━━━━━━━━━━━\n"
+                            f"订单编号: {order_id}\n"
+                            f"预约账号: {book_res.account}\n"
+                            f"预约日期: {book_res.book_date}\n"
+                            f"预约时间: {book_res.book_time}\n"
+                            f"预约编号: {book_res.urn}\n"
+                            f"支付链接: {book_res.payment_link if book_res.payment_link else '无需支付/暂无'}\n"
+                            f"━━━━━━━━━━━━━━━\n"
+                        )
+                        VSCloudApi.Instance().push_weixin_text(push_content)
+                    except Exception as e:
+                        self._log(f"Failed to update success state to cloud: {e}")
+                
+                ThreadPool.getInstance().enqueue(_update_cloud_success)
+
                 self.redis_client.zrem(self.m_tracker_key, task_id)
                 self._remove_task(task, "booking success")
             else:
@@ -262,11 +342,12 @@ class OrderBookerGCO:
                     t_fails = task_meta.get('booking_failures', 0) + 1
                     task_meta['booking_failures'] = t_fails
                     
-                    try:
-                        VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
-                    except Exception as cloud_err:
-                        self._log(f"Failed to update task meta: {cloud_err}")
-                        
+                    def _update_cloud_meta():
+                        try:
+                            VSCloudApi.Instance().update_vas_task(str(task_id), {"meta": task_meta})
+                        except Exception as cloud_err:
+                            self._log(f"Failed to update task meta: {cloud_err}")
+                    ThreadPool.getInstance().enqueue(_update_cloud_meta)   
                     t_cd = self.task_backoff.calculate(t_fails)
                     self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
                     self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
@@ -276,6 +357,7 @@ class OrderBookerGCO:
         spawn_interval = 10.0
         while not self.m_stop_event.is_set():
             time.sleep(2.0)
+            now = time.time()
             for apt in self.m_cfg.appointment_types:
                 r_key = apt.routing_key
                 
@@ -289,11 +371,10 @@ class OrderBookerGCO:
                     target = self.m_cfg.booker.target_instances
                 
                 if (active + pending) < target:
-                    now = time.time()
-                    if now - self.m_last_spawn_time >= spawn_interval:
-                        self.m_last_spawn_time = now 
+                    last_spawn = self.m_last_spawn_times.get(r_key, 0.0)
+                    if now - last_spawn >= spawn_interval:
+                        self.m_last_spawn_times[r_key] = now 
                         self._spawn_worker(r_key)
-                        break
 
     def _spawn_worker(self, target_routing_key: str):
         with self.m_lock: 
@@ -306,13 +387,16 @@ class OrderBookerGCO:
             
             try:
                 queue_name = f"auto.{target_routing_key}"
-                task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
+                task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name, test=False)
                 if not task_data:
                     return 
                 
                 task_id = task_data['id']
                 
-                self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
+                with self.m_lock:
+                    self.m_task_data_cache[str(task_id)] = task_data
+                
+                self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + 5*60.0})
                 user_inputs = task_data.get('user_inputs', {})
                 
                 plg_cfg = VSPlgConfig()
@@ -326,7 +410,7 @@ class OrderBookerGCO:
                 
                 acceptable_keys = [target_routing_key]
                 if self.m_cfg.need_proxy:
-                    proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
+                    proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd, test=False)
                     plg_cfg.proxy.id = proxy['id']
                     plg_cfg.proxy.ip = proxy['ip']
                     plg_cfg.proxy.port = proxy['port']
@@ -386,7 +470,7 @@ class OrderBookerGCO:
                         task_meta['spawn_failures'] = t_fails
                         
                         try:
-                            VSCloudApi.Instance().update_vas_task(task_id, {"meta": task_meta})
+                            VSCloudApi.Instance().update_vas_task(str(task_id), {"meta": task_meta})
                         except Exception as cloud_err:
                             self._log(f"Failed to update task meta: {cloud_err}")
                         
@@ -397,8 +481,11 @@ class OrderBookerGCO:
                 with self.m_lock: 
                     self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
                 
-                # 创建/登录失败,调用安全归还函数
                 if not success and task_id is not None and not is_rate_limited:
                     self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
                     self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
+                    
+                    with self.m_lock:
+                        self.m_task_data_cache.pop(str(task_id), None)
+                        
         ThreadPool.getInstance().enqueue(_job)

+ 8 - 2
config/config.json

@@ -981,7 +981,7 @@
                 "account_source": "built-in",
                 "account_pool_id": "tls.gb.fr.sentinel",
                 "target_instances": 1,
-                "account_cd": 300,
+                "account_cd": 1800,
                 "signal_ttl": 30
             },
             "booker": {
@@ -1016,7 +1016,13 @@
             "free_config": {
                 "tls_url": "https://visas-fr.tlscontact.com/en-us/country/gb/vac/gbLON2fr",
                 "location": "London",
-                "capsolver_key": "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A"
+                "capsolver_key": "CAP-5441DD341DD3CC2FAEF0BE6FE493EE9A",
+                "login_captcha": {
+                    "solve_advance": true,
+                    "site_key": "6LcDpXcfAAAAAM7wOEsF_38DNsL20tTvPTKxpyn0",
+                    "page_url": "https://i2-auth.visas-fr.tlscontact.com",
+                    "task": "ReCaptchaV2TaskProxyLess"
+                }
             }
         },
         {


+ 2 - 1
main_booker.py

@@ -22,7 +22,8 @@ def main():
     parser.add_argument(
         "-c", "--config",
         type=str,
-        required=True,
+        required=False,
+        default="config/config.json",
         help="Path to config.json"
     )
 

+ 2 - 1
main_sentinel.py

@@ -20,7 +20,8 @@ def main():
     parser.add_argument(
         "-c", "--config",
         type=str,
-        required=True,
+        required=False,
+        default="config/config.json",
         help="Path to config.json"
     )
 

+ 1 - 2
main_sweeper.py

@@ -20,8 +20,7 @@ class OrphanTaskSweeper:
             except Exception as e:
                 logging.error(f"Sweeper loop error: {e}")
             
-            # 每隔 60 秒巡检一次全局 Redis
-            time.sleep(60)
+            time.sleep(3)
 
     def _sweep(self):
         # 扫描集群中所有 Worker 的 tracking keys

+ 36 - 11
plugins/tls_plugin.py

@@ -9,9 +9,9 @@ import socket
 from datetime import datetime
 from typing import List, Dict, Optional, Any, Callable
 from urllib.parse import urljoin, urlparse, urlencode, parse_qs
+from concurrent.futures import ThreadPoolExecutor
 
 from DrissionPage import ChromiumPage, ChromiumOptions
-
 from vs_plg import IVSPlg
 from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError 
 from utils.cloudflare_bypass_for_scraping import CloudflareBypasser
@@ -128,6 +128,21 @@ class TlsPlugin(IVSPlg):
         全浏览器会话创建:过盾 -> JS注入登录 -> 状态机自动路由导航 -> 到达目标页
         """
         self._log(f"Initializing Session (ID: {self.instance_id})...")
+        captcha_future = None
+        captcha_executor = ThreadPoolExecutor(max_workers=1)
+        login_captcha_cfg = self.free_config.get("login_captcha", {})
+        if login_captcha_cfg.get('solve_advance'):
+            login_page = login_captcha_cfg.get("page_url")
+            site_key = login_captcha_cfg.get("site_key")
+            task_type = login_captcha_cfg.get("task")
+            self._log(f"🚀 Early starting background Captcha solve for sitekey={site_key}")
+            rc_params = {
+                "type": task_type,
+                "page": login_page,
+                "siteKey": site_key, 
+                "apiToken": self.free_config.get("capsolver_key", "")
+            }
+            captcha_future = captcha_executor.submit(self._solve_recaptcha, rc_params)
         co = ChromiumOptions()
         
         def get_free_port():
@@ -159,7 +174,6 @@ class TlsPlugin(IVSPlg):
         else:
             self._log("[WARN] No proxy configured!")
 
-
         specific_fp = FingerprintGenerator().generate(self.config.account.username)
         fp_seed = specific_fp.get("seed")
         fp_platform = specific_fp.get("platform")
@@ -301,7 +315,7 @@ class TlsPlugin(IVSPlg):
                     self._log("State: Login Form. Processing credentials and Captcha...")
                     
                     recaptchav2_token = ""
-                    if self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]'):
+                    if not captcha_future and (self.page.ele('.g-recaptcha') or self.page.ele('xpath://iframe[contains(@src, "recaptcha")]')):
                         rec_iframe = self.page.ele('xpath://iframe[contains(@src, "recaptcha")]')
                         rec_iframe_src = rec_iframe.attr('src')
                         rec_parsed = urlparse(rec_iframe_src)
@@ -310,14 +324,14 @@ class TlsPlugin(IVSPlg):
                         rec_size = rec_params.get("size", [None])[0]
                         
                         if 'normal' == rec_size:
-                            self._log(f"Solving ReCaptcha sitekey={rec_sitekey}...")
+                            self._log(f"Found dynamic sitekey={rec_sitekey}. Starting async Captcha solver...")
                             rc_params = {
                                 "type": "ReCaptchaV2TaskProxyLess",
                                 "page": current_url,
                                 "siteKey": rec_sitekey, 
                                 "apiToken": self.free_config.get("capsolver_key", "")
                             }
-                            recaptchav2_token = self._solve_recaptcha(rc_params)
+                            captcha_future = captcha_executor.submit(self._solve_recaptcha, rc_params)
 
                     username = self.config.account.username
                     password = self.config.account.password
@@ -334,10 +348,20 @@ class TlsPlugin(IVSPlg):
                     self.keyboard.type_text(password, humanize=True)
                     
                     # 注入 Token
-                    if recaptchav2_token:
-                        inject_js = f"var g = document.getElementById('g-recaptcha-response'); if(g) {{ g.value = '{recaptchav2_token}'; }}"
-                        self.page.run_js(inject_js)
-                        time.sleep(random.uniform(0.5, 1.0))
+                    if captcha_future:
+                        self._log("Waiting for background Captcha result...")
+                        try:
+                            # 设一个合理的超时,防止死锁
+                            recaptchav2_token = captcha_future.result(timeout=120) 
+                            self._log("Background Captcha solved successfully!")
+                        except Exception as e:
+                            raise BizLogicError(f"Captcha solving failed or timed out: {e}")
+                    
+                        # 注入 Token
+                        if recaptchav2_token:
+                            inject_js = f"var g = document.getElementById('g-recaptcha-response'); if(g) {{ g.value = '{recaptchav2_token}'; }}"
+                            self.page.run_js(inject_js)
+                            time.sleep(random.uniform(0.5, 1.0))
                     
                     self._log("Submitting Login...")
                     login_btn = self.page.ele('tag:button@@text():Login')
@@ -849,7 +873,8 @@ class TlsPlugin(IVSPlg):
     def _solve_recaptcha(self, params) -> str:
         """调用 VSCloudApi 解决 ReCaptcha"""
         key = params.get("apiToken")
-        if not key: raise NotFoundError("Api-token required")
+        if not key:
+            raise NotFoundError("Api-token required")
         
         submit_url = "https://api.capsolver.com/createTask"
         task = {
@@ -870,7 +895,7 @@ class TlsPlugin(IVSPlg):
                 task["proxyPassword"] = p.password
         
         payload = {"clientKey": key, "task": task}
-        import requests as req # 局部引用,避免混淆
+        import requests as req
         r = req.post(submit_url, json=payload, timeout=20)
         if r.status_code != 200:
             raise BizLogicError(message="Failed to submit capsolver task")

+ 0 - 1158
plugins/tls_plugin2.py

@@ -1,1158 +0,0 @@
-import time
-import json
-import random
-import re
-import os
-import uuid
-import shutil
-import queue
-import threading
-from datetime import datetime
-from typing import List, Dict, Optional, Any, Callable
-from urllib.parse import urljoin, urlparse, urlencode, parse_qs
-
-from camoufox import NewBrowser
-from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError, Page, BrowserContext
-
-from vs_plg import IVSPlg
-from vs_types import VSPlgConfig, AppointmentType, VSQueryResult, VSBookResult, AvailabilityStatus, TimeSlot, DateAvailability, NotFoundError, PermissionDeniedError, RateLimiteddError, SessionExpiredOrInvalidError, BizLogicError 
-from utils.cloudflare_bypass_for_scraping2 import CloudflareBypasser
-
-
-def _camoufox_headless_from_env():
-    """
-    Ubuntu/无显示器 下通过环境变量选择 Camoufox 模式(与 NewBrowser 一致):
-    - 未设置 / 0 / false:有头(需真实 DISPLAY 或自行开 Xvfb 并 export DISPLAY=:99)
-    - 1 / true / yes / headless:Playwright 真无头(无需 X)
-    - virtual / xvfb:由 Camoufox 起 Xvfb 虚拟屏(需安装 Xvfb,适合要「有界面栈」又无可接显示器的 Linux)
-    """
-    v = (os.environ.get("CAMOUFOX_HEADLESS") or "").strip().lower()
-    if v in ("1", "true", "yes", "headless"):
-        return True
-    if v in ("virtual", "xvfb", "vdisplay"):
-        return "virtual"
-    return False
-
-
-class BrowserResponse:
-    """模拟 requests.Response"""
-    def __init__(self, result_dict):
-        result_dict = result_dict or {}
-        self.status_code = result_dict.get('status', 0)
-        self.text = result_dict.get('body', '')
-        self.headers = result_dict.get('headers', {})
-        self.url = result_dict.get('url', '')
-        self._json = None
-
-    def json(self):
-        if self._json is None:
-            if not self.text:
-                return {}
-            try:
-                self._json = json.loads(self.text)
-            except:
-                self._json = {}
-        return self._json
-
-class TlsPlugin(IVSPlg):
-    """
-    TLSContact 签证预约插件 (Camoufox 版)
-    """
-
-    def __init__(self, group_id: str):
-        self.group_id = group_id
-        self.config: Optional[VSPlgConfig] = None
-        self.free_config: Dict[str, Any] = {}
-        self.is_healthy = True
-        self.logger = None
-        
-        self.page: Optional[Page] = None
-        self.browser_ctx: Optional[BrowserContext] = None
-        self.playwright = None
-        self.travel_group: Optional[Dict] = None
-        
-        self.instance_id = uuid.uuid4().hex[:8]
-        self.root_workspace = os.path.abspath(os.path.join("data/temp_browser_data", f"{self.group_id}.{self.instance_id}"))
-        self.user_data_path = os.path.join(self.root_workspace, "user_data")
-    
-        if not os.path.exists(self.root_workspace):
-            os.makedirs(self.root_workspace)
-            
-        self.session_create_time: float = 0
-
-        # Playwright/Camoufox 的 Page 只能在创建它的线程使用;Sentinel 在线程池里建会话、在监控线程里 query。
-        # 用单条工作线程串行所有浏览器操作,避免跨线程卡死或 silent health_check 失败。
-        self._pw_cmd_queue: "queue.Queue[Optional[Callable[[], None]]]" = queue.Queue()
-        self._pw_thread: Optional[threading.Thread] = None
-        self._pw_worker: Optional[threading.Thread] = None
-        self._pw_thread_ready = threading.Event()
-        self._pw_thread_lock = threading.Lock()
-
-    def get_group_id(self) -> str:
-        return self.group_id
-    
-    def set_log(self, logger: Callable[[str], None]):
-        self.logger = logger
-    
-    def _log(self, message):
-        if self.logger:
-            self.logger(f'[TlsPlugin] [{self.group_id}] {message}')
-        else:
-            print(f'[TlsPlugin] [{self.group_id}] {message}')
-
-    def set_config(self, config: VSPlgConfig):
-        self.config = config
-        self.free_config = config.free_config or {}
-
-    def _ensure_pw_thread(self):
-        with self._pw_thread_lock:
-            if self._pw_thread and self._pw_thread.is_alive():
-                return
-            self._pw_thread_ready.clear()
-            t = threading.Thread(target=self._pw_loop, name=f"camoufox-tls-{self.instance_id}", daemon=True)
-            self._pw_thread = t
-            t.start()
-            if not self._pw_thread_ready.wait(timeout=60):
-                raise BizLogicError("Camoufox worker thread failed to start")
-
-    def _pw_loop(self):
-        self._pw_worker = threading.current_thread()
-        self._pw_thread_ready.set()
-        while True:
-            work = self._pw_cmd_queue.get()
-            if work is None:
-                break
-            work()
-
-    def _run_on_pw_thread(self, fn, *args, **kwargs):
-        if self._pw_worker is not None and threading.current_thread() is self._pw_worker:
-            return fn(*args, **kwargs)
-        if self._pw_worker is None or not self._pw_thread or not self._pw_thread.is_alive():
-            self._ensure_pw_thread()
-        out: List[Any] = [None, None]
-        done = threading.Event()
-
-        def work():
-            try:
-                out[1] = fn(*args, **kwargs)
-            except BaseException as e:
-                out[0] = e
-            finally:
-                done.set()
-
-        self._pw_cmd_queue.put(work)
-        if not done.wait(timeout=600):
-            self._log("Browser thread operation timed out (600s).")
-            raise BizLogicError("Browser thread operation timeout")
-        if out[0] is not None:
-            raise out[0]
-        return out[1]
-
-    def _stop_pw_thread(self):
-        with self._pw_thread_lock:
-            t = self._pw_thread
-            if not t or not t.is_alive():
-                self._pw_thread = None
-                self._pw_worker = None
-                return
-            self._pw_cmd_queue.put(None)
-        t.join(timeout=20)
-        with self._pw_thread_lock:
-            self._pw_thread = None
-            self._pw_worker = None
-        
-    def keep_alive(self):
-        if self.page is None:
-            return
-        def _work():
-            try:
-                resp = self._perform_request("GET", self.page.url, retry_count=1)
-                self._check_page_is_session_expired_or_invalid('Book your appointment', html = resp.text)
-            except SessionExpiredOrInvalidError as e:
-                self.is_healthy = False
-            except Exception as e:
-                pass
-        try:
-            self._run_on_pw_thread(_work)
-        except Exception:
-            pass
-
-    def _health_check_impl(self) -> bool:
-        if not self.is_healthy:
-            return False
-        if self.page is None:
-            return False
-        try:
-            v = self.page.evaluate("1")
-            if v != 1:
-                return False
-        except:
-            return False
-        if self.config.session_max_life > 0:
-            current_time = time.time()
-            elapsed_time = current_time - self.session_create_time
-            if elapsed_time > self.config.session_max_life:
-                self._log(f"Session expired.")
-                return False
-        return True
-
-    def health_check(self) -> bool:
-        if not self.is_healthy or self.page is None:
-            return False
-        try:
-            if self._pw_worker is not None and threading.current_thread() is self._pw_worker:
-                return self._health_check_impl()
-            return self._run_on_pw_thread(self._health_check_impl)
-        except Exception:
-            return False
-    
-    def _save_screenshot(self, name_prefix):
-        try:
-            timestamp = int(time.time())
-            filename = f"{self.instance_id}_{name_prefix}_{timestamp}.jpg"
-            save_path = os.path.join("data", filename)
-            os.makedirs("data", exist_ok=True)
-            self.page.screenshot(path=save_path, full_page=True)
-            self._log(f"Screenshot saved to {save_path}")
-        except Exception as e:
-            self._log(f"Failed to save screenshot: {e}")
-
-    def create_session(self):
-        self._ensure_pw_thread()
-        try:
-            self._run_on_pw_thread(self._create_session_inner)
-        except Exception:
-            self._stop_pw_thread()
-            raise
-
-    def _create_session_inner(self):
-        """
-        全浏览器会话创建:过盾 -> JS注入登录 -> 状态机自动路由导航 -> 到达目标页
-        必须在同一条 Camoufox/Playwright 工作线程中执行(Playwright 非线程安全)。
-        """
-        self._log(f"Initializing Session (ID: {self.instance_id})...")
-        proxy_cfg = None
-        if self.config.proxy and self.config.proxy.ip:
-            p = self.config.proxy
-            if p.username and p.password:
-                proxy_cfg = {
-                    "server": f"{p.proto}://{p.ip}:{p.port}",
-                    "username": p.username,
-                    "password": p.password,
-                }
-            else:
-                proxy_cfg = {"server": f"{p.proto}://{p.ip}:{p.port}"}
-        else:
-            self._log("[WARN] No proxy configured!")
-
-        try:
-            self.playwright = sync_playwright().start()
-            headless_opt = _camoufox_headless_from_env()
-            self._log(f"Camoufox headless={headless_opt!r} (env CAMOUFOX_HEADLESS)")
-            self.browser_ctx = NewBrowser(
-                self.playwright,
-                persistent_context=True,
-                headless=headless_opt,
-                user_data_dir=self.user_data_path,
-                proxy=proxy_cfg,
-                window=(1920, 1080),
-            )
-            self.page = self.browser_ctx.pages[0] if self.browser_ctx.pages else self.browser_ctx.new_page()
-            
-            # --- 初始化访问与过盾 ---
-            tls_url = self.free_config.get('tls_url', '')
-            self._log(f"Navigating: {tls_url}")
-            self.page.goto(tls_url, wait_until="domcontentloaded")
-            time.sleep(5)
-            
-            cf_bypasser = CloudflareBypasser(self.page, log=True)
-            if not cf_bypasser.bypass(max_retry=15):
-                raise BizLogicError("Cloudflare bypass timeout")
-            time.sleep(3)
-            cf_bypasser.handle_waiting_room()
-            
-            # --- 状态机导航循环 ---
-            max_steps = 20
-            session_created = False
-            has_submitted_login = False
-            
-            for step in range(max_steps):
-                current_url = self.page.url
-                self._log(f"--- [Router Step {step+1}] Current URL: {current_url} ---")
-                
-                # 状态 1:到达终极目标页面 (成功退出条件)
-                if "appointment-booking" in current_url or self.page.locator("button:has-text('Book your appointment')").first.count():
-                    btn_selector = "button:has-text('Book your appointment')"
-                    if self._is_selector_visible(btn_selector, timeout=10000):
-                        self.session_create_time = time.time()
-                        self._log("✅ Login & Navigation Success! Reached appointment-booking.")
-                        session_created = True
-                        break
-                
-                # 状态 2:遇到没有申请人的拦截页 (致命错误退出条件)
-                page_content = self.page.content()
-                no_applicant_indicators = [
-                    "Add a new applicant" in page_content,
-                    "You have not yet added an applicant" in page_content,
-                    "applicants-information" in current_url
-                ]
-                if any(no_applicant_indicators):
-                    raise BizLogicError(message="No applicant added. Cannot proceed to booking.")
-                
-                # 状态 3:首页/登录入口页 -> 需要点击进入登录
-                if self.page.locator("a[href*='login']").first.count() and not self.page.locator("label:has-text('Email')").first.count():
-                    self._log("State: Login Portal. Clicking login link...")
-                    try:
-                        self.page.locator("a[href*='login']").first.click(timeout=5000)
-                        time.sleep(3)
-                        continue
-                    except Exception:
-                        pass
-                
-                # 状态 4:真正的登录表单页
-                if self.page.locator("label:has-text('Email')").first.count() and not has_submitted_login:
-                    self._log("State: Login Form. Processing credentials and Captcha...")
-                    
-                    recaptchav2_token = ""
-                    if self.page.locator(".g-recaptcha").first.count() or self.page.locator("//iframe[contains(@src, 'recaptcha')]").first.count():
-                        try:
-                            rec_iframe = self.page.locator("//iframe[contains(@src, 'recaptcha')]").first
-                            rec_iframe_src = rec_iframe.get_attribute('src') or ""
-                            rec_parsed = urlparse(rec_iframe_src)
-                            rec_params = parse_qs(rec_parsed.query)
-                            rec_sitekey = rec_params.get("k", [None])[0]
-                            rec_size = rec_params.get("size", [None])[0]
-                            
-                            if 'normal' == rec_size and rec_sitekey:
-                                self._log(f"Solving ReCaptcha sitekey={rec_sitekey}...")
-                                rc_params = {
-                                    "type": "ReCaptchaV2TaskProxyLess",
-                                    "page": current_url,
-                                    "siteKey": rec_sitekey,
-                                    "apiToken": self.free_config.get("capsolver_key", "")
-                                }
-                                recaptchav2_token = self._solve_recaptcha(rc_params)
-                        except Exception as e:
-                            self._log(f"ReCaptcha extraction failed: {e}")
-                    
-                    username = self.config.account.username
-                    password = self.config.account.password
-                    
-                    self._type_into_first_visible(
-                        selectors=[
-                            "input[name='email']",
-                            "input[type='email']",
-                            "input#email",
-                            "input[autocomplete='username']",
-                            "label:has-text('Email') + input",
-                        ],
-                        text=username,
-                        field_name="Email",
-                    )
-                    time.sleep(random.uniform(0.5, 1.2))
-                    
-                    self._type_into_first_visible(
-                        selectors=[
-                            "input[name='password']",
-                            "input[type='password']",
-                            "input#password",
-                            "input[autocomplete='current-password']",
-                            "label:has-text('Password') + input",
-                        ],
-                        text=password,
-                        field_name="Password",
-                    )
-                    
-                    # 注入 Token
-                    if recaptchav2_token:
-                        inject_js = f"var g = document.getElementById('g-recaptcha-response'); if(g) {{ g.value = '{recaptchav2_token}'; }}"
-                        try:
-                            self.page.evaluate(f"() => {{ {inject_js} }}")
-                            self._log("ReCaptcha token injected")
-                        except Exception:
-                            pass
-                        time.sleep(random.uniform(0.5, 1.0))
-                    
-                    self._log("Submitting Login...")
-                    time.sleep(random.uniform(0.3, 0.8))
-                    self.page.locator("button:has-text('Login')").first.click(timeout=10000)
-                    has_submitted_login = True
-                    time.sleep(3)
-                    continue
-                
-                # 状态 5:Travel Groups 页面
-                if "travel-groups" in current_url:
-                    self._log("State: Travel Groups. Selecting targeted group...")
-                    groups = self._parse_travel_groups(self.page.content())
-                    location = self.free_config.get('location')
-                    self.travel_group = next((g for g in groups if location in g['location']), None)
-                    
-                    if not self.travel_group:
-                        self._save_screenshot("group_not_found")
-                        raise NotFoundError(f"Group not found for {location}")
-                    
-                    formgroup_id = self.travel_group.get('group_number')
-                    btn_selector = f'button[name="formGroupId"][value="{formgroup_id}"]'
-                    
-                    select_buttons = self.page.locator(btn_selector)
-                    if select_buttons.count():
-                        # 取最后一个可见的按钮
-                        select_btn = None
-                        for i in range(select_buttons.count() - 1, -1, -1):
-                            btn = select_buttons.nth(i)
-                            try:
-                                if btn.is_visible(timeout=1000):
-                                    select_btn = btn
-                                    break
-                            except Exception:
-                                continue
-                        
-                        if select_btn:
-                            time.sleep(random.uniform(0.5, 1.2))
-                            select_btn.click(timeout=10000)
-                            self._log(f"Clicked select button for group {formgroup_id}")
-                            time.sleep(3)
-                            continue
-                        else:
-                            self._log("[WARN] Select button found but not visible.")
-                    else:
-                        self._log(f"[WARN] Wait timeout for group button {formgroup_id}")
-                
-                # 状态 6:中间过渡页,需点击 "Book Appointment" 继续往下走
-                if self.page.locator('#book-appointment-btn').first.count():
-                    self._log("State: Intermediate Dashboard. Clicking Book Appointment button...")
-                    try:
-                        self.page.locator('#book-appointment-btn').first.click(timeout=10000)
-                        time.sleep(3)
-                        continue
-                    except Exception:
-                        pass
-                
-                # 状态 7:登录失败校验 或 未知加载状态
-                if "login-actions" in current_url and has_submitted_login:
-                    self._log("Waiting on login-actions... (Might be authenticating or invalid credentials)")
-                    time.sleep(2)
-                    try:
-                        if self.page.locator("text='Invalid username or password'").first.count():
-                            raise BizLogicError(message="Login Failed! Invalid credentials or Captcha rejected.")
-                    except Exception:
-                        pass
-                    continue
-                
-                # 兜底:未匹配到明确状态,等待页面渲染或重定向
-                self._log("State: Transitioning or Unknown. Waiting 2 seconds...")
-                time.sleep(2)
-            
-            # 如果循环耗尽还没到达目标
-            if not session_created:
-                raise BizLogicError(f"Failed to reach appointment-booking after {max_steps} navigation steps. Stuck at: {self.page.url}")
-
-        except Exception as e:
-            self._log(f"Session Create Error: {e}")
-            if self.config.debug:
-                self._save_screenshot("create_session_except")
-            self._cleanup_failed_session()
-            raise e
-
-    def query(self, apt_type: AppointmentType) -> VSQueryResult:
-        return self._run_on_pw_thread(self._query_impl, apt_type)
-
-    def _day_block_locator_candidates(self):
-        # 与 Drission 版 `//div[p and div//button[contains(@data-testid, "slot")]]` 对齐(子 div 下含 slot 按钮)
-        yield self.page.locator(
-            "xpath=//div[./p and ./div//button[contains(@data-testid, 'slot')]]"
-        )
-        # 结构略变:任意后代 button 带 slot
-        yield self.page.locator(
-            "xpath=//div[./p and .//button[contains(@data-testid, 'slot                                                                                                           ')]]"
-        )
-        # 仅要求有 p 与 slot 类按钮
-        yield self.page.locator(
-            "xpath=//div[.//p and .//button[contains(@data-testid, 'slot')]]"
-        )
-        # Playwright 原生 :has
-        yield self.page.locator("div").filter(
-            has=self.page.locator("p")
-        ).filter(
-            has=self.page.locator("button[data-testid*='slot']")
-        )
-
-    def _extract_slots_from_calendar_dom(
-        self, target_year: int, target_month_num: int
-    ) -> List[Dict[str, Any]]:
-        """多策略定位「日期块 + 可点时段按钮」,与页面结构差异/Camoufox 兼容。"""
-        all_slots: List[Dict[str, Any]] = []
-        day_blocks = None
-        for loc in self._day_block_locator_candidates():
-            try:
-                n = loc.count()
-            except Exception:
-                continue
-            if n > 0:
-                day_blocks = loc
-                self._log(f"使用日历块选择器,匹配到 {n} 个 day_blocks")
-                break
-        if day_blocks is None:
-            # 不依赖 day_block 外壳:直接扫可用按钮,再向祖先找日期
-            return self._extract_slots_from_available_buttons_only(
-                target_year, target_month_num
-            )
-
-        for i in range(day_blocks.count()):
-            block = day_blocks.nth(i)
-            p_ele = block.locator("p").first 
-            if not p_ele.count():
-                continue
-            day_match = re.search(r"\d+", p_ele.inner_text())
-            if not day_match:
-                continue
-            day_str = day_match.group()
-            try:
-                full_date = f"{target_year}-{target_month_num:02d}-{int(day_str):02d}"
-            except ValueError:
-                continue
-            available_btns = block.locator("button[data-testid^='btn-available-slot']")
-            for j in range(available_btns.count()):
-                btn = available_btns.nth(j)
-                btn_html = btn.inner_html()
-                time_match = re.search(r"\d{2}:\d{2}", btn_html)
-                if not time_match:
-                    continue
-                time_str = time_match.group()
-                test_id = btn.get_attribute("data-testid") or ""
-                if "prime" in test_id and "weekend" in test_id:
-                    lbl = "ptaw"
-                elif "prime" in test_id:
-                    lbl = "pta"
-                else:
-                    lbl = ""
-                all_slots.append(
-                    {"date": full_date, "time": time_str, "label": lbl}
-                )
-        if all_slots:
-            return all_slots
-        return self._extract_slots_from_available_buttons_only(
-            target_year, target_month_num
-        )
-
-    def _extract_slots_from_available_buttons_only(
-        self, target_year: int, target_month_num: int
-    ) -> List[Dict[str, Any]]:
-        """当整块 DOM 选不中时,用可用按钮反查日期行。"""
-        all_slots: List[Dict[str, Any]] = []
-        btns = self.page.locator("button[data-testid^='btn-available-slot']")
-        n = btns.count()
-        if n == 0:
-            return []
-        self._log(f"按可用按钮回查日期,共 {n} 个 btn-available-slot")
-        for j in range(n):
-            btn = btns.nth(j)
-            row = btn.locator("xpath=./ancestor::div[.//p][1]")
-            p_ele = row.locator("p").first
-            if not p_ele.count():
-                continue
-            day_match = re.search(r"\d+", p_ele.inner_text())
-            if not day_match:
-                continue
-            day_str = day_match.group()
-            try:
-                full_date = f"{target_year}-{target_month_num:02d}-{int(day_str):02d}"
-            except ValueError:
-                continue
-            btn_html = btn.inner_html()
-            time_match = re.search(r"\d{2}:\d{2}", btn_html)
-            if not time_match:
-                continue
-            time_str = time_match.group()
-            test_id = btn.get_attribute("data-testid") or ""
-            if "prime" in test_id and "weekend" in test_id:
-                lbl = "ptaw"
-            elif "prime" in test_id:
-                lbl = "pta"
-            else:
-                lbl = ""
-            all_slots.append({"date": full_date, "time": time_str, "label": lbl})
-        return all_slots
-
-    def _query_impl(self, apt_type: AppointmentType) -> VSQueryResult:
-        res = VSQueryResult()
-        res.success = False
-        interest_month = self.free_config.get("interest_month", time.strftime("%m-%Y"))
-        
-        target_date_obj = datetime.strptime(interest_month, "%m-%Y")
-        target_month_text = target_date_obj.strftime("%B %Y")
-        target_year = target_date_obj.year
-        target_month_num = target_date_obj.month
-        
-        slots = []
-        current_selected_ele = self.page.locator('[data-testid="btn-current-month-available"]').first
-        current_month_text = current_selected_ele.inner_text().strip() if current_selected_ele.count() else ""
-
-        is_on_target_month = (current_month_text.lower() == target_month_text.lower())
-
-        if not is_on_target_month:
-            self._log(f"Current is '{current_month_text}', navigating to '{target_month_text}'...")
-            reached_target = False
-            for step in range(12):
-                current_ele = self.page.locator('[data-testid="btn-current-month-available"]').first
-                if current_ele.count() and current_ele.inner_text().strip().lower() == target_month_text.lower():
-                    self._log(f"✅ Successfully navigated to target month: '{target_month_text}'!")
-                    reached_target = True
-                    break
-                
-                next_btn = self.page.locator('[data-testid="btn-next-month-available"]').first
-                
-                if next_btn.count():
-                    next_btn.click(timeout=5000)
-                    time.sleep(2.5)
-                else:
-                    self._log("⚠️ Reached the end of the calendar or 'Next Month' is disabled.")
-                    break
-
-            if not reached_target:
-                self._log(f"❌ Could not navigate to target month: {target_month_text}. Stop parsing.")
-                res.success = False
-                res.availability_status = AvailabilityStatus.NoneAvailable
-                return res
-            self._log("Extracting slots from DOM using robust data-testid features...")
-            slots = self._extract_slots_from_calendar_dom(target_year, target_month_num)
-        else:
-            self._log(f"Already on '{target_month_text}'. Executing silent JS fetch...") 
-            resp = self._perform_request("GET", self.page.url, retry_count=1)
-            self._check_page_is_session_expired_or_invalid('Book your appointment', resp.text)
-            slots = self._parse_appointment_slots(resp.text)
-        
-        if slots:
-            res.success = True
-            earliest_date = slots[0]["date"]
-            earliest_dt = datetime.strptime(earliest_date, "%Y-%m-%d")
-            res.availability_status = AvailabilityStatus.Available
-            res.earliest_date = earliest_dt
-            date_map: dict[datetime, list[TimeSlot]] = {}
-            for s in slots:
-                date_str = s["date"]
-                dt = datetime.strptime(date_str, "%Y-%m-%d")
-                date_map.setdefault(dt, []).append(
-                    TimeSlot(time=s["time"], label=str(s.get("label", "")))
-                )
-            res.availability = [DateAvailability(date=d, times=slots) for d, slots in date_map.items()]
-            self._log(f"Slot Found! -> {slots}")
-        else:
-            self._log("No slots available.")
-            res.success = False
-            res.availability_status = AvailabilityStatus.NoneAvailable
-        return res
-
-    def book(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
-        return self._run_on_pw_thread(self._book_impl, slot_info, user_inputs)
-
-    def _book_impl(self, slot_info: VSQueryResult, user_inputs: Dict = None) -> VSBookResult:
-        if user_inputs is None:
-            user_inputs = {}
-        res = VSBookResult()
-        res.success = False
-        
-        exp_start = user_inputs.get('expected_start_date', '')
-        exp_end = user_inputs.get('expected_end_date', '')
-        support_pta = user_inputs.get('support_pta', True)
-
-        target_labels = ['']
-        if support_pta:
-            target_labels.append('pta')
-
-        available_dates_str =[
-            da.date.strftime("%Y-%m-%d")
-            for da in slot_info.availability if da.date
-        ]
-        
-        valid_dates_list = self._filter_dates(available_dates_str, exp_start, exp_end)
-        if not valid_dates_list:
-            raise NotFoundError(message="No dates match user constraints")
-        
-        all_possible_slots =[]
-        for da in slot_info.availability:
-            if not da.date:
-                continue
-                
-            date_str = da.date.strftime("%Y-%m-%d")
-            if date_str in valid_dates_list:
-                for t in da.times:
-                    if t.label in target_labels:
-                        all_possible_slots.append({
-                            "date": date_str,
-                            "time_obj": t,
-                            "label": t.label
-                        })
-        
-        if not all_possible_slots:
-            raise NotFoundError(message="No suitable slot found (after label filtering)")
-
-        selected_slot = random.choice(all_possible_slots)
-        selected_date = selected_slot["date"]
-        selected_time = selected_slot["time_obj"]
-        selected_label = selected_slot["label"]
-
-        self._log(f"Found {len(all_possible_slots)} valid slots. selected slot: {selected_date} {selected_time.time} {selected_label}")
-        
-        # 随机选择预订模式 - Mode 1 (鼠标移动 + JS更新 + 点击) 或 Mode 2 (直接 JS 更新 + 点击)
-        book_mode = random.choice([1, 2])
-        self._log(f"Using booking mode: {book_mode}")
-        
-        if book_mode == 1:
-            # Mode 1: 模拟真实用户行为 - 先移动鼠标到随机位置
-            rand_x = random.randint(300, 800)
-            rand_y = random.randint(400, 700)
-            self._log(f"Mode 1: Moving mouse to ({rand_x}, {rand_y}) and clicking")
-            # Playwright 中不直接支持 HumanMouse,但可以通过 hover 和 click 来实现
-            dummy_locator = self.page.locator(f"xpath=//*[@id='dummy_{random.randint(1000, 9999)}']")
-            try:
-                # 如果虚拟定位器存在就点击(通常不会存在),否则只是触发 mousemove 事件
-                dummy_locator.first.click(timeout=500)
-            except Exception:
-                pass
-            
-            js_update_form = f"""
-            try {{
-                const buttons = Array.from(document.querySelectorAll('button[type="submit"]'));
-                const submitBtn = buttons.find(btn => {{
-                    return btn.textContent.trim().toLowerCase().includes('book your appointment');
-                }});
-                if (!submitBtn) return 'Submit button not found';
-                const form = submitBtn.closest('form');
-                if (!form) return 'Correct form not found';
-                function setReactValue(input, value) {{
-                    if (!input) return;
-                    input.value = value;
-                    input.dispatchEvent(new Event('input', {{ bubbles: true }}));
-                    input.dispatchEvent(new Event('change', {{ bubbles: true }}));
-                }}
-                setReactValue(form.querySelector('input[name="date"]'), '{selected_date}');
-                setReactValue(form.querySelector('input[name="time"]'), '{selected_time.time}');
-                setReactValue(form.querySelector('input[name="appointmentLabel"]'), '{selected_label}');
-                submitBtn.removeAttribute('disabled');
-                submitBtn.classList.remove('opacity-50', 'cursor-not-allowed'); 
-                return 'form_updated';
-            }} catch (e) {{
-                return e.toString();
-            }}
-            """
-            update_res = self.page.evaluate(f"() => {{ {js_update_form} }}")
-            self._log(f"Mode 1: Form update triggered: {update_res}")
-            
-            if update_res != 'form_updated':
-                raise BizLogicError(message=f"Failed to update form in Mode 1: {update_res}")
-            
-            # 通过按钮定位器点击
-            submit_btn = self.page.locator("button:has-text('Book your appointment')").first
-            if not submit_btn.count():
-                raise BizLogicError(message="Submit button not found for Mode 1")
-            
-            self._log("Mode 1: Moving mouse to submit button and clicking")
-            time.sleep(random.uniform(0.2, 0.5))
-            submit_btn.click(timeout=10000)
-            inject_res = 'clicked'
-        
-        else:
-            # Mode 2: 直接 JS 注入和点击 (更快但可能被检测)
-            js_inject_and_click = f"""
-            try {{
-                const buttons = Array.from(document.querySelectorAll('button[type="submit"]'));
-                const submitBtn = buttons.find(btn => {{
-                    return btn.textContent.trim().toLowerCase().includes('book your appointment');
-                }});
-                if (!submitBtn) return 'Submit button not found';
-                const form = submitBtn.closest('form');
-                if (!form) return 'Correct form not found';
-                function setReactValue(input, value) {{
-                    if (!input) return;
-                    input.value = value;
-                    input.dispatchEvent(new Event('input', {{ bubbles: true }}));
-                    input.dispatchEvent(new Event('change', {{ bubbles: true }}));
-                }}
-                setReactValue(form.querySelector('input[name="date"]'), '{selected_date}');
-                setReactValue(form.querySelector('input[name="time"]'), '{selected_time.time}');
-                setReactValue(form.querySelector('input[name="appointmentLabel"]'), '{selected_label}');
-                submitBtn.removeAttribute('disabled');
-                submitBtn.click();
-                return 'clicked';
-            }} catch (e) {{
-                return e.toString();
-            }}
-            """
-            inject_res = self.page.evaluate(f"() => {{ {js_inject_and_click} }}")
-            self._log(f"Mode 2: Form submission triggered: {inject_res}")
-        
-        if inject_res != 'clicked':
-            raise BizLogicError(message="Failed to inject form or click the submit button")
-
-        self._log("Waiting for Next.js to process the form submission...")
-        for _ in range(10):
-            try:
-                current_page_url = self.page.url
-                current_page_html = self.page.content()
-                appointment_confirmation_indicators = [
-                    "order-summary" in current_page_url,
-                    "partner-services" in current_page_url,
-                    "appointment-confirmation" in current_page_url,
-                    "Change my appointment" in current_page_html,
-                    "Book a new appointment" in current_page_html,
-                ]
-                
-                if any(appointment_confirmation_indicators):
-                    self._log(f"✅ BOOKING SUCCESS! Redirected to: {current_page_url}")
-                    res.success = True
-                    res.label = selected_label
-                    res.book_date = selected_date
-                    res.book_time = selected_time.time
-                    self._save_screenshot("book_slot_success")
-                    break
-                
-                toast_selector = '[role=\"alert\"]'
-                toast_ele = self.page.locator(toast_selector).first
-                if toast_ele.count():
-                    error_msg = toast_ele.inner_text()
-                    self._log(f"❌ BOOKING FAILED! Detected popup: {error_msg}")
-                    break
-                time.sleep(0.5)
-            except Exception:
-                pass
-        return res
-    
-    def _get_proxy_url(self):
-        # 构造代理
-        proxy_url = ""
-        if self.config.proxy.ip:
-            s = self.config.proxy
-            if s.username:
-                proxy_url = f"{s.proto}://{s.username}:{s.password}@{s.ip}:{s.port}"
-            else:
-                proxy_url = f"{s.proto}://{s.ip}:{s.port}"
-        return proxy_url
-
-    def _perform_request(self, method, url, headers=None, data=None, json_data=None, params=None, retry_count=0):
-        """
-        在浏览器上下文中注入 JS 执行 Fetch
-        """
-        if not self.page:
-            raise BizLogicError("Browser not initialized")
-
-        if params:
-            from urllib.parse import urlencode
-            if '?' in url:
-                url += '&' + urlencode(params)
-            else:
-                url += '?' + urlencode(params)
-
-        fetch_options = {
-            "method": method.upper(),
-            "headers": headers or {},
-            "credentials": "include"
-        }
-
-        # Body 处理
-        if json_data:
-            fetch_options['body'] = json.dumps(json_data)
-            fetch_options['headers']['Content-Type'] = 'application/json'
-        elif data:
-             if isinstance(data, dict):
-                from urllib.parse import urlencode
-                fetch_options['body'] = urlencode(data)
-                fetch_options['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
-             else:
-                 fetch_options['body'] = data
-
-        js_script = f"""
-        const url = "{url}";
-        const options = {json.dumps(fetch_options)};
-        
-        return fetch(url, options)
-            .then(async response => {{
-                const text = await response.text();
-                const headers = {{}};
-                response.headers.forEach((value, key) => headers[key] = value);
-                
-                return {{
-                    status: response.status,
-                    body: text,
-                    headers: headers,
-                    url: response.url
-                }};
-            }})
-            .catch(error => {{
-                return {{
-                    status: 0,
-                    body: error.toString(),
-                    headers: {{}},
-                    url: url
-                }};
-            }});
-        """
-        
-        res_dict = self.page.evaluate(f"() => {{ {js_script} }}")
-        resp = BrowserResponse(res_dict)
-        
-        if resp.status_code == 200:
-            return resp
-        elif resp.status_code == 401:
-            self.is_healthy = False
-            raise SessionExpiredOrInvalidError()
-        elif resp.status_code == 403:
-            if retry_count < 2:
-                self._log(f"HTTP 403 Detected. Cloudflare session expired? Attempting refresh (Try {retry_count+1}/2)...")
-                if self._refresh_firewall_session():
-                    self._log("Firewall session refreshed. Retrying request...")
-                    return self._perform_request(method, url, headers, data, json_data, params, retry_count+1)
-                else:
-                    self._log("Failed to refresh firewall session.")    
-            raise PermissionDeniedError(f"HTTP 403: {resp.text[:100]}")
-        elif resp.status_code == 429:
-            self.is_healthy = False
-            raise RateLimiteddError()
-        else:
-            if resp.status_code == 0:
-                 raise BizLogicError(f"Network Error: {resp.text}")
-            raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
-    
-    def _refresh_firewall_session(self) -> bool:
-        """
-        主动刷新页面以触发 Cloudflare 挑战并尝试通过
-        """
-        try:
-            # 1. 刷新当前页面 (通常 Dashboard 页)
-            # 这会强制浏览器重新进行 HTTP 请求,从而触发 Cloudflare 拦截页
-            self._log("Refreshing page to trigger Cloudflare...")
-            self.page.reload(wait_until="domcontentloaded")
-            
-            # 2. 调用 CloudflareBypasser
-            cf = CloudflareBypasser(self.page, log=self.config.debug)
-            
-            # 3. 尝试过盾 (尝试次数稍多一点,因为此时可能网络不稳定)
-            success = cf.bypass(max_retry=10)
-            
-            if success:
-                # 再次确认页面是否正常加载 (非 403 页面)
-                title = self.page.title().lower()
-                if "access denied" in title:
-                    return False
-                
-                # 等待 DOM 稍微稳定
-                time.sleep(2)
-                return True
-            
-            return False
-        except Exception as e:
-            self._log(f"Error during firewall refresh: {e}")
-            return False
-
-    def _solve_recaptcha(self, params) -> str:
-        """调用 VSCloudApi 解决 ReCaptcha"""
-        key = params.get("apiToken")
-        if not key: raise NotFoundError("Api-token required")
-        
-        submit_url = "https://api.capsolver.com/createTask"
-        task = {
-            "type": params.get("type"),
-            "websiteURL": params.get("page"),
-            "websiteKey": params.get("siteKey"),
-        }
-        if params.get("action"):
-            task["pageAction"] = params.get("action")
-            
-        # if params.get("proxy"):
-        #     p = urlparse(params.get("proxy"))
-        #     task["proxyType"] = p.proto
-        #     task["proxyAddress"] = p.hostname
-        #     task["proxyPort"] = p.port
-        #     if p.username:
-        #         task["proxyLogin"] = p.username
-        #         task["proxyPassword"] = p.password
-            
-        # 注意:使用 Camoufox 后,通常是 ProxyLess 模式
-        # 除非你想让 Capsolver 也用同样的代理(通常不需要,除非风控极严)
-        
-        payload = {"clientKey": key, "task": task}
-        import requests as req # 局部引用,避免混淆
-        r = req.post(submit_url, json=payload, timeout=20)
-        if r.status_code != 200:
-            raise BizLogicError(message="Failed to submit capsolver task")
-        
-        task_id = r.json().get("taskId")
-        for _ in range(20):
-            r = req.post("https://api.capsolver.com/getTaskResult", json={"clientKey": key, "taskId": task_id}, timeout=20)
-            if r.status_code == 200:
-                d = r.json()
-                if d.get("status") == "ready":
-                    return d["solution"]["gRecaptchaResponse"]
-            time.sleep(3)
-        raise BizLogicError(message="Capsolver task timeout")
-
-    def _parse_travel_groups(self, html_content) -> List[Dict]:
-        groups = []
-        js_pattern = r'\\"travelGroups\\":\s*(\[.*?\]),\\"availableCountriesToCreateGroups'
-        js_match = re.search(js_pattern, html_content, re.DOTALL)
-        if js_match:
-            json_str = js_match.group(1).replace(r'\"', '"')
-            data = json.loads(json_str)
-            for g in data:
-                groups.append({
-                    'group_name': g.get('groupName'),
-                    'group_number': g.get('formGroupId'),
-                    'location': g.get('vacName')
-                })
-        else:
-            self._log('Parsed travel group page, but not found travelGroups')
-        return groups
-
-    def _parse_appointment_slots(self, html_content) -> List[Dict]:
-        slots = []
-        pattern = r'"availableAppointments\\":\s*(\[.*\]),\\"showFlexiAppointment'
-        match = re.search(pattern, html_content, re.DOTALL)
-        
-        if match:
-            json_str = match.group(1).replace(r'\"', '"')
-            data = json.loads(json_str)
-            for day in data:
-                d_str = day.get('day')
-                for s in day.get('slots', []):
-                    labels = s.get('labels', [])
-                    lbl = ""
-                    # 简化逻辑:TLS label 列表
-                    if 'pta' in labels: lbl = 'pta'
-                    elif 'ptaw' in labels: lbl = 'ptaw'
-                    elif '' in labels or not labels: lbl = ''
-                    
-                    slots.append({
-                        'date': d_str,
-                        'time': s.get('time'),
-                        'label': lbl
-                    })
-        return slots
-  
-    def _check_page_is_session_expired_or_invalid(self, keyword, html: str) -> bool:
-        if not html:
-            self.is_healthy = False
-            raise SessionExpiredOrInvalidError()
-        
-        html_lower = html.lower()
-        if keyword.lower() not in html_lower: 
-            session_expire_or_invalid_indicators = [
-                'redirected automatically' in html_lower,
-                'login' in html_lower and 'password' in html_lower,
-                'session expired' in html_lower
-            ]
-            if any(session_expire_or_invalid_indicators):
-                self.is_healthy = False
-                raise SessionExpiredOrInvalidError()
-            
-    def _filter_dates(self, dates: List[str], start_str: str, end_str: str) -> List[str]:
-        if not start_str or not end_str:
-            return dates
-        valid_dates = []
-        s_date = datetime.strptime(start_str[:10], "%Y-%m-%d")
-        e_date = datetime.strptime(end_str[:10], "%Y-%m-%d")
-        for date_str in dates:
-            curr_date = datetime.strptime(date_str, "%Y-%m-%d")
-            if s_date <= curr_date <= e_date:
-                valid_dates.append(date_str)
-        random.shuffle(valid_dates)
-        return valid_dates
-
-    def _is_selector_visible(self, selector: str, timeout: int = 10000) -> bool:
-        try:
-            self.page.wait_for_selector(selector, state="visible", timeout=timeout)
-            return True
-        except PlaywrightTimeoutError:
-            return False
-
-    def _human_type(self, text: str):
-        for ch in text:
-            self.page.keyboard.type(ch)
-            time.sleep(random.uniform(0.03, 0.12))
-
-    def _type_into_first_visible(self, selectors: List[str], text: str, field_name: str):
-        last_err = None
-        for selector in selectors:
-            try:
-                locator = self.page.locator(selector).first
-                locator.wait_for(state="visible", timeout=3000)
-                locator.click(timeout=3000)
-                time.sleep(random.uniform(0.2, 0.6))
-                locator.fill("")
-                self._human_type(text)
-                return
-            except Exception as e:
-                last_err = e
-                continue
-        raise BizLogicError(message=f"Can't find visible {field_name} input. Last error: {last_err}")
-    
-    def _close_playwright(self):
-        if self.page:
-            try:
-                self.page.close()
-            except Exception:
-                pass
-            self.page = None
-        if self.browser_ctx:
-            try:
-                self.browser_ctx.close()
-            except Exception:
-                pass
-            self.browser_ctx = None
-        if self.playwright:
-            try:
-                self.playwright.stop()
-            except Exception:
-                pass
-            self.playwright = None
-
-    def _rmtree_workspace(self):
-        if os.path.exists(self.root_workspace):
-            for _ in range(3):
-                try:
-                    time.sleep(0.2)
-                    shutil.rmtree(self.root_workspace, ignore_errors=True)
-                    break
-                except Exception as e:
-                    self._log(f"Cleanup retry: {e}")
-                    time.sleep(0.5)
-            if os.path.exists(self.root_workspace):
-                self._log(f"[WARN] Failed to fully remove workspace: {self.root_workspace}")
-
-    def _cleanup_failed_session(self):
-        """create_session 在工作线程内失败时调用;外层会 _stop_pw_thread。"""
-        self._close_playwright()
-        self._rmtree_workspace()
-
-    # --- 资源清理核心方法 ---
-    def cleanup(self):
-        """
-        销毁浏览器并彻底删除临时文件
-        """
-        w = getattr(self, "_pw_worker", None)
-        on_worker = w is not None and threading.current_thread() is w
-
-        if on_worker:
-            self._close_playwright()
-            self._rmtree_workspace()
-            return
-
-        if w is not None and self._pw_thread and self._pw_thread.is_alive():
-            try:
-                self._run_on_pw_thread(self._close_playwright)
-            except Exception:
-                self._close_playwright()
-            self._rmtree_workspace()
-            self._stop_pw_thread()
-        else:
-            self._close_playwright()
-            self._rmtree_workspace()
-    def __del__(self):
-        """
-        析构函数:当对象被垃圾回收时自动调用
-        """
-        self.cleanup()
-
-
-class TlsPlugin2(TlsPlugin):
-    """兼容工厂按模块名加载 `TlsPlugin2` 的场景。"""
-    pass

+ 4 - 8
sentinel.py

@@ -143,8 +143,6 @@ class SentinelGCO:
                         VSCloudApi.Instance().slot_refresh_start(apt_type.routing_key, country=apt_type.country, city=apt_type.city, visa_type=apt_type.visa_type)
                         result = task.instance.query(apt_type)
                         result.apt_type = apt_type
-                        VSCloudApi.Instance().slot_refresh_success(apt_type.routing_key)
-
                         if result.success:
                             ttl = self.m_cfg.sentinel.signal_ttl
                             self._log(f"🔥 SLOT FOUND! Writing signal to Redis (TTL: {ttl}s)")
@@ -155,15 +153,13 @@ class SentinelGCO:
                                 "timestamp": now
                             }
                             redis_key = self._get_redis_key(apt_type.routing_key)
-                            self.redis_client.setex(redis_key, ttl, json.dumps(payload))
-                            
+                            self.redis_client.publish(redis_key, json.dumps(payload))
                             payload["query_result"]["website"] = self.m_cfg.website
                             VSCloudApi.Instance().slot_snapshot_report(payload["query_result"])
-
+                        VSCloudApi.Instance().slot_refresh_success(apt_type.routing_key)
                     except Exception as e:
                         self._log(f"Query exception: {e}")
                         VSCloudApi.Instance().slot_refresh_fail(apt_type.routing_key, error=str(e))
-                        
             except Exception as e:
                 self._log(f"Monitor loop error: {e}")
                 time.sleep(2)
@@ -211,13 +207,13 @@ class SentinelGCO:
                     plg_cfg.account.id = 0
                     plg_cfg.account.username = "Guest"
                 else:
-                    acc = VSCloudApi.Instance().get_next_account(self.m_cfg.sentinel.account_pool_id, self.m_cfg.sentinel.account_cd)
+                    acc = VSCloudApi.Instance().get_next_account(self.m_cfg.sentinel.account_pool_id, self.m_cfg.sentinel.account_cd, test=False)
                     plg_cfg.account.id = acc['id']
                     plg_cfg.account.username = acc['username']
                     plg_cfg.account.password = acc['password']
                 
                 if self.m_cfg.need_proxy:
-                    proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd)
+                    proxy = VSCloudApi.Instance().get_next_proxy(self.m_cfg.proxy_pool, self.m_cfg.proxy_cd, test=False)
                     plg_cfg.proxy.id = proxy['id']
                     plg_cfg.proxy.ip = proxy['ip']
                     plg_cfg.proxy.port = proxy['port']

+ 57 - 0
test/test_publish_slot.py

@@ -0,0 +1,57 @@
+import json
+import redis
+
+# Redis 连接
+r = redis.Redis(
+    host="45.137.220.138",
+    port=6379,
+    db=0,
+    password="STEs2x6ML0U1HlpE9SojM6YU7QPhqzY8",
+    decode_responses=True
+)
+
+# topic/channel
+channel = "vs:signal:slot.lon.fr.tourist"
+
+# 消息体
+message = {
+    "group_id": "tls.gb.fr",
+    "apt_type": {
+        "weight": 10,
+        "routing_key": "slot.lon.fr.tourist",
+        "city": "London",
+        "visa_type": "Tourist",
+        "country": "France"
+    },
+    "query_result": {
+        "routing_key": "slot.lon.fr.tourist",
+        "country": "France",
+        "city": "London",
+        "visa_type": "Tourist",
+        "availability_status": "Available",
+        "earliest_date": "2026-06-05",
+        "availability": [
+            {
+                "date": "2026-06-05",
+                "times": [
+                    {
+                        "time": "14:00",
+                        "label": ""
+                    }
+                ]
+            }
+        ],
+        "snapshot_source": "worker",
+        "snapshot_at": "2026-05-02T14:23:45.123456+00:00"
+    },
+    "timestamp": 1777745853
+}
+
+# 发布
+receiver_count = r.publish(
+    channel,
+    json.dumps(message, ensure_ascii=False)
+)
+
+print(f"发布成功 -> {channel}")
+print(f"收到订阅者数量: {receiver_count}")

+ 65 - 36
toolkit/vs_cloud_api.py

@@ -54,15 +54,40 @@ class VSCloudApi:
         else:
             raise BizLogicError(message=f"HTTP Error {resp.status_code}: {resp.text[:100]}")
 
-    # =========================================================================
-    #  VAS Task Management (新增 API)
-    # =========================================================================
-
-    def get_vas_task_pop(self, routing_key: str) -> Optional[Dict]:
-        """
-        获取任务信息
-        API: GET /api/vas/task/pop
-        """
+    def get_vas_task_pop(self, routing_key: str, test=False) -> Optional[Dict]:
+        if test:
+            return  {
+                "status": "running",
+                "priority": 10,
+                "config": {
+                    "select": "random",
+                    "alias_email": "mariaandrews2240@gmail-app.com",
+                    "time_filter": ["AM","PM"],
+                    "exclude_dates": [],
+                    "include_today": True,
+                    "allowed_weekdays": [1,2,3,4,5],
+                    "include_tomorrow": True
+                },
+                "user_inputs": {
+                    "email": "mariaandrews2240@gmail-app.com",
+                    "password": "Visafly@111",
+                    "username": "mariaandrews2240@gmail-app.com",
+                    "support_pta": False,
+                    "expected_end_date": "2026-06-30",
+                    "expected_start_date": "2026-05-01"
+                },
+                "grabbed_history": {},
+                "meta": None,
+                "id": 0,
+                "order_id": "ORD-20260415100806-0fdee58d",
+                "routing_key": "auto.slot.lon.fr.tourist",
+                "script_version": "latest",
+                "attempt_count": 0,
+                "notify_count": 0,
+                "created_at": "2000-01-01T01:00:00",
+                "updated_at": "2000-01-01T01:00:00",
+                "expire_at":  "2000-01-01T01:00:00"
+            }
         url = f"{self.base_url}/api/vas/task/pop"
         params = {
             "queue_name": routing_key,
@@ -78,11 +103,6 @@ class VSCloudApi:
             raise BizLogicError(message=f"Get vas task pop biz error: {result.get('message')}")
     
     def get_vas_task(self, task_id: str) -> dict:
-        # 例如:请求 GET /api/v1/tasks/{task_id}
-        # curl -X 'GET' \
-        # 'http://45.137.220.138:8888/api/vas/task/get_by_order?order_id=ORD-20260306212306-7e604df8' \
-        # -H 'accept: application/json' \
-        # -H 'Authorization: Bearer tok_c9be86aa78274939a3c008db31ce9d22'
         url = f"{self.base_url}/api/vas/task/detail"
         params = {"task_id": task_id}
         headers = self._get_headers()
@@ -94,7 +114,7 @@ class VSCloudApi:
             raise BizLogicError(message=f"Get Task={task_id} error: {result.get('message')}")
 
     def update_vas_task(self, 
-                        task_id: int, 
+                        task_id: str, 
                         update_data: Dict[str, Any]) -> Optional[Dict]:
         """
         更新任务
@@ -316,8 +336,21 @@ class VSCloudApi:
     def get_next_account(
         self,
         pool_name: str,
-        account_cd: int = 60
+        account_cd: int = 60,
+        test: bool = False
     ):
+        if test:
+            return {
+                "pool_name": "tls.gb.fr.sentinel",
+                "username": "robertolord2257@gmail-app.com",
+                "password": "Visafly@111",
+                "extra_data": {},
+                "status": "active",
+                "next_use_time": "2000-01-01T01:00:00",
+                "id": 0,
+                "created_at": "2000-01-01T01:00:00",
+                "updated_at": "2000-01-01T01:00:00"
+            }
         url = f'{self.base_url}/api/account/next'
         payload = {
             "pool_name": pool_name,
@@ -331,30 +364,26 @@ class VSCloudApi:
         else:
             raise BizLogicError(message=f"Get next account biz error: {result.get('message')}")
         
-    # def get_next_proxy(
-    #     self,
-    #     pools: List[str],
-    #     proxy_cd: int = 60
-    # ):
-    #     local = {
-    #         "pool_name": "local",
-    #         "proto": "http",
-    #         "ip": "127.0.0.1",
-    #         "port": 7890,
-    #         "username": "",
-    #         "password": "",
-    #         "time_zone": "Europe/Dublin",
-    #         "id": 213
-    #     }
-    #     node = switch_next_node()
-    #     VSC_INFO('-', f'proxy node={node}')
-    #     return local
-        
     def get_next_proxy(
         self,
         pools: List[str],
-        proxy_cd: int = 60
+        proxy_cd: int = 60,
+        test: bool = False
     ):
+        if test:
+            local = {
+                "pool_name": "local",
+                "proto": "http",
+                "ip": "127.0.0.1",
+                "port": 7890,
+                "username": "",
+                "password": "",
+                "time_zone": "Europe/Dublin",
+                "id": 0
+            }
+            node = switch_next_node()
+            VSC_INFO('-', f'proxy node={node}')
+            return local 
         url = f'{self.base_url}/api/proxy/next-ip'
         payload = {
             "pools": pools,

+ 0 - 263
utils/cloudflare_bypass_for_scraping2.py

@@ -1,263 +0,0 @@
-import random
-import time
-from typing import Any
-
-
-class CloudflareBypasser:
-    def __init__(self, driver: Any, log=True):
-        self.driver = driver
-        self.log = log
-        
-    def log_message(self, message):
-        if self.log:
-            print(message)
-    
-    def _normalize_page(self):
-        # 兼容 TlsPlugin 中的 CamoufoxPageAdapter
-        return getattr(self.driver, "_page", self.driver)
-
-    def _is_challenge_frame(self, frame) -> bool:
-        frame_name = (frame.name or "").lower()
-        frame_url = (frame.url or "").lower()
-        markers = (
-            "turnstile",
-            "challenges.cloudflare.com",
-            "challenge",
-            "cf-chl",
-        )
-        return any(m in frame_name or m in frame_url for m in markers)
-
-    def _determine_challenge_type(self) -> str:
-        try:
-            page = self._normalize_page()
-            title = (page.title() or "").lower()
-            html = (page.content() or "").lower()
-            if "please complete the captcha" in html or "turnstile" in html:
-                return "turnstile"
-            if "just a moment" in title or "checking your browser" in html:
-                return "interstitial"
-            return "none"
-        except Exception as e:
-            self.log_message(f"Error determining challenge type: {e}")
-            return "unknown"
-
-    def _click_checkbox_in_frame(self, frame) -> bool:
-        selectors = [
-            "input[type='checkbox']",
-            "[role='checkbox']",
-            "label.ctp-checkbox-label",
-            "div.ctp-checkbox-label",
-            "label[for*='cf']",
-        ]
-        for selector in selectors:
-            try:
-                loc = frame.locator(selector)
-                if loc.count() <= 0:
-                    continue
-                target = loc.first
-                target.click(timeout=2000)
-                return True
-            except Exception:
-                continue
-
-        # 在 frame 内做 open-shadow 递归查找(closed shadow 无法直接访问)
-        try:
-            clicked = frame.evaluate(
-                """
-                () => {
-                    const selectors = [
-                        "input[type='checkbox']",
-                        "[role='checkbox']",
-                        "label.ctp-checkbox-label",
-                        "div.ctp-checkbox-label",
-                        "label[for*='cf']"
-                    ];
-                    const seen = new WeakSet();
-                    const stack = [document];
-
-                    while (stack.length) {
-                        const root = stack.pop();
-                        if (!root || seen.has(root)) continue;
-                        seen.add(root);
-
-                        for (const sel of selectors) {
-                            const hit = root.querySelector(sel);
-                            if (hit) {
-                                hit.click();
-                                return true;
-                            }
-                        }
-
-                        const nodes = root.querySelectorAll ? root.querySelectorAll("*") : [];
-                        for (const node of nodes) {
-                            if (node.shadowRoot) stack.push(node.shadowRoot);
-                        }
-                    }
-                    return false;
-                }
-                """
-            )
-            return bool(clicked)
-        except Exception:
-            return False
-
-    def _click_challenge_iframe_center(self) -> bool:
-        page = self._normalize_page()
-        for frame in page.frames:
-            if not self._is_challenge_frame(frame):
-                continue
-
-            try:
-                frame_el = frame.frame_element()
-                box = frame_el.bounding_box()
-                if not box:
-                    continue
-
-                # 人类化一点:点击中心附近随机偏移,避免固定坐标
-                cx = box["x"] + box["width"] * (0.5 + random.uniform(-0.08, 0.08))
-                cy = box["y"] + box["height"] * (0.5 + random.uniform(-0.08, 0.08))
-                page.mouse.move(cx, cy, steps=10)
-                time.sleep(random.uniform(0.15, 0.45))
-                page.mouse.click(cx, cy, delay=random.randint(50, 180))
-                return True
-            except Exception:
-                continue
-        return False
-    
-    def click_verification_button(self, _is_dfs=False):
-        try:
-            page = self._normalize_page()
-            for frame in page.frames:
-                if not self._is_challenge_frame(frame):
-                    continue
-                if self._click_checkbox_in_frame(frame):
-                    self.log_message("Challenge interaction succeeded by frame selector/evaluate.")
-                    time.sleep(1)
-                    return
-
-            if self._click_challenge_iframe_center():
-                self.log_message("Challenge interaction succeeded by iframe center click.")
-                time.sleep(1)
-                return
-
-            self.log_message("Challenge click strategies exhausted.")
-
-        except Exception as e:
-            self.log_message(f"Error clicking verification button: {e}")
-
-    def is_bypassed(self):
-        try:
-            page = self._normalize_page()
-            title = (page.title() or "").lower()
-            html = (page.content() or "").lower()
-            blocked_markers = (
-                "just a moment",
-                "请稍候",
-                "checking your browser",
-                "cf-challenge",
-                "please complete the captcha",
-            )
-            return not any(m in title or m in html for m in blocked_markers)
-        except Exception as e:
-            self.log_message(f"Error checking page title: {e}")
-        return False
-
-    def _collect_page_state(self) -> str:
-        """
-        采样当前页面状态,帮助定位卡在哪一轮挑战。
-        """
-        try:
-            page = self._normalize_page()
-            title = page.title()
-            url = page.url
-            challenge_type = self._determine_challenge_type()
-            challenge_frames = 0
-            for frame in page.frames:
-                if self._is_challenge_frame(frame):
-                    challenge_frames += 1
-            return (
-                f"title={title!r}, url={url!r}, challenge_type={challenge_type}, "
-                f"challenge_frames={challenge_frames}"
-            )
-        except Exception as e:
-            return f"state_collect_error={e}"
-
-    def _collect_state_signature(self):
-        try:
-            page = self._normalize_page()
-            title = (page.title() or "").lower()
-            url = (page.url or "").lower()
-            challenge_type = self._determine_challenge_type()
-            challenge_frames = 0
-            for frame in page.frames:
-                if self._is_challenge_frame(frame):
-                    challenge_frames += 1
-            return (challenge_type, challenge_frames, title[:80], url[:120])
-        except Exception:
-            return ("unknown", -1, "", "")
-
-    def bypass(self, max_retry=5):
-        for i in range(max_retry):
-            if self.is_bypassed():
-                return True
-
-            sig_before = self._collect_state_signature()
-            state_before = self._collect_page_state()
-            self.log_message(
-                f"Verification page detected. try={i + 1}/{max_retry}, before_click: {state_before}"
-            )
-            self.click_verification_button(False)
-
-            # 点击后短暂等待,再次检查是否通过
-            time.sleep(1.2)
-            if self.is_bypassed():
-                self.log_message("Bypass success after click.")
-                return True
-
-            sig_after = self._collect_state_signature()
-            no_transition = sig_before == sig_after
-            if no_transition:
-                self.log_message("No challenge state transition detected after click.")
-                # 当页面状态完全不变时,做一次轻量刷新,触发 challenge 重新渲染
-                try:
-                    page = self._normalize_page()
-                    page.reload(wait_until="domcontentloaded")
-                    self.log_message("Page reloaded to retrigger challenge rendering.")
-                    time.sleep(1.5)
-                    if self.is_bypassed():
-                        self.log_message("Bypass success after reload.")
-                        return True
-                except Exception as reload_err:
-                    self.log_message(f"Reload failed: {reload_err}")
-
-            # 递增退避,降低持续高频点击导致的风控风险
-            wait_seconds = min(2 + i, 6)
-            state_after = self._collect_page_state()
-            self.log_message(
-                f"Bypass not yet complete, sleeping {wait_seconds}s, after_click: {state_after}"
-            )
-            time.sleep(wait_seconds)
-
-        final_ok = self.is_bypassed()
-        if not final_ok:
-            self.log_message(f"Bypass failed after retries. final_state: {self._collect_page_state()}")
-        return final_ok
-
-    def handle_waiting_room(self, timeout_seconds=6 * 60, poll_seconds=10):
-        wait_start = time.time()
-        while True:
-            try:
-                page = self._normalize_page()
-                html = (page.content() or "").lower()
-            except Exception as e:
-                self.log_message(f"Waiting room check failed: {e}")
-                break
-
-            if "file d'attente" in html or "waiting room" in html:
-                if time.time() - wait_start > timeout_seconds:
-                    self.log_message("Waiting room timeout reached.")
-                    break
-                self.log_message("In Waiting Room... waiting for auto-refresh.")
-                time.sleep(poll_seconds)
-                continue
-            break