| 1234567891011121314151617181920212223242526272829303132333435363738 |
- # app/services/notification_service.py
- from typing import Dict, Any, Optional
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.schemas.notification_outbox import NotificationOutboxCreate
- from app.services.notification_outbox_service import NotificationOutboxService
class NotificationService:
    """Thin wrapper that records notifications via NotificationOutboxService."""

    @staticmethod
    async def post_message(
        db: AsyncSession,
        channel: str,
        payload: Dict[str, Any],
        priority: int = 10,
        msg_id: Optional[str] = None,
        status: str = "pending",
        attempts: int = 0,
        next_retry_at=None,
    ):
        """Create an outbox record from the given message fields.

        Packs the arguments into a ``NotificationOutboxCreate`` and passes
        it, together with ``db``, to ``NotificationOutboxService.create``.

        Args:
            db: Async session forwarded unchanged to the outbox service.
            channel: Forwarded as the record's ``channel``.
            payload: Forwarded as the record's ``payload``.
            priority: Forwarded as ``priority``; defaults to 10.
            msg_id: Forwarded as ``msg_id``; defaults to ``None``.
            status: Forwarded as ``status``; defaults to ``"pending"``.
            attempts: Forwarded as ``attempts``; defaults to 0.
            next_retry_at: Forwarded as ``next_retry_at``; defaults to
                ``None``.  NOTE(review): presumably a datetime -- confirm
                against the ``NotificationOutboxCreate`` schema.

        Returns:
            None. Whatever ``NotificationOutboxService.create`` returns is
            awaited and then discarded.
        """
        # Build the schema object first so the service call stays a one-liner.
        outbox_entry = NotificationOutboxCreate(
            msg_id=msg_id,
            channel=channel,
            payload=payload,
            priority=priority,
            status=status,
            attempts=attempts,
            next_retry_at=next_retry_at,
        )
        await NotificationOutboxService.create(db, outbox_entry)
-
|