import asyncio import threading from typing import Callable, List, Dict, Any from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from app.core.logger import logger class ScheduledTask: def __init__(self, name: str, handler: Callable, interval_hours: int = 1): self.name = name self.handler = handler self.interval_hours = interval_hours class SchedulerService: _instance = None _lock = threading.Lock() _scheduler: AsyncIOScheduler = None _tasks: List[ScheduledTask] = [] _started = False @classmethod def get_instance(cls) -> "SchedulerService": if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): self._scheduler = AsyncIOScheduler(timezone="UTC") self._setup_event_listeners() def _setup_event_listeners(self): def on_job_error(event): logger.error(f"[Scheduler] Job {event.job_id} error: {event.exception}") def on_job_executed(event): if event.exception is None: logger.info(f"[Scheduler] Job {event.job_id} completed successfully") else: logger.error(f"[Scheduler] Job {event.job_id} failed: {event.exception}") self._scheduler.add_listener(on_job_error, EVENT_JOB_ERROR) self._scheduler.add_listener(on_job_executed, EVENT_JOB_EXECUTED) def register_task(self, name: str, handler: Callable, interval_hours: int = 1): task = ScheduledTask(name=name, handler=handler, interval_hours=interval_hours) self._tasks.append(task) logger.info(f"[Scheduler] Registered task: {name} (interval: {interval_hours}h)") def start(self): if self._started: logger.warning("[Scheduler] Already started") return for task in self._tasks: self._scheduler.add_job( task.handler, IntervalTrigger(hours=task.interval_hours), id=task.name, replace_existing=True, misfire_grace_time=3600, ) logger.info(f"[Scheduler] Scheduled task: {task.name} every {task.interval_hours}h") self._scheduler.start() self._started = True logger.info("[Scheduler] Started successfully") def stop(self): if self._scheduler.running: self._scheduler.shutdown(wait=False) self._started = False logger.info("[Scheduler] Stopped") def get_jobs(self) -> List[Dict[str, Any]]: jobs = [] for job in self._scheduler.get_jobs(): jobs.append({ "id": job.id, "next_run": str(job.next_run_time) if job.next_run_time else None, "interval_hours": job.trigger.interval_hours if hasattr(job.trigger, 'interval_hours') else None, }) return jobs scheduler_service = SchedulerService.get_instance() def register_scheduled_task(name: str, interval_hours: int = 1): def decorator(func: Callable): scheduler_service.register_task(name, func, interval_hours) return func return decorator