| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- # app/services/notification_outbox_service.py
- import uuid
- from typing import Optional
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.core.biz_exception import NotFoundError
- from app.models.notification_outbox import NotificationOutbox
- from app.schemas.notification_outbox import NotificationOutboxCreate, NotificationOutboxUpdate
- from app.utils.pagination import paginate
- class NotificationOutboxService:
- @staticmethod
- async def create(
- db: AsyncSession,
- data: NotificationOutboxCreate,
- ) -> NotificationOutbox:
- msg_id = data.msg_id or f"mid_{uuid.uuid4().hex}"
- db_obj = NotificationOutbox(
- msg_id=msg_id,
- channel=data.channel,
- payload=data.payload,
- priority=data.priority if data.priority is not None else 10,
- status=data.status or "pending",
- attempts=data.attempts or 0,
- next_retry_at=data.next_retry_at,
- )
- db.add(db_obj)
- await db.commit()
- await db.refresh(db_obj)
- return db_obj
- @staticmethod
- async def update(
- db: AsyncSession,
- id: int,
- data: NotificationOutboxUpdate,
- ) -> NotificationOutbox:
- stmt = select(NotificationOutbox).where(NotificationOutbox.id == id)
- db_obj = (await db.execute(stmt)).scalar_one_or_none()
- if not db_obj:
- raise NotFoundError("Notification outbox not exist")
- for k, v in data.dict(exclude_unset=True).items():
- setattr(db_obj, k, v)
- await db.commit()
- await db.refresh(db_obj)
- return db_obj
- @staticmethod
- async def get(
- db: AsyncSession,
- id: int,
- ) -> NotificationOutbox:
- stmt = select(NotificationOutbox).where(NotificationOutbox.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Notification outbox not exist")
- return obj
- @staticmethod
- async def list(
- db: AsyncSession,
- status: Optional[str] = None,
- channel: Optional[str] = None,
- priority: Optional[int] = None,
- msg_id: Optional[str] = None,
- page: int = 1,
- size: int = 20,
- ):
- stmt = select(NotificationOutbox)
- if status:
- stmt = stmt.where(NotificationOutbox.status == status)
- if channel:
- stmt = stmt.where(NotificationOutbox.channel == channel)
- if priority:
- stmt = stmt.where(NotificationOutbox.priority == priority)
- if msg_id:
- stmt = stmt.where(NotificationOutbox.msg_id == msg_id)
- stmt = stmt.order_by(NotificationOutbox.created_at.desc())
- return await paginate(db, stmt, page, size)
|