scheduler_service.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import asyncio
  2. import threading
  3. from typing import Callable, List, Dict, Any
  4. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  5. from apscheduler.triggers.interval import IntervalTrigger
  6. from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
  7. from app.core.logger import logger
  8. class ScheduledTask:
  9. def __init__(self, name: str, handler: Callable, interval_hours: int = 1):
  10. self.name = name
  11. self.handler = handler
  12. self.interval_hours = interval_hours
  13. class SchedulerService:
  14. _instance = None
  15. _lock = threading.Lock()
  16. _scheduler: AsyncIOScheduler = None
  17. _tasks: List[ScheduledTask] = []
  18. _started = False
  19. @classmethod
  20. def get_instance(cls) -> "SchedulerService":
  21. if cls._instance is None:
  22. with cls._lock:
  23. if cls._instance is None:
  24. cls._instance = cls()
  25. return cls._instance
  26. def __init__(self):
  27. self._scheduler = AsyncIOScheduler(timezone="UTC")
  28. self._setup_event_listeners()
  29. def _setup_event_listeners(self):
  30. def on_job_error(event):
  31. logger.error(f"[Scheduler] Job {event.job_id} error: {event.exception}")
  32. def on_job_executed(event):
  33. if event.exception is None:
  34. logger.info(f"[Scheduler] Job {event.job_id} completed successfully")
  35. else:
  36. logger.error(f"[Scheduler] Job {event.job_id} failed: {event.exception}")
  37. self._scheduler.add_listener(on_job_error, EVENT_JOB_ERROR)
  38. self._scheduler.add_listener(on_job_executed, EVENT_JOB_EXECUTED)
  39. def register_task(self, name: str, handler: Callable, interval_hours: int = 1):
  40. task = ScheduledTask(name=name, handler=handler, interval_hours=interval_hours)
  41. self._tasks.append(task)
  42. logger.info(f"[Scheduler] Registered task: {name} (interval: {interval_hours}h)")
  43. def start(self):
  44. if self._started:
  45. logger.warning("[Scheduler] Already started")
  46. return
  47. for task in self._tasks:
  48. self._scheduler.add_job(
  49. task.handler,
  50. IntervalTrigger(hours=task.interval_hours),
  51. id=task.name,
  52. replace_existing=True,
  53. misfire_grace_time=3600,
  54. )
  55. logger.info(f"[Scheduler] Scheduled task: {task.name} every {task.interval_hours}h")
  56. self._scheduler.start()
  57. self._started = True
  58. logger.info("[Scheduler] Started successfully")
  59. def stop(self):
  60. if self._scheduler.running:
  61. self._scheduler.shutdown(wait=False)
  62. self._started = False
  63. logger.info("[Scheduler] Stopped")
  64. def get_jobs(self) -> List[Dict[str, Any]]:
  65. jobs = []
  66. for job in self._scheduler.get_jobs():
  67. jobs.append({
  68. "id": job.id,
  69. "next_run": str(job.next_run_time) if job.next_run_time else None,
  70. "interval_hours": job.trigger.interval_hours if hasattr(job.trigger, 'interval_hours') else None,
  71. })
  72. return jobs
  73. scheduler_service = SchedulerService.get_instance()
  74. def register_scheduled_task(name: str, interval_hours: int = 1):
  75. def decorator(func: Callable):
  76. scheduler_service.register_task(name, func, interval_hours)
  77. return func
  78. return decorator