notification_service.py 827 B

123456789101112131415161718192021222324252627282930313233343536
  1. # app/services/notification_service.py
  2. import uuid
  3. from typing import List, Dict, Any
  4. from redis.asyncio import Redis
  5. from app.utils.redis_utils import redis_qpush, redis_qpop
  6. class NotificationService:
  7. @staticmethod
  8. async def create(
  9. redis_client: Redis,
  10. ntype: str,
  11. user_id: str,
  12. channels: List[str],
  13. template_id: str,
  14. payload: Dict[str, Any]
  15. ) -> None:
  16. notification_payload = {
  17. "notification_id": f"nid_{uuid.uuid4().hex}",
  18. "type": ntype,
  19. "user_id": user_id,
  20. "channels": channels,
  21. "template_id": template_id,
  22. "payload": payload
  23. }
  24. await redis_qpush(
  25. redis_client,
  26. "vas_notification_queue",
  27. notification_payload
  28. )