notification_outbox_service.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # app/services/notification_outbox_service.py
  2. import uuid
  3. from typing import Optional
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from sqlalchemy import select
  6. from app.core.biz_exception import NotFoundError
  7. from app.models.notification_outbox import NotificationOutbox
  8. from app.schemas.notification_outbox import NotificationOutboxCreate, NotificationOutboxUpdate
  9. from app.utils.pagination import paginate
  10. class NotificationOutboxService:
  11. @staticmethod
  12. async def create(
  13. db: AsyncSession,
  14. data: NotificationOutboxCreate,
  15. ) -> NotificationOutbox:
  16. msg_id = data.msg_id or f"mid_{uuid.uuid4().hex}"
  17. db_obj = NotificationOutbox(
  18. msg_id=msg_id,
  19. channel=data.channel,
  20. payload=data.payload,
  21. priority=data.priority if data.priority is not None else 10,
  22. status=data.status or "pending",
  23. attempts=data.attempts or 0,
  24. next_retry_at=data.next_retry_at,
  25. )
  26. db.add(db_obj)
  27. await db.commit()
  28. await db.refresh(db_obj)
  29. return db_obj
  30. @staticmethod
  31. async def update(
  32. db: AsyncSession,
  33. id: int,
  34. data: NotificationOutboxUpdate,
  35. ) -> NotificationOutbox:
  36. stmt = select(NotificationOutbox).where(NotificationOutbox.id == id)
  37. db_obj = (await db.execute(stmt)).scalar_one_or_none()
  38. if not db_obj:
  39. raise NotFoundError("Notification outbox not exist")
  40. for k, v in data.dict(exclude_unset=True).items():
  41. setattr(db_obj, k, v)
  42. await db.commit()
  43. await db.refresh(db_obj)
  44. return db_obj
  45. @staticmethod
  46. async def get(
  47. db: AsyncSession,
  48. id: int,
  49. ) -> NotificationOutbox:
  50. stmt = select(NotificationOutbox).where(NotificationOutbox.id == id)
  51. obj = (await db.execute(stmt)).scalar_one_or_none()
  52. if not obj:
  53. raise NotFoundError("Notification outbox not exist")
  54. return obj
  55. @staticmethod
  56. async def list(
  57. db: AsyncSession,
  58. status: Optional[str] = None,
  59. channel: Optional[str] = None,
  60. priority: Optional[int] = None,
  61. msg_id: Optional[str] = None,
  62. page: int = 1,
  63. size: int = 20,
  64. ):
  65. stmt = select(NotificationOutbox)
  66. if status:
  67. stmt = stmt.where(NotificationOutbox.status == status)
  68. if channel:
  69. stmt = stmt.where(NotificationOutbox.channel == channel)
  70. if priority:
  71. stmt = stmt.where(NotificationOutbox.priority == priority)
  72. if msg_id:
  73. stmt = stmt.where(NotificationOutbox.msg_id == msg_id)
  74. stmt = stmt.order_by(NotificationOutbox.created_at.desc())
  75. return await paginate(db, stmt, page, size)