| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import asyncio
- import threading
- from typing import Callable, List, Dict, Any
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
- from app.core.logger import logger
class ScheduledTask:
    """Description of one periodic job: a name, a callable and a period.

    Attributes:
        name: Identifier of the task.
        handler: Callable stored for later execution.
        interval_hours: Period between runs, in hours.
    """

    def __init__(self, name: str, handler: Callable, interval_hours: int = 1):
        # Pure value holder: no validation or scheduling happens here.
        self.interval_hours: int = interval_hours
        self.handler: Callable = handler
        self.name: str = name
class SchedulerService:
    """Wrapper around an APScheduler ``AsyncIOScheduler`` running in UTC.

    Tasks are collected with :meth:`register_task` and turned into interval
    jobs by :meth:`start`. Use :meth:`get_instance` to obtain the shared
    instance.
    """

    _instance = None
    _lock = threading.Lock()  # guards lazy creation of ``_instance``
    _scheduler: AsyncIOScheduler = None  # replaced per instance in __init__
    # Declared only; the list itself is created per instance in __init__ so
    # that separate instances never share (and mutate) one class-level list.
    _tasks: List[ScheduledTask]
    _started = False

    @classmethod
    def get_instance(cls) -> "SchedulerService":
        """Return the shared instance, creating it on first use.

        The instance is re-checked after acquiring ``_lock`` so that two
        threads racing on the first call cannot both construct one.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def __init__(self):
        """Create the underlying scheduler and attach the logging listeners."""
        self._scheduler = AsyncIOScheduler(timezone="UTC")
        self._tasks = []
        self._started = False
        self._setup_event_listeners()

    def _setup_event_listeners(self):
        """Log the outcome of every job run reported by the scheduler."""

        def on_job_error(event):
            logger.error(f"[Scheduler] Job {event.job_id} error: {event.exception}")

        def on_job_executed(event):
            if event.exception is None:
                logger.info(f"[Scheduler] Job {event.job_id} completed successfully")
            else:
                # NOTE(review): failures are expected to arrive through
                # EVENT_JOB_ERROR; this branch is kept as a defensive fallback.
                logger.error(f"[Scheduler] Job {event.job_id} failed: {event.exception}")

        self._scheduler.add_listener(on_job_error, EVENT_JOB_ERROR)
        self._scheduler.add_listener(on_job_executed, EVENT_JOB_EXECUTED)

    def _schedule_task(self, task: ScheduledTask) -> None:
        """Add (or replace) the interval job for ``task`` on the scheduler."""
        self._scheduler.add_job(
            task.handler,
            IntervalTrigger(hours=task.interval_hours),
            id=task.name,
            # Re-adding a task with the same name overwrites the old job
            # instead of raising a conflict.
            replace_existing=True,
            misfire_grace_time=3600,
        )
        logger.info(f"[Scheduler] Scheduled task: {task.name} every {task.interval_hours}h")

    def register_task(self, name: str, handler: Callable, interval_hours: int = 1):
        """Record a task to be run every ``interval_hours`` hours.

        Args:
            name: Task name; also used as the scheduler job id.
            handler: Callable invoked on every run.
            interval_hours: Period between runs, in hours.

        If the service has already been started the job is scheduled
        immediately; previously such late registrations were stored but
        never handed to the scheduler.
        """
        task = ScheduledTask(name=name, handler=handler, interval_hours=interval_hours)
        self._tasks.append(task)
        logger.info(f"[Scheduler] Registered task: {name} (interval: {interval_hours}h)")
        if self._started:
            self._schedule_task(task)

    def start(self):
        """Schedule every registered task and start the scheduler.

        Calling this while already started only logs a warning.
        """
        if self._started:
            logger.warning("[Scheduler] Already started")
            return
        for task in self._tasks:
            self._schedule_task(task)
        self._scheduler.start()
        self._started = True
        logger.info("[Scheduler] Started successfully")

    def stop(self):
        """Shut the scheduler down (``wait=False``) if it is running."""
        if self._scheduler.running:
            self._scheduler.shutdown(wait=False)
            self._started = False
            logger.info("[Scheduler] Stopped")

    def get_jobs(self) -> List[Dict[str, Any]]:
        """Return one summary dict per job known to the scheduler.

        Returns:
            A list of dicts with the keys ``"id"``, ``"next_run"`` (string
            form of the next run time, or ``None``) and ``"interval_hours"``
            (the interval the task was registered with, or ``None`` when it
            cannot be determined).
        """
        # The trigger object does not expose an ``interval_hours`` attribute,
        # so the registered tasks are the reliable source for the interval.
        # On duplicate names the last registration wins, matching
        # ``replace_existing=True`` in _schedule_task.
        registered = {task.name: task.interval_hours for task in self._tasks}
        jobs = []
        for job in self._scheduler.get_jobs():
            # Jobs that are still pending may not carry ``next_run_time`` yet.
            next_run_time = getattr(job, "next_run_time", None)
            interval_hours = registered.get(job.id)
            if interval_hours is None:
                interval_hours = getattr(job.trigger, "interval_hours", None)
            jobs.append({
                "id": job.id,
                "next_run": str(next_run_time) if next_run_time else None,
                "interval_hours": interval_hours,
            })
        return jobs
# Module-level shared service instance, created at import time; used by
# register_scheduled_task below.
scheduler_service = SchedulerService.get_instance()
def register_scheduled_task(name: str, interval_hours: int = 1):
    """Decorator factory that registers the decorated callable as a task.

    Args:
        name: Task name passed to ``scheduler_service.register_task``.
        interval_hours: Period between runs, in hours.

    Returns:
        A decorator that registers its argument and returns it unchanged.
    """

    def _register(handler: Callable):
        # Registration is the only side effect; the callable is not wrapped.
        scheduler_service.register_task(name, handler, interval_hours)
        return handler

    return _register
|