| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import time
- import redis
- import logging
- import json
- import os
- from typing import Dict
- from toolkit.vs_cloud_api import VSCloudApi
# Use a clean, minimal log output format for the sweeper process.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s',
    level=logging.INFO,
)
class OrphanTaskSweeper:
    """Fallback sweeper that reclaims orphaned tasks (Orphan Task Sweeper).

    - Runs as a standalone loop and does not interfere with the main
      business logic.
    - Scans the Redis tracker sorted sets for task ids whose score is older
      than the configured timeout and returns those tasks to the cloud queue.
    """

    # Glob for the per-node tracker keys (supports multi-machine deployments).
    TRACKER_KEY_PATTERN = "vs:worker:tasks_tracker:*"
    # Pause between two sweeps of Redis, in seconds.
    SWEEP_INTERVAL_SECONDS = 60
    # Statuses for which a task must not be returned to the queue.
    TERMINAL_STATUSES = frozenset({'cancelled', 'grabbed', 'success'})

    def __init__(self, redis_conf: Dict, timeout_seconds: int = 900):
        """Create the sweeper.

        Args:
            redis_conf: Keyword arguments forwarded to ``redis.Redis``.
            timeout_seconds: Age after which a tracked task is considered
                orphaned. Defaults to 900 seconds (15 minutes).
        """
        self.redis_client = redis.Redis(**redis_conf)
        self.timeout_seconds = timeout_seconds

    def run_forever(self):
        """Sweep in an endless loop, sleeping between iterations.

        Any exception raised by a sweep is logged (with traceback) and the
        loop continues, so this method never returns.
        """
        logging.info(f"🛡️ Task Sweeper started. Monitoring unreturned tasks (Timeout: {self.timeout_seconds}s)...")
        while True:
            try:
                self._sweep()
            except Exception as e:
                # Top-level boundary: keep the traceback so failures are diagnosable.
                logging.exception(f"Sweeper loop error: {e}")

            time.sleep(self.SWEEP_INTERVAL_SECONDS)

    @staticmethod
    def _to_text(value):
        """Return *value* as ``str``, decoding UTF-8 bytes when necessary.

        redis-py returns ``bytes`` by default and ``str`` when the client is
        configured with ``decode_responses=True``; both must be accepted.
        """
        return value.decode('utf-8') if isinstance(value, bytes) else str(value)

    def _sweep(self):
        """Find timed-out members in every tracker key and rescue them."""
        # Members whose score (per the original author: the timestamp at which
        # the task was tracked) is below the cutoff are overdue.
        cutoff_time = time.time() - self.timeout_seconds

        # SCAN iterates incrementally instead of blocking the server the way
        # KEYS does. SCAN may yield a key more than once; that is harmless
        # because successfully rescued members are removed from the set.
        for key in self.redis_client.scan_iter(match=self.TRACKER_KEY_PATTERN):
            key_str = self._to_text(key)
            orphans = self.redis_client.zrangebyscore(key_str, 0, cutoff_time)

            for member in orphans:
                try:
                    task_id = int(self._to_text(member))
                except ValueError:
                    # Also covers UnicodeDecodeError (a ValueError subclass).
                    # Skip the bad member so it cannot block the tasks after it;
                    # it is left in Redis for manual inspection.
                    logging.error(f"Skipping non-integer tracker member {member!r} in {key_str}.")
                    continue
                self._rescue_task(key_str, task_id)

    def _rescue_task(self, tracker_key: str, task_id: int):
        """Return one orphaned task to the cloud queue and untrack it.

        Args:
            tracker_key: Redis sorted-set key in which the task was found.
            task_id: Identifier of the task to rescue.

        Any exception is logged and swallowed; the Redis record is kept in
        that case so the task is retried on a later sweep.
        """
        logging.warning(f"⚠️ Found orphan task={task_id} in {tracker_key}. Attempting rescue...")
        try:
            task_data = VSCloudApi.Instance().get_vas_task(task_id)

            # 1. The cloud has no such task any more: just drop the local record.
            if not task_data:
                logging.info(f"Task={task_id} does not exist in cloud. Removing from local tracker.")
                self.redis_client.zrem(tracker_key, task_id)
                return

            current_status = task_data.get('status', '')

            # 2. The task is already in a terminal state: nothing to return,
            #    only the local record needs to be cleared.
            if current_status in self.TERMINAL_STATUSES:
                logging.info(f"Task={task_id} status is already '{current_status}'. Removing from tracker.")
                self.redis_client.zrem(tracker_key, task_id)
                return

            # 3. Any other status (e.g. still processing): force it back to the queue.
            logging.info(f"Returning orphan task={task_id} to queue.")
            VSCloudApi.Instance().return_vas_task_to_queue(task_id)

            # Untrack only after the return succeeded.
            self.redis_client.zrem(tracker_key, task_id)
            logging.info(f"✅ Orphan task={task_id} successfully rescued and returned.")
        except Exception as e:
            # On failure (e.g. a network error) keep the Redis record so the
            # rescue is attempted again on the next sweep.
            logging.error(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")
if __name__ == "__main__":
    # NOTE: replace this with your project's real config loading and Cloud API
    # initialization code. It is usually identical to the initialization at
    # the top of main_booker.py.
    config_path = os.path.join(os.path.dirname(__file__), "config", "config.json")  # adjust to your actual format (yaml/json)

    # Read the config explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config_data = json.load(f)

    redis_conf = config_data.get('redis', {})

    # If your framework requires VSCloudApi to be initialized explicitly, do it here:
    # VSCloudApi.Instance().init(...)

    # Start the fallback sweeper with a 15-minute (900-second) timeout threshold.
    sweeper = OrphanTaskSweeper(redis_conf, timeout_seconds=900)
    sweeper.run_forever()
|