main_sweeper.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import os
  2. import json
  3. import time
  4. import redis
  5. import logging
  6. from typing import Dict
  7. from toolkit.vs_cloud_api import VSCloudApi
  # Root-logger setup applied at import time: INFO level, every record tagged [SWEEPER].
  _SWEEPER_LOG_FORMAT = '%(asctime)s [%(levelname)s] [SWEEPER] %(message)s'
  logging.basicConfig(format=_SWEEPER_LOG_FORMAT, level=logging.INFO)
  class OrphanTaskSweeper:
      """Periodically rescue tasks whose worker heartbeat has expired.

      Every worker has a Redis sorted set matching ``vs:worker:tasks_tracker:*``.
      Its members are task ids and its scores are heartbeat deadlines, compared
      against ``time.time()``. A member whose deadline has passed is considered
      orphaned. The sweeper looks the task up in the cloud and then either drops
      it from the tracker or hands it back to the queue.
      """

      # Glob pattern matching every worker's tracker sorted set.
      _TRACKER_KEY_PATTERN = "vs:worker:tasks_tracker:*"

      # Cloud statuses for which the task is only removed from the tracker
      # and NOT returned to the queue.
      _NO_REQUEUE_STATUSES = frozenset(
          {'cancelled', 'pause', 'grabbed', 'success', 'completed'}
      )

      def __init__(self, redis_conf: Dict, sweep_interval: float = 60):
          """Create the sweeper.

          Args:
              redis_conf: Keyword arguments forwarded verbatim to ``redis.Redis``.
              sweep_interval: Seconds to sleep between two sweeps. The default
                  of 60 matches the previously hard-coded interval.
          """
          self.redis_client = redis.Redis(**redis_conf)
          self.sweep_interval = sweep_interval

      def run_forever(self):
          """Sweep all trackers, then sleep ``sweep_interval`` seconds, forever.

          An exception raised by a sweep is logged with its traceback and the
          loop keeps going. A transient Redis or cloud failure therefore does
          not stop the process.
          """
          logging.info("🛡️ Task Sweeper started. Monitoring dead tasks (Heartbeat timeout)...")
          while True:
              try:
                  self._sweep()
              except Exception as e:
                  # logging.exception keeps the traceback the bare message dropped.
                  logging.exception("Sweeper loop error: %s", e)
              time.sleep(self.sweep_interval)

      def _sweep(self):
          """Rescue every task whose deadline has passed, across all worker trackers."""
          # SCAN instead of KEYS: KEYS walks the whole keyspace in one blocking
          # call. SCAN may return the same key more than once, so deduplicate.
          tracker_keys = set(self.redis_client.scan_iter(match=self._TRACKER_KEY_PATTERN))
          if not tracker_keys:
              return
          current_time = time.time()
          for key in tracker_keys:
              key_str = key.decode('utf-8') if isinstance(key, bytes) else key
              # A deadline (score) at or before now means the worker stopped
              # renewing its heartbeat for that task.
              dead_members = self.redis_client.zrangebyscore(key_str, 0, current_time)
              for member in dead_members:
                  task_id = self._parse_task_id(member)
                  if task_id is None:
                      # Skip instead of raising, so one bad member cannot
                      # abort the sweep of every remaining tracker.
                      logging.warning("Skipping non-integer member %r in %s", member, key_str)
                      continue
                  self._rescue_task(key_str, task_id, member)

      @staticmethod
      def _parse_task_id(member):
          """Return the integer task id held in a tracker member, or None if it is not one.

          Handles ``bytes`` members (redis-py default) and ``str`` members
          (client created with ``decode_responses=True``).
          """
          try:
              text = member.decode('utf-8') if isinstance(member, bytes) else str(member)
              return int(text)
          except ValueError:  # also covers UnicodeDecodeError
              return None

      def _rescue_task(self, tracker_key: str, task_id: int, member=None):
          """Resolve one expired task against the cloud state.

          - The task is missing in the cloud: drop it from the tracker.
          - Its status is in ``_NO_REQUEUE_STATUSES``: drop it from the tracker.
          - Any other status: return it to the queue, then drop it from the tracker.

          Args:
              tracker_key: Tracker sorted-set key the task was found in.
              task_id: Integer task id passed to the cloud API.
              member: The exact sorted-set member read from Redis. When given,
                  it is what gets ZREM'd, so the removal matches even if the
                  stored text is not ``str(task_id)``. Defaults to ``task_id``.

          Any exception is logged and swallowed. The entry then stays in the
          tracker and is retried on the next sweep.
          """
          tracker_member = task_id if member is None else member
          logging.warning(f"⚠️ Heartbeat lost for task={task_id} in {tracker_key}. Attempting rescue...")
          try:
              task_data = VSCloudApi.Instance().get_vas_task(task_id)
              if not task_data:
                  logging.info(f"Task={task_id} missing in cloud. Removing from local tracker.")
                  self.redis_client.zrem(tracker_key, tracker_member)
                  return
              current_status = task_data.get('status', '')
              if current_status in self._NO_REQUEUE_STATUSES:
                  logging.info(f"Task={task_id} status '{current_status}'. Removing from tracker.")
                  self.redis_client.zrem(tracker_key, tracker_member)
                  return
              # The cloud still thinks the task is owned by the dead worker, so
              # force it back into the queue.
              # NOTE(review): every status not listed above is re-queued, not
              # only 'processing' (this includes '' when 'status' is absent).
              # Confirm this is intended.
              logging.info(f"Returning dead task={task_id} to queue.")
              VSCloudApi.Instance().return_vas_task_to_queue(task_id)
              self.redis_client.zrem(tracker_key, tracker_member)
              logging.info(f"✅ Dead task={task_id} successfully rescued and returned.")
          except Exception as e:
              logging.exception(f"Failed to rescue task={task_id}: {e}. Will retry next minute.")
  def main():
      """Load ``config/config.json`` next to this file and run the sweeper forever."""
      config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config", "config.json")
      # JSON is UTF-8 by spec; don't rely on the platform's locale encoding.
      with open(config_path, 'r', encoding='utf-8') as f:
          config_data = json.load(f)
      # `or {}` also covers an explicit "redis": null, which would otherwise
      # reach redis.Redis(**None) and raise TypeError.
      redis_conf = config_data.get('redis') or {}
      sweeper = OrphanTaskSweeper(redis_conf)
      sweeper.run_forever()


  if __name__ == "__main__":
      main()