main_sweeper.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import os
  2. import json
  3. import time
  4. import redis
  5. import logging
  6. from typing import Dict
  7. from toolkit.vs_cloud_api import VSCloudApi
# Configure the root logger when this module is loaded: INFO level and above,
# with every record prefixed by a timestamp, its level and a [SWEEPER] tag.
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [SWEEPER] %(message)s')
  9. class OrphanTaskSweeper:
  10. def __init__(self, redis_conf: Dict):
  11. self.redis_client = redis.Redis(**redis_conf)
  12. def run_forever(self):
  13. logging.info("🛡️ Task Sweeper started. Monitoring dead tasks (Heartbeat timeout)...")
  14. while True:
  15. try:
  16. self._sweep()
  17. except Exception as e:
  18. logging.error(f"Sweeper loop error: {e}")
  19. time.sleep(3)
  20. def _sweep(self):
  21. # 扫描集群中所有 Worker 的 tracking keys
  22. tracker_keys = self.redis_client.keys("vs:worker:tasks_tracker:*")
  23. if not tracker_keys:
  24. return
  25. current_time = time.time()
  26. for key in tracker_keys:
  27. key_str = key.decode('utf-8') if isinstance(key, bytes) else key
  28. # 【核心逻辑】只要 Deadline (Score) 小于 当前时间,说明心跳彻底断了
  29. dead_tasks = self.redis_client.zrangebyscore(key_str, 0, current_time)
  30. if not dead_tasks:
  31. continue
  32. for task_id_bytes in dead_tasks:
  33. task_id = int(task_id_bytes.decode('utf-8'))
  34. self._rescue_task(key_str, task_id)
  35. def _rescue_task(self, tracker_key: str, task_id: int):
  36. logging.warning(f"⚠️ Heartbeat lost for task={task_id} in {tracker_key}. Attempting rescue...")
  37. try:
  38. task_data = VSCloudApi.Instance().get_vas_task(task_id)
  39. if not task_data:
  40. logging.info(f"Task={task_id} missing in cloud. Removing from local tracker.")
  41. self.redis_client.zrem(tracker_key, task_id)
  42. return
  43. current_status = task_data.get('status', '')
  44. if current_status in ['cancelled', 'pause' ,'grabbed', 'success', 'completed']:
  45. logging.info(f"Task={task_id} status '{current_status}'. Removing from tracker.")
  46. self.redis_client.zrem(tracker_key, task_id)
  47. return
  48. # 云端状态还是 processing,证明 Worker 真的死机了,强制归还
  49. logging.info(f"Returning dead task={task_id} to queue.")
  50. VSCloudApi.Instance().return_vas_task_to_queue(task_id)
  51. self.redis_client.zrem(tracker_key, task_id)
  52. logging.info(f"✅ Dead task={task_id} successfully rescued and returned.")
  53. except Exception as e:
  54. logging.error(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")
  55. if __name__ == "__main__":
  56. config_path = os.path.join(os.path.dirname(__file__), "config", "config.json")
  57. with open(config_path, 'r') as f:
  58. config_data = json.load(f)
  59. redis_conf = config_data.get('redis', {})
  60. sweeper = OrphanTaskSweeper(redis_conf)
  61. sweeper.run_forever()