# app/services/notification_outbox_service.py import uuid from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.biz_exception import NotFoundError from app.models.notification_outbox import NotificationOutbox from app.schemas.notification_outbox import NotificationOutboxCreate, NotificationOutboxUpdate from app.utils.pagination import paginate class NotificationOutboxService: @staticmethod async def create( db: AsyncSession, data: NotificationOutboxCreate, ) -> NotificationOutbox: msg_id = data.msg_id or f"mid_{uuid.uuid4().hex}" db_obj = NotificationOutbox( msg_id=msg_id, channel=data.channel, payload=data.payload, priority=data.priority if data.priority is not None else 10, status=data.status or "pending", attempts=data.attempts or 0, next_retry_at=data.next_retry_at, ) db.add(db_obj) await db.commit() await db.refresh(db_obj) return db_obj @staticmethod async def update( db: AsyncSession, id: int, data: NotificationOutboxUpdate, ) -> NotificationOutbox: stmt = select(NotificationOutbox).where(NotificationOutbox.id == id) db_obj = (await db.execute(stmt)).scalar_one_or_none() if not db_obj: raise NotFoundError("Notification outbox not exist") for k, v in data.dict(exclude_unset=True).items(): setattr(db_obj, k, v) await db.commit() await db.refresh(db_obj) return db_obj @staticmethod async def get( db: AsyncSession, id: int, ) -> NotificationOutbox: stmt = select(NotificationOutbox).where(NotificationOutbox.id == id) obj = (await db.execute(stmt)).scalar_one_or_none() if not obj: raise NotFoundError("Notification outbox not exist") return obj @staticmethod async def list( db: AsyncSession, status: Optional[str] = None, channel: Optional[str] = None, priority: Optional[int] = None, msg_id: Optional[str] = None, page: int = 1, size: int = 20, ): stmt = select(NotificationOutbox) if status: stmt = stmt.where(NotificationOutbox.status == status) if channel: stmt = stmt.where(NotificationOutbox.channel == channel) if priority: stmt = stmt.where(NotificationOutbox.priority == priority) if msg_id: stmt = stmt.where(NotificationOutbox.msg_id == msg_id) stmt = stmt.order_by(NotificationOutbox.created_at.desc()) return await paginate(db, stmt, page, size)