notification_service.py 737 B

1234567891011121314151617181920212223242526
  1. # app/services/notification_service.py
  2. import uuid
  3. from sqlalchemy.orm import Session
  4. from typing import Optional, List, Dict
  5. from redis.asyncio import Redis
  6. from app.utils.redis_utils import redis_qpush
  7. class NotificationService:
  8. def create(redis_client: Redis, ntype: str, user_id:str, channels:List[str], template_id=str, payload=Dict):
  9. notification_payload = {
  10. "notification_id": f'nid_{uuid.uuid4().hex}',
  11. "type": ntype,
  12. "user_id": user_id,
  13. "channels": channels,
  14. "template_id": template_id,
  15. "payload": payload
  16. }
  17. redis_qpush(
  18. redis_client,
  19. "vas_notification_queue",
  20. notification_payload
  21. )