notification_service.py 684 B

12345678910111213141516171819202122232425
  1. # app/services/notification_service.py
  2. import uuid
  3. from sqlalchemy.orm import Session
  4. from typing import Optional, List, Dict
  5. from redis.asyncio import Redis
  6. class NotificationService:
  7. def create(redis: Redis, ntype: str, user_id:str, channels:List[str], template_id=str, payload=Dict):
  8. notification_payload = {
  9. "notification_id": f'nid_{uuid.uuid4().hex}',
  10. "type": ntype,
  11. "user_id": user_id,
  12. "channels": channels,
  13. "template_id": template_id,
  14. "payload": payload
  15. }
  16. redis_qpush(
  17. redis_client,
  18. "vas_notification_queue",
  19. notification_payload
  20. )