"""Orphan task sweeper.

Periodically scans every worker's Redis task tracker (a sorted set whose
members are task ids and whose scores are heartbeat deadlines) and rescues
tasks whose deadline has already passed: depending on the task's status in
the cloud, the entry is either just dropped from the tracker or the task is
returned to the cloud queue first.
"""
import os
import json
import time
import redis
import logging
from typing import Dict

from toolkit.vs_cloud_api import VSCloudApi

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s')


class OrphanTaskSweeper:
    """Find tracked tasks whose heartbeat deadline expired and resolve them.

    Every key matching ``TRACKER_KEY_PATTERN`` is read as a sorted set whose
    members are task ids and whose scores are deadlines compared against
    ``time.time()``. A member whose deadline is in the past is treated as an
    orphan left behind by a dead worker.
    """

    # Redis key pattern covering every worker's task tracker.
    TRACKER_KEY_PATTERN = "vs:worker:tasks_tracker:*"

    # Cloud statuses for which the task must NOT be returned to the queue;
    # the sweeper only drops the local tracker entry for these.
    TERMINAL_STATUSES = frozenset({'cancelled', 'pause', 'grabbed', 'success', 'completed'})

    def __init__(self, redis_conf: Dict):
        """Create the sweeper.

        Args:
            redis_conf: Keyword arguments forwarded verbatim to ``redis.Redis``.
        """
        self.redis_client = redis.Redis(**redis_conf)

    def run_forever(self, interval: float = 3.0):
        """Sweep in an endless loop, sleeping ``interval`` seconds between passes.

        An exception raised by a single pass is logged with its traceback and
        never stops the loop.

        Args:
            interval: Pause between sweeps, in seconds (default 3).
        """
        logging.info("🛡️ Task Sweeper started. Monitoring dead tasks (Heartbeat timeout)...")
        while True:
            try:
                self._sweep()
            except Exception as e:
                # Top-level daemon boundary: stay alive, but keep the traceback.
                logging.exception("Sweeper loop error: %s", e)
            time.sleep(interval)

    def _sweep(self):
        """Run one pass over every worker tracker and rescue expired tasks."""
        # SCAN instead of KEYS: KEYS walks the whole keyspace in one blocking
        # call on the Redis server. SCAN may yield a key more than once, so
        # collect into a set to process each tracker exactly once per pass.
        tracker_keys = set(self.redis_client.scan_iter(match=self.TRACKER_KEY_PATTERN))
        if not tracker_keys:
            return

        # One cutoff for the whole pass so all trackers are judged consistently.
        current_time = time.time()
        for key in tracker_keys:
            key_str = key.decode('utf-8') if isinstance(key, bytes) else key
            # A member whose deadline (score) is not after "now" has lost its heartbeat.
            dead_tasks = self.redis_client.zrangebyscore(key_str, '-inf', current_time)
            for member in dead_tasks:
                task_id = self._parse_task_id(member)
                if task_id is None:
                    # Skip instead of raising: one bad member must not block
                    # the rescue of every other task in this and later passes.
                    logging.error(f"Skipping malformed tracker member {member!r} in {key_str}.")
                    continue
                self._rescue_task(key_str, task_id, member=member)

    @staticmethod
    def _parse_task_id(member):
        """Return the integer task id held in a tracker member, or None if it is malformed.

        Accepts both ``bytes`` members (default client) and ``str`` members
        (client created with ``decode_responses=True``).
        """
        try:
            text = member.decode('utf-8') if isinstance(member, bytes) else str(member)
            return int(text)
        except ValueError:  # also covers UnicodeDecodeError
            return None

    def _rescue_task(self, tracker_key: str, task_id: int, member=None):
        """Resolve one task whose heartbeat deadline has expired.

        If the cloud no longer knows the task, or its status is in
        ``TERMINAL_STATUSES``, the tracker entry is simply removed. Otherwise
        the task is returned to the cloud queue and then removed from the
        tracker. Any failure is logged and the entry is left in place, so the
        next sweep retries it.

        Args:
            tracker_key: Redis key of the worker's tracker sorted set.
            task_id: Task id parsed from the tracker member.
            member: The raw sorted-set member exactly as read from Redis; used
                for ZREM so the stored member is removed verbatim. Defaults to
                ``task_id`` for callers that only know the id.
        """
        if member is None:
            member = task_id
        logging.warning(f"⚠️ Heartbeat lost for task={task_id} in {tracker_key}. Attempting rescue...")
        try:
            task_data = VSCloudApi.Instance().get_vas_task(task_id)
            if not task_data:
                logging.info(f"Task={task_id} missing in cloud. Removing from local tracker.")
                self.redis_client.zrem(tracker_key, member)
                return

            current_status = task_data.get('status', '')
            if current_status in self.TERMINAL_STATUSES:
                logging.info(f"Task={task_id} status '{current_status}'. Removing from tracker.")
                self.redis_client.zrem(tracker_key, member)
                return

            # Every status not listed above is requeued. The intended case is a
            # task still 'processing' in the cloud, meaning its worker died.
            # NOTE(review): unknown statuses (e.g. an empty/missing status) are
            # requeued too — confirm that is intended.
            logging.info(f"Returning dead task={task_id} to queue.")
            VSCloudApi.Instance().return_vas_task_to_queue(task_id)
            self.redis_client.zrem(tracker_key, member)
            logging.info(f"✅ Dead task={task_id} successfully rescued and returned.")
        except Exception as e:
            logging.error(f"Failed to rescue task={task_id}: {e}. Will retry on the next sweep.")


if __name__ == "__main__":
    # Configuration lives in config/config.json next to this script; only its
    # "redis" section is used here.
    config_path = os.path.join(os.path.dirname(__file__), "config", "config.json")
    with open(config_path, 'r', encoding='utf-8') as f:
        config_data = json.load(f)

    redis_conf = config_data.get('redis', {})
    sweeper = OrphanTaskSweeper(redis_conf)
    sweeper.run_forever()