| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import os
- import json
- import time
- import redis
- import logging
- from typing import Dict
- from toolkit.vs_cloud_api import VSCloudApi
# Configure the root logger for this process: INFO and above, each record
# tagged with "[SWEEPER]" so sweeper output is easy to grep in shared logs.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s',
    level=logging.INFO,
)
class OrphanTaskSweeper:
    """Return tasks whose heartbeat deadline has expired to the cloud task queue.

    Every ``sweep_interval`` seconds the sweeper scans all Redis sorted sets
    matching ``TRACKER_KEY_PATTERN``. Each member is parsed as an integer task
    id, and its score is treated as a deadline in Unix seconds (compared with
    ``time.time()``). For every member whose deadline has passed, the task is
    looked up through ``VSCloudApi``. It is then handled in one of two ways:

    * dropped from the tracker, if the task is missing in the cloud or its
      status is one of ``NO_RESCUE_STATUSES``;
    * otherwise returned to the queue, and then dropped from the tracker.

    NOTE(review): the tracker sets are presumably maintained by each worker's
    heartbeat loop, which is not part of this file.
    """

    # Redis key pattern of the per-worker task tracking sorted sets.
    TRACKER_KEY_PATTERN = "vs:worker:tasks_tracker:*"

    # Cloud statuses for which an expired tracker entry is simply dropped
    # instead of being returned to the queue.
    # NOTE(review): 'grabbed' presumably means the task was already re-claimed
    # elsewhere — confirm against the cloud API's status values.
    NO_RESCUE_STATUSES = ('cancelled', 'pause', 'grabbed', 'success', 'completed')

    # Default pause between sweeps, in seconds (overridable per instance).
    sweep_interval = 3

    def __init__(self, redis_conf: Dict, sweep_interval: float = 3):
        """Create the sweeper and its Redis client.

        Args:
            redis_conf: keyword arguments forwarded verbatim to ``redis.Redis``.
            sweep_interval: seconds to sleep between two sweeps (default 3).
        """
        self.redis_client = redis.Redis(**redis_conf)
        self.sweep_interval = sweep_interval

    def run_forever(self):
        """Sweep in an endless loop.

        Any ``Exception`` raised by a sweep is logged with its traceback and
        the loop continues after the usual pause.
        """
        logging.info("🛡️ Task Sweeper started. Monitoring dead tasks (Heartbeat timeout)...")
        while True:
            try:
                self._sweep()
            except Exception as e:
                # Keep the daemon alive across transient failures (e.g. Redis
                # unreachable), but keep the traceback for diagnosis.
                logging.exception("Sweeper loop error: %s", e)
            time.sleep(self.sweep_interval)

    @staticmethod
    def _decode(value) -> str:
        """Return ``value`` as text, decoding ``bytes`` as UTF-8.

        Redis replies are ``bytes`` or ``str`` depending on whether the client
        was built with ``decode_responses``, which comes from the config.
        """
        return value.decode('utf-8') if isinstance(value, bytes) else str(value)

    @classmethod
    def _parse_task_id(cls, member):
        """Return the integer task id stored in a tracker member, or None if malformed."""
        try:
            return int(cls._decode(member))
        except ValueError:  # also covers UnicodeDecodeError (a ValueError subclass)
            return None

    def _sweep(self):
        """Run one pass over all tracker sets and rescue every expired task."""
        # SCAN instead of KEYS: KEYS walks the whole keyspace in a single
        # blocking command. SCAN may report a key more than once, hence the set.
        tracker_keys = {
            self._decode(key)
            for key in self.redis_client.scan_iter(match=self.TRACKER_KEY_PATTERN)
        }
        if not tracker_keys:
            return
        current_time = time.time()
        for tracker_key in tracker_keys:
            # A score (deadline) at or before "now" means the heartbeat has
            # stopped refreshing this task. The lower bound is '-inf' so that
            # entries with negative scores cannot be stranded forever.
            expired = self.redis_client.zrangebyscore(tracker_key, '-inf', current_time)
            for member in expired:
                task_id = self._parse_task_id(member)
                if task_id is None:
                    # Skip rather than raise: one bad member must not abort
                    # the sweep of every other tracker set.
                    logging.warning(f"Skipping malformed member {member!r} in {tracker_key}.")
                    continue
                self._rescue_task(tracker_key, task_id, member=member)

    def _rescue_task(self, tracker_key: str, task_id: int, member=None):
        """Resolve one expired tracker entry.

        Args:
            tracker_key: the sorted set holding the entry.
            task_id: the task id parsed from the entry.
            member: the raw member as read from Redis. When given, this exact
                value is passed to ZREM, so an entry whose text is not the
                canonical ``str(task_id)`` (e.g. ``"007"``) is still removed.
                Defaults to ``task_id`` for callers that only know the id.

        Any exception is logged and swallowed. The entry then stays in the
        tracker and is retried on the next sweep.
        """
        tracker_member = task_id if member is None else member
        logging.warning(f"⚠️ Heartbeat lost for task={task_id} in {tracker_key}. Attempting rescue...")
        try:
            task_data = VSCloudApi.Instance().get_vas_task(task_id)

            if not task_data:
                logging.info(f"Task={task_id} missing in cloud. Removing from local tracker.")
                self.redis_client.zrem(tracker_key, tracker_member)
                return
            current_status = task_data.get('status', '')

            if current_status in self.NO_RESCUE_STATUSES:
                logging.info(f"Task={task_id} status '{current_status}'. Removing from tracker.")
                self.redis_client.zrem(tracker_key, tracker_member)
                return
            # Any other status (the design expects 'processing' here) is taken
            # to mean the owning worker died mid-task: hand the task back.
            # NOTE(review): the heartbeat may have resumed since _sweep read the
            # expired score; re-checking the score here would narrow that race.
            logging.info(f"Returning dead task={task_id} to queue.")
            VSCloudApi.Instance().return_vas_task_to_queue(task_id)

            # Only drop the entry after a successful requeue, so a failed
            # requeue is retried on the next sweep.
            self.redis_client.zrem(tracker_key, tracker_member)
            logging.info(f"✅ Dead task={task_id} successfully rescued and returned.")
        except Exception as e:
            logging.error(f"Failed to rescue task={task_id}: {e}. Will retry on the next sweep.")
if __name__ == "__main__":
    # Resolve config/ next to this script regardless of the current working
    # directory; abspath guards against an empty or relative __file__.
    config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config", "config.json")

    # JSON text is UTF-8 (RFC 8259); do not depend on the platform's locale encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config_data = json.load(f)

    # A missing "redis" section means redis.Redis() is called with no
    # arguments, i.e. the client library's defaults are used.
    redis_conf = config_data.get('redis', {})
    sweeper = OrphanTaskSweeper(redis_conf)
    sweeper.run_forever()
|