# notification_outbox.py
  1. import json
  2. from datetime import datetime
  3. from typing import Optional, Any, Dict
  4. from pydantic import BaseModel, field_validator
  5. class NotificationOutboxBase(BaseModel):
  6. msg_id: Optional[str] = None
  7. channel: Optional[str] = None
  8. payload: Optional[Dict[str, Any]] = None
  9. priority: Optional[int] = 10
  10. status: Optional[str] = "pending"
  11. attempts: Optional[int] = 0
  12. next_retry_at: Optional[datetime] = None
  13. @field_validator("payload", mode="before")
  14. def normalize_json_field(cls, v):
  15. if v is None:
  16. return None
  17. if isinstance(v, str):
  18. try:
  19. return json.loads(v)
  20. except Exception:
  21. return {}
  22. return v
  23. class NotificationOutboxCreate(NotificationOutboxBase):
  24. channel: str
  25. payload: Dict[str, Any]
  26. class NotificationOutboxUpdate(BaseModel):
  27. channel: Optional[str] = None
  28. payload: Optional[Dict[str, Any]] = None
  29. priority: Optional[int] = None
  30. status: Optional[str] = None
  31. attempts: Optional[int] = None
  32. next_retry_at: Optional[datetime] = None
  33. @field_validator("payload", mode="before")
  34. def normalize_json_field(cls, v):
  35. if v is None:
  36. return None
  37. if isinstance(v, str):
  38. try:
  39. return json.loads(v)
  40. except Exception:
  41. return {}
  42. return v
  43. class NotificationOutboxOut(NotificationOutboxBase):
  44. id: int
  45. created_at: datetime
  46. updated_at: datetime
  47. model_config = {
  48. "from_attributes": True
  49. }