import json
import logging
import os
import time
from typing import Dict

import redis

from toolkit.vs_cloud_api import VSCloudApi

# Keep the log output format clean and tagged with the sweeper name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s')


class OrphanTaskSweeper:
    """Fallback reclaimer for orphan tasks (Orphan Task Sweeper).

    - Runs standalone and does not interfere with the main business logic.
    - Scans every tracker sorted set in Redis for task ids whose score is
      older than the configured timeout and forces them back to the cloud
      queue.
    """

    # Glob matching the tracker sorted set of every worker node
    # (supports multi-machine cluster deployments).
    TRACKER_KEY_PATTERN = "vs:worker:tasks_tracker:*"

    # Seconds to wait between two sweeps of Redis.
    SWEEP_INTERVAL_SECONDS = 60

    # Statuses treated as terminal: the task is not returned to the queue,
    # only its local tracking record is removed.
    TERMINAL_STATUSES = ('cancelled', 'grabbed', 'success')

    def __init__(self, redis_conf: Dict, timeout_seconds: int = 900):
        """Create the sweeper.

        Args:
            redis_conf: Keyword arguments forwarded verbatim to ``redis.Redis``.
            timeout_seconds: Age after which a tracked task counts as an
                orphan. Defaults to 900 seconds (15 minutes).
        """
        self.redis_client = redis.Redis(**redis_conf)
        self.timeout_seconds = timeout_seconds

    def run_forever(self):
        """Sweep in an endless loop, sleeping between iterations.

        Any exception raised by a sweep is logged and swallowed so that the
        loop keeps running.
        """
        logging.info(f"🛡️ Task Sweeper started. Monitoring unreturned tasks (Timeout: {self.timeout_seconds}s)...")
        while True:
            try:
                self._sweep()
            except Exception as e:
                # logging.exception keeps the traceback, which the plain
                # message alone does not provide.
                logging.exception(f"Sweeper loop error: {e}")
            time.sleep(self.SWEEP_INTERVAL_SECONDS)

    def _sweep(self):
        """Run one pass over all tracker keys and rescue timed-out tasks."""
        # SCAN walks the keyspace incrementally instead of blocking the
        # server like KEYS does. SCAN may report a key more than once, so
        # de-duplicate while preserving order.
        tracker_keys = list(dict.fromkeys(
            self.redis_client.scan_iter(match=self.TRACKER_KEY_PATTERN)
        ))
        if not tracker_keys:
            return

        # Every member whose score (the timestamp recorded when the task was
        # tracked) is at or below cutoff_time is a timed-out "dead" task.
        cutoff_time = time.time() - self.timeout_seconds

        for key in tracker_keys:
            key_str = key.decode('utf-8') if isinstance(key, bytes) else key
            orphans = self.redis_client.zrangebyscore(key_str, 0, cutoff_time)

            for member in orphans:
                # Members are bytes by default but str when the client was
                # configured with decode_responses=True; accept both.
                try:
                    if isinstance(member, bytes):
                        member = member.decode('utf-8')
                    task_id = int(member)
                except (TypeError, ValueError):
                    # A single malformed member must not abort the sweep for
                    # every other task.
                    logging.error(f"Skipping non-numeric tracker member {member!r} in {key_str}.")
                    continue
                self._rescue_task(key_str, task_id)

    def _rescue_task(self, tracker_key: str, task_id: int):
        """Reconcile one orphan task with the cloud and drop its tracker entry.

        Args:
            tracker_key: Redis sorted-set key the task was found in.
            task_id: Identifier of the orphan task.

        Any exception is logged and swallowed; the tracker entry is then left
        in place so that a later sweep retries the task.
        """
        logging.warning(f"⚠️ Found orphan task={task_id} in {tracker_key}. Attempting rescue...")
        try:
            task_data = VSCloudApi.Instance().get_vas_task(task_id)

            # 1. The cloud no longer knows this task (presumably it was
            #    physically deleted), so just clear the local record.
            if not task_data:
                logging.info(f"Task={task_id} does not exist in cloud. Removing from local tracker.")
                self.redis_client.zrem(tracker_key, task_id)
                return

            current_status = task_data.get('status', '')

            # 2. The task already reached a terminal status; nothing to
            #    return, only the local record is written off.
            if current_status in self.TERMINAL_STATUSES:
                logging.info(f"Task={task_id} status is already '{current_status}'. Removing from tracker.")
                self.redis_client.zrem(tracker_key, task_id)
                return

            # 3. The task is still stuck (e.g. in processing); force it back
            #    to the queue.
            logging.info(f"Returning orphan task={task_id} to queue.")
            VSCloudApi.Instance().return_vas_task_to_queue(task_id)

            # Only stop tracking once the return call has succeeded.
            self.redis_client.zrem(tracker_key, task_id)
            logging.info(f"✅ Orphan task={task_id} successfully rescued and returned.")

        except Exception as e:
            # On network (or any other) errors keep the Redis record so the
            # next sweep retries the return.
            logging.error(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")


def main():
    """Load the Redis settings from config/config.json and run the sweeper."""
    # NOTE: replace this with your project's real code for loading the config
    # and initialising the Cloud API; it is usually identical to the
    # initialisation at the top of main_booker.py.
    # Adjust the path/parsing to your actual config format (yaml/json).
    config_path = os.path.join(os.path.dirname(__file__), "config", "config.json")
    with open(config_path, 'r', encoding='utf-8') as f:
        config_data = json.load(f)

    redis_conf = config_data.get('redis', {})

    # If your framework requires VSCloudApi to be initialised explicitly,
    # do it here, e.g.:
    # VSCloudApi.Instance().init(...)

    # Start the sweeper with a 15-minute (900 s) timeout threshold.
    sweeper = OrphanTaskSweeper(redis_conf, timeout_seconds=900)
    sweeper.run_forever()


if __name__ == "__main__":
    main()