main_sweeper.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import time
  2. import redis
  3. import logging
  4. import json
  5. import os
  6. from typing import Dict
  7. from toolkit.vs_cloud_api import VSCloudApi
  8. # 设置纯净的日志输出格式
  9. logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s')
  10. class OrphanTaskSweeper:
  11. """
  12. 孤儿任务兜底回收程序 (Orphan Task Sweeper)
  13. - 独立运行,不干扰主业务逻辑。
  14. - 扫描 Redis 中所有超过指定时间没有被核销的 task_id,将其强制归还给云端队列。
  15. """
  16. def __init__(self, redis_conf: Dict, timeout_seconds: int = 900):
  17. self.redis_client = redis.Redis(**redis_conf)
  18. self.timeout_seconds = timeout_seconds # 默认 900 秒 (15分钟)
  19. def run_forever(self):
  20. logging.info(f"🛡️ Task Sweeper started. Monitoring unreturned tasks (Timeout: {self.timeout_seconds}s)...")
  21. while True:
  22. try:
  23. self._sweep()
  24. except Exception as e:
  25. logging.error(f"Sweeper loop error: {e}")
  26. # 每隔 60 秒巡检一次全局 Redis
  27. time.sleep(60)
  28. def _sweep(self):
  29. # 查找所有节点的 tracking key (支持多机器集群部署)
  30. tracker_keys = self.redis_client.keys("vs:worker:tasks_tracker:*")
  31. if not tracker_keys:
  32. return
  33. current_time = time.time()
  34. # 凡是 score (入栈时间戳) 小于 cutoff_time 的,都是超时未归还的“死单”
  35. cutoff_time = current_time - self.timeout_seconds
  36. for key in tracker_keys:
  37. key_str = key.decode('utf-8') if isinstance(key, bytes) else key
  38. orphans = self.redis_client.zrangebyscore(key_str, 0, cutoff_time)
  39. if not orphans:
  40. continue
  41. for task_id_bytes in orphans:
  42. task_id = int(task_id_bytes.decode('utf-8'))
  43. self._rescue_task(key_str, task_id)
  44. def _rescue_task(self, tracker_key: str, task_id: int):
  45. logging.warning(f"⚠️ Found orphan task={task_id} in {tracker_key}. Attempting rescue...")
  46. try:
  47. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  48. # 1. 云端彻底查不到这个订单了,说明已被物理删除,清理本地记录
  49. if not task_data:
  50. logging.info(f"Task={task_id} does not exist in cloud. Removing from local tracker.")
  51. self.redis_client.zrem(tracker_key, task_id)
  52. return
  53. current_status = task_data.get('status', '')
  54. # 2. 订单状态已经是终态,不需要归还,直接核销本地记录
  55. if current_status in['cancelled', 'grabbed', 'success']:
  56. logging.info(f"Task={task_id} status is already '{current_status}'. Removing from tracker.")
  57. self.redis_client.zrem(tracker_key, task_id)
  58. return
  59. # 3. 订单依然卡在 processing 状态,强制还给队列
  60. logging.info(f"Returning orphan task={task_id} to queue.")
  61. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  62. # 归还成功后,从 Redis 中移除追踪
  63. self.redis_client.zrem(tracker_key, task_id)
  64. logging.info(f"✅ Orphan task={task_id} successfully rescued and returned.")
  65. except Exception as e:
  66. # 遇到网络错误,不删 Redis 记录,让它在下一个 60 秒循环里继续被重试归还
  67. logging.error(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")
  68. if __name__ == "__main__":
  69. # 【注意】这里请替换为你实际项目中加载 config 和 初始化 Cloud API 的代码
  70. # 逻辑通常与你的 main_booker.py 顶部的初始化代码一模一样
  71. config_path = os.path.join(os.path.dirname(__file__), "config", "config.json") # 根据你的实际格式 (yaml/json)
  72. with open(config_path, 'r') as f:
  73. config_data = json.load(f)
  74. redis_conf = config_data.get('redis', {})
  75. # 如果你的框架需要显式初始化 VSCloudApi,请在这里初始化
  76. # VSCloudApi.Instance().init(...)
  77. # 启动兜底程序,设置超时阈值为 15 分钟 (900秒)
  78. sweeper = OrphanTaskSweeper(redis_conf, timeout_seconds=900)
  79. sweeper.run_forever()