소스 검색

feat: update

root 2 주 전
부모
커밋
01c77530b6
4개의 변경된 파일110개의 추가작업 그리고 148개의 파일을 삭제
  1. 32 45
      booker_builtin.py
  2. 57 65
      booker_order.py
  3. 19 38
      main_sweeper.py
  4. 2 0
      vs_types.py

+ 32 - 45
booker_builtin.py

@@ -33,6 +33,7 @@ class BuiltinBookerGCO:
         self.group_backoff = ExponentialBackoff(base_delay=60.0, max_delay=10*60.0, factor=2.0)
         self.task_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
         self.m_last_spawn_time = 0.0
+        self.heartbeat_ttl = 300
 
     def _log(self, message):
         if self.m_logger:
@@ -57,51 +58,34 @@ class BuiltinBookerGCO:
 
     def _get_redis_key(self, routing_key: str) -> str:
         return f"vs:signal:{routing_key}"
-    
-    def _safe_return_task(self, task_id: int, reason: str = ""):
-        if not task_id:
-            return
-        try:
-            task_data = VSCloudApi.Instance().get_vas_task(task_id)
-            if not task_data:
-                self.redis_client.zrem(self.m_tracker_key, task_id)
-                return
-            
-            current_status = task_data.get('status', '')
-            if current_status in['pending', 'grabbed', 'cancelled', 'success']:
-                self.redis_client.zrem(self.m_tracker_key, task_id)
-                return
-                
-            self._log(f"Returning task={task_id} to queue. Reason: {reason}")
-            VSCloudApi.Instance().return_vas_task_to_queue(task_id)
-            
-            # 归还成功,核销防丢记录
-            self.redis_client.zrem(self.m_tracker_key, task_id)
-            
-        except Exception as ex:
-            self._log(f"Failed to safely return task for task_id {task_id}: {ex}")
 
     def _maintain_loop(self):
         self._log("Maintain loop started.")
-        rng = random.Random()
         while not self.m_stop_event.is_set():
-            wait_seconds = rng.randint(180, 300)
-            for _ in range(wait_seconds):
-                if self.m_stop_event.is_set():
-                    return
-                time.sleep(1.0)
+            time.sleep(1.0)
+            now = time.time()
             
             with self.m_lock:
                 tasks_to_check = list(self.m_tasks)
             
+            if not tasks_to_check:
+                continue
+            
             healthy_tasks = []
             for t in tasks_to_check:
-                try:
-                    t.instance.keep_alive()
-                    if t.instance.health_check():
-                        healthy_tasks.append(t)
-                except Exception as e:
-                    self._log(f"Instance keep-alive failed: {e}")
+                if now >= t.next_remote_ping:
+                    try:
+                        t.instance.keep_alive()
+                        if t.instance.health_check():
+                            healthy_tasks.append(t)
+                            next_delay = random.randint(180, 300) 
+                            t.next_remote_ping = now + next_delay
+                        else:
+                            self._log(f"♻️ Instance unhealthy. Will be removed.")
+                    except Exception as e:
+                        self._log(f"Instance keep-alive failed: {e}")
+                else:
+                    healthy_tasks.append(t)
             
             with self.m_lock:
                 self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
@@ -159,16 +143,19 @@ class BuiltinBookerGCO:
         task_id = None
         task_data = None
         booking_success = False
+        is_rate_limited = False
+        
         try:
             task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
             if not task_data:
                 return 
             task_id = task_data['id']
             order_id = task_data.get('order_id')
-            self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
-            
+            self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
+                        
             user_input = task_data.get('user_inputs', {})
             book_res = task.instance.book(query_result, user_input)
+            
             if book_res.success:
                 booking_success = True
                 self._log(f"✅ BOOK SUCCESS! Order: {order_id}")
@@ -216,6 +203,7 @@ class BuiltinBookerGCO:
                 "Rate limited" in err_str
             ]
             if any(rate_limited_indicators):
+                is_rate_limited = True
                 with self.m_lock:
                     if task in self.m_tasks:
                         self.m_tasks.remove(task)
@@ -236,15 +224,13 @@ class BuiltinBookerGCO:
                         self.m_stop_event.wait(wait_sec)
                         self._safe_return_task(tid, reason=reason)
                         
-                    t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Booking failed: rate limited (fails={t_fails})"), daemon=True)
-                    t.start()
-                    
-                    task_id = None 
+                    self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
             
         finally:
-           if not booking_success and task_id is not None:
-                self._safe_return_task(task_id, reason="Booking failed or error occurred")
-
+           if not booking_success and task_id is not None and not is_rate_limited:
+                self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
+                self._log(f"♻️ Task={task_id} normal failure. Instantly handed over to Sweeper.")
+                
     def _creator_loop(self):
         self._log("Creator loop started.")
         spawn_interval = 10.0
@@ -304,7 +290,8 @@ class BuiltinBookerGCO:
                             task_ref=None,
                             acceptable_routing_keys=all_keys,
                             source_queue="built-in",
-                            book_allowed=True
+                            book_allowed=True,
+                            next_remote_ping = time.time() + random.randint(180, 300) 
                         )
                     )
                     

+ 57 - 65
booker_order.py

@@ -34,6 +34,7 @@ class OrderBookerGCO:
         self.queue_backoff = ExponentialBackoff(base_delay=1*60.0, max_delay=10*60.0, factor=2.0)
         self.account_backoff = ExponentialBackoff(base_delay=5*60.0, max_delay=2*60*60.0, factor=2.0)
         self.m_last_spawn_time = 0.0
+        self.heartbeat_ttl = 300
 
     def _log(self, message):
         if self.m_logger:
@@ -58,66 +59,70 @@ class OrderBookerGCO:
 
     def _get_redis_key(self, routing_key: str) -> str:
         return f"vs:signal:{routing_key}"
-
-    def _safe_return_task(self, task_id: int, reason: str = ""):
-        if not task_id:
-            return
-        try:
-            task_data = VSCloudApi.Instance().get_vas_task(task_id)
-            if not task_data:
-                self.redis_client.zrem(self.m_tracker_key, task_id)
-                return
-            
-            current_status = task_data.get('status', '')
-            if current_status in['pending', 'grabbed', 'cancelled', 'success']:
-                self.redis_client.zrem(self.m_tracker_key, task_id)
-                return
-                
-            self._log(f"Returning task={task_id} to queue. Reason: {reason}")
-            VSCloudApi.Instance().return_vas_task_to_queue(task_id)
-            
-            # 归还成功,核销防丢记录
-            self.redis_client.zrem(self.m_tracker_key, task_id)
-            
-        except Exception as ex:
-            self._log(f"Failed to safely return task for task_id {task_id}: {ex}")
             
     def _maintain_loop(self):
         self._log("Maintain loop started.")
-        rng = random.Random()
+        heartbeat_interval = 60
         while not self.m_stop_event.is_set():
-            wait_seconds = rng.randint(180, 300)
-            for _ in range(wait_seconds):
+            for _ in range(heartbeat_interval):
                 if self.m_stop_event.is_set():
                     return
                 time.sleep(1.0)
             
             with self.m_lock:
                 tasks_to_check = list(self.m_tasks)
+                
+            if not tasks_to_check:
+                continue
             
             healthy_tasks = []
             dead_tasks = []
+            now = time.time()
             
             for t in tasks_to_check:
-                try:
-                    t.instance.keep_alive()
-                    if t.instance.health_check(): 
-                        healthy_tasks.append(t)
-                    else:
+                if now >= t.next_remote_ping:
+                    try:
+                        t.instance.keep_alive()
+                        if t.instance.health_check(): 
+                            healthy_tasks.append(t)
+                            next_delay = random.randint(180, 300) 
+                            t.next_remote_ping = now + next_delay
+                            self._log(f"🛡️ Task={t.task_ref} keep-alive success. Next ping in {next_delay}s.")
+                        else:
+                            dead_tasks.append(t)
+                            self._log(f"♻️ Instance for task={t.task_ref} unhealthy.")
+                    except Exception as e:
                         dead_tasks.append(t)
-                        self._log(f"♻️ Instance for task={t.task_ref} unhealthy, marking for removal.")
+                        self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}.")
+                else:
+                    healthy_tasks.append(t)
+            
+            if healthy_tasks:
+                try:
+                    pipeline = self.redis_client.pipeline()
+                    new_deadline = time.time() + self.heartbeat_ttl
+                    for t in healthy_tasks:
+                        if t.task_ref is not None:
+                            pipeline.zadd(self.m_tracker_key, {str(t.task_ref): new_deadline})
+                    pipeline.execute()
+                    self._log(f"💓 Heartbeat sent. Renewed {len(healthy_tasks)} tasks.")
+                except Exception as e:
+                    self._log(f"Redis Heartbeat update failed: {e}")
+
+            if dead_tasks:
+                try:
+                    pipeline = self.redis_client.pipeline()
+                    for t in dead_tasks:
+                        if t.task_ref is not None:
+                            pipeline.zadd(self.m_tracker_key, {str(t.task_ref): 0})
+                    pipeline.execute()
+                    self._log(f"🗑️ Handed over {len(dead_tasks)} dead tasks to Sweeper.")
                 except Exception as e:
-                    dead_tasks.append(t)
-                    self._log(f"♻️ Instance for task={t.task_ref} keep-alive failed: {e}, marking for removal.")
+                    pass
             
             with self.m_lock:
                 self.m_tasks = [t for t in self.m_tasks if t in healthy_tasks]
 
-            # 实例死亡,调用安全归还函数
-            for t in dead_tasks:
-                if t.task_ref is not None:
-                    self._safe_return_task(t.task_ref, reason="Instance died during maintain_loop")
-
     def _booking_trigger_loop(self):
         self._log("Trigger loop started.")
         while not self.m_stop_event.is_set():
@@ -170,17 +175,16 @@ class OrderBookerGCO:
 
         try:
             task_data = VSCloudApi.Instance().get_vas_task(task_id)
-            if not task_data or task_data.get('status') in ['grabbed', 'cancelled']:
+            if not task_data or task_data.get('status') in ['grabbed', 'pause', 'completed', 'cancelled']:
                 self._log(f"Bound Task={task_id} is no longer valid or already processed. Removing instance.")
                 with self.m_lock:
                     if task in self.m_tasks:
                         self.m_tasks.remove(task)
+                self.redis_client.zrem(self.m_tracker_key, task_id)
                 return
             
             order_id = task_data.get('order_id')
             user_input = task_data.get('user_inputs', {})
-            self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
-
             book_res = task.instance.book(query_result, user_input)
 
             if book_res.success:
@@ -238,13 +242,7 @@ class OrderBookerGCO:
                         
                     t_cd = self.task_backoff.calculate(t_fails)
                     self._log(f"⏳ Task={task_id} (Booking Attempt {t_fails}) suspended for {t_cd:.1f}s.")
-                    
-                    def delayed_return(tid, wait_sec, reason):
-                        self.m_stop_event.wait(wait_sec)
-                        self._safe_return_task(tid, reason=reason)
-                        
-                    t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Booking failed: rate limited (fails={t_fails})"), daemon=True)
-                    t.start()
+                    self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})
 
     def _creator_loop(self):
         self._log("Creator loop started.")
@@ -267,7 +265,6 @@ class OrderBookerGCO:
                     now = time.time()
                     if now - self.m_last_spawn_time >= spawn_interval:
                         self.m_last_spawn_time = now 
-                        self._log(f"Staggered: Spawning booker for [{r_key}]. Next in {spawn_interval}s.")
                         self._spawn_worker(r_key)
                         break
 
@@ -278,6 +275,8 @@ class OrderBookerGCO:
         def _job():
             success = False
             task_id = None
+            is_rate_limited = False
+            
             try:
                 queue_name = f"auto.{target_routing_key}"
                 task_data = VSCloudApi.Instance().get_vas_task_pop(queue_name)
@@ -286,7 +285,7 @@ class OrderBookerGCO:
                 
                 task_id = task_data['id']
                 
-                self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time()})
+                self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + self.heartbeat_ttl})
                 user_inputs = task_data.get('user_inputs', {})
                 
                 plg_cfg = VSPlgConfig()
@@ -322,7 +321,8 @@ class OrderBookerGCO:
                             task_ref=task_id,
                             acceptable_routing_keys=acceptable_keys, 
                             source_queue=target_routing_key,
-                            book_allowed=True
+                            book_allowed=True,
+                            next_remote_ping=time.time() + random.randint(180, 300) 
                         )
                     )
                     queue_fail_key = f"vs:queue:failures:{target_routing_key}"
@@ -346,6 +346,7 @@ class OrderBookerGCO:
                     "Rate limited" in err_str
                 ]
                 if any(rate_limited_indicators):
+                    is_rate_limited = True
                     queue_fail_key = f"vs:queue:failures:{target_routing_key}"
                     queue_cd_key = f"vs:queue:cooldown:{target_routing_key}"
                     q_fails = self.redis_client.incr(queue_fail_key)
@@ -364,22 +365,13 @@ class OrderBookerGCO:
                         
                         t_cd = self.account_backoff.calculate(t_fails)
                         self._log(f"⏳ Task={task_id} (Attempt {t_fails}) suspended for {t_cd:.1f}s.")
-                        
-                        def delayed_return(tid, wait_sec, reason):
-                            self.m_stop_event.wait(wait_sec)
-                            self._safe_return_task(tid, reason=reason)
-                            
-                        t = threading.Thread(target=delayed_return, args=(task_id, t_cd, f"Spawn failed: rate limited (fails={t_fails})"), daemon=True)
-                        t.start()
-                        
-                        task_id = None
-                    
+                        self.redis_client.zadd(self.m_tracker_key, {str(task_id): time.time() + t_cd})       
             finally:
                 with self.m_lock: 
                     self.m_pending_order_by_queue[target_routing_key] = max(0, self.m_pending_order_by_queue[target_routing_key] - 1)
                 
                 # 创建/登录失败,调用安全归还函数
-                if not success and task_id is not None:
-                    self._safe_return_task(task_id, reason="Instance spawn/login failed")
-                    
+                if not success and task_id is not None and not is_rate_limited:
+                    self.redis_client.zadd(self.m_tracker_key, {str(task_id): 0})
+                    self._log(f"♻️ Task={task_id} failed normal spawn. Instantly handed over to Sweeper.")
         ThreadPool.getInstance().enqueue(_job)

+ 19 - 38
main_sweeper.py

@@ -1,26 +1,19 @@
+import os
+import json
 import time
 import redis
 import logging
-import json
-import os
 from typing import Dict
 from toolkit.vs_cloud_api import VSCloudApi
 
-# 设置纯净的日志输出格式
 logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s')
 
 class OrphanTaskSweeper:
-    """
-    孤儿任务兜底回收程序 (Orphan Task Sweeper)
-    - 独立运行,不干扰主业务逻辑。
-    - 扫描 Redis 中所有超过指定时间没有被核销的 task_id,将其强制归还给云端队列。
-    """
-    def __init__(self, redis_conf: Dict, timeout_seconds: int = 900):
+    def __init__(self, redis_conf: Dict):
         self.redis_client = redis.Redis(**redis_conf)
-        self.timeout_seconds = timeout_seconds  # 默认 900 秒 (15分钟)
 
     def run_forever(self):
-        logging.info(f"🛡️ Task Sweeper started. Monitoring unreturned tasks (Timeout: {self.timeout_seconds}s)...")
+        logging.info("🛡️ Task Sweeper started. Monitoring dead tasks (Heartbeat timeout)...")
         while True:
             try:
                 self._sweep()
@@ -31,71 +24,59 @@ class OrphanTaskSweeper:
             time.sleep(60)
 
     def _sweep(self):
-        # 查找所有节点的 tracking key (支持多机器集群部署)
+        # 扫描集群中所有 Worker 的 tracking keys
         tracker_keys = self.redis_client.keys("vs:worker:tasks_tracker:*")
         if not tracker_keys:
             return
 
         current_time = time.time()
-        # 凡是 score (入栈时间戳) 小于 cutoff_time 的,都是超时未归还的“死单”
-        cutoff_time = current_time - self.timeout_seconds
 
         for key in tracker_keys:
             key_str = key.decode('utf-8') if isinstance(key, bytes) else key
-            orphans = self.redis_client.zrangebyscore(key_str, 0, cutoff_time)
             
-            if not orphans:
+            # 【核心逻辑】只要 Deadline (Score) 小于 当前时间,说明心跳彻底断了
+            dead_tasks = self.redis_client.zrangebyscore(key_str, 0, current_time)
+            
+            if not dead_tasks:
                 continue
 
-            for task_id_bytes in orphans:
+            for task_id_bytes in dead_tasks:
                 task_id = int(task_id_bytes.decode('utf-8'))
                 self._rescue_task(key_str, task_id)
 
     def _rescue_task(self, tracker_key: str, task_id: int):
-        logging.warning(f"⚠️ Found orphan task={task_id} in {tracker_key}. Attempting rescue...")
+        logging.warning(f"⚠️ Heartbeat lost for task={task_id} in {tracker_key}. Attempting rescue...")
         try:
             task_data = VSCloudApi.Instance().get_vas_task(task_id)
             
-            # 1. 云端彻底查不到这个订单了,说明已被物理删除,清理本地记录
             if not task_data:
-                logging.info(f"Task={task_id} does not exist in cloud. Removing from local tracker.")
+                logging.info(f"Task={task_id} missing in cloud. Removing from local tracker.")
                 self.redis_client.zrem(tracker_key, task_id)
                 return
 
             current_status = task_data.get('status', '')
             
-            # 2. 订单状态已经是终态,不需要归还,直接核销本地记录
-            if current_status in['cancelled', 'grabbed', 'success']:
-                logging.info(f"Task={task_id} status is already '{current_status}'. Removing from tracker.")
+            if current_status in ['cancelled', 'pause' ,'grabbed', 'success', 'completed']:
+                logging.info(f"Task={task_id} status '{current_status}'. Removing from tracker.")
                 self.redis_client.zrem(tracker_key, task_id)
                 return
 
-            # 3. 订单依然卡在 processing 状态,强制还给队列
-            logging.info(f"Returning orphan task={task_id} to queue.")
+            # 云端状态还是 processing,证明 Worker 真的死机了,强制归还
+            logging.info(f"Returning dead task={task_id} to queue.")
             VSCloudApi.Instance().return_vas_task_to_queue(task_id)
             
-            # 归还成功后,从 Redis 中移除追踪
             self.redis_client.zrem(tracker_key, task_id)
-            logging.info(f"✅ Orphan task={task_id} successfully rescued and returned.")
+            logging.info(f"✅ Dead task={task_id} successfully rescued and returned.")
 
         except Exception as e:
-            # 遇到网络错误,不删 Redis 记录,让它在下一个 60 秒循环里继续被重试归还
             logging.error(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")
 
-
 if __name__ == "__main__":
-    # 【注意】这里请替换为你实际项目中加载 config 和 初始化 Cloud API 的代码
-    # 逻辑通常与你的 main_booker.py 顶部的初始化代码一模一样
-    config_path = os.path.join(os.path.dirname(__file__), "config", "config.json") # 根据你的实际格式 (yaml/json)
+    config_path = os.path.join(os.path.dirname(__file__), "config", "config.json")
     
     with open(config_path, 'r') as f:
         config_data = json.load(f)
         
     redis_conf = config_data.get('redis', {})
-    
-    # 如果你的框架需要显式初始化 VSCloudApi,请在这里初始化
-    # VSCloudApi.Instance().init(...) 
-
-    # 启动兜底程序,设置超时阈值为 15 分钟 (900秒)
-    sweeper = OrphanTaskSweeper(redis_conf, timeout_seconds=900)
+    sweeper = OrphanTaskSweeper(redis_conf)
     sweeper.run_forever()

+ 2 - 0
vs_types.py

@@ -204,6 +204,8 @@ class Task(BaseModel):
     # 来源标识(用于配额统计)
     source_queue: str = ""
     successful_bookings: int = 0
+    # 下一次允许心跳时间
+    next_remote_ping: float = 0.0
     
     model_config = {
         "underscore_attrs_are_private": True,