notification_service.py 972 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. # app/services/notification_service.py
  2. from typing import Dict, Any, Optional
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.schemas.notification_outbox import NotificationOutboxCreate
  5. from app.services.notification_outbox_service import NotificationOutboxService
  6. class NotificationService:
  7. @staticmethod
  8. async def post_message(
  9. db: AsyncSession,
  10. channel: str,
  11. payload: Dict[str, Any],
  12. priority: int = 10,
  13. msg_id: Optional[str] = None,
  14. status: str = "pending",
  15. attempts: int = 0,
  16. next_retry_at=None,
  17. ):
  18. await NotificationOutboxService.create(
  19. db,
  20. NotificationOutboxCreate(
  21. msg_id=msg_id,
  22. channel=channel,
  23. payload=payload,
  24. priority=priority,
  25. status=status,
  26. attempts=attempts,
  27. next_retry_at=next_retry_at,
  28. ),
  29. )