# app/services/notification_service.py
  2. import uuid
  3. from typing import List, Dict, Any
  4. from redis.asyncio import Redis
  5. from app.utils.redis_utils import redis_qpush, redis_qpop
  6. class NotificationService:
  7. @staticmethod
  8. async def post_email(
  9. redis_client: Redis,
  10. receiver: str,
  11. template_id: str,
  12. payload: Dict[str, Any]
  13. ):
  14. notification_payload = {
  15. "notification_id": f"nid_{uuid.uuid4().hex}",
  16. "channel": "email",
  17. "template_id": template_id,
  18. "receiver": receiver,
  19. "payload": payload
  20. }
  21. await redis_qpush(
  22. redis_client,
  23. "vas_notification_queue",
  24. notification_payload
  25. )
  26. @staticmethod
  27. async def post_wechat(
  28. redis_client: Redis,
  29. template_id: str,
  30. payload: Dict[str, Any]
  31. ):
  32. notification_payload = {
  33. "notification_id": f"nid_{uuid.uuid4().hex}",
  34. "channel": "wechat",
  35. "template_id": template_id,
  36. "payload": payload
  37. }
  38. await redis_qpush(
  39. redis_client,
  40. "vas_notification_queue",
  41. notification_payload
  42. )