import asyncio from typing import Dict, Any from redis.asyncio import Redis from app.services.wechat_service import WechatService from app.services.email_authorizations_service import EmailAuthorizationService from app.utils.redis_utils import redis_qpop async def notification_consumer(redis_client: Redis): """ 异步消费 Redis 队列 vas_notification_queue """ queue_name = "vas_notification_queue" return while True: try: # 阻塞获取队列消息 message: Dict[str, Any] = await redis_qpop(redis_client, queue_name, timeout=5) if not message: await asyncio.sleep(1) # 队列为空,休眠 continue channels = message.get("channels", []) template_id = message.get("template_id") payload = message.get("payload", {}) user_id = message.get("user_id") # 按渠道发送 if "email" in channels: # EmailService.create(user_id, template_id, payload) 是你自己实现的发送逻辑 await EmailAuthorizationService.send(user_id=user_id, template_id=template_id, payload=payload) if "wechat" in channels: api_token = payload.get("api_token") content = payload.get("message") or payload.get("content") if api_token and content: await WechatService.push_to_wechat({"api_token": api_token, "message": content}) print(f"✅ Notification sent: {message.get('notification_id')}") except Exception as e: print(f"⚠️ Notification consumer error: {e}") await asyncio.sleep(1) # 避免异常循环过快 def template_for_bind_email(payload): template = ''' Email Verification

Verify your email address

Hello,

You requested to bind this email address to your {{app_name}} account. Please use the verification code below to proceed:

{{code}}

This code will expire in {{expiration_time}}.

If you did not request this change, please ignore this email.


Best regards,
The {{app_name}} Team

''' def template_for_reset_pwd(payload): template = ''' Reset Password

Password Reset Request

Hello,

We received a request to reset the password for your {{app_name}} account. Please use the following code to verify your identity:

{{code}}

This code is valid for {{expiration_time}}.

Security Tip: If you did not request a password reset, please ignore this email. No changes will be made to your account.

Best regards,
The {{app_name}} Team

''' def template_for_login_credentials(payload): template = ''' Your Account Details

Welcome to {{app_name}}

Dear User,

Your account has been successfully set up. Below are your temporary login credentials.

Username: {{username}}
Password: {{password}}
Important: For your security, please change your password immediately after logging in.
Log In Now

Or copy this link: {{login_url}}

''' def template_ticket_open(payload): template = ''' Ticket Created

Support Request Received

Hello {{username}},

We wanted to let you know that we've received your request. Our team is currently reviewing the details.

Ticket ID: #{{ticket_id}}
Type: {{ticket_type}}
Time: {{created_at}}

We usually reply within 24 hours. You will receive an email notification when our agent replies.

View Ticket Details
''' def template_confirm_payment(payload): template = { "touser": "ADMIN_USER_ID", "msgtype": "textcard", "agentid": 1000001, "textcard": { "title": "💰 待确认:收到新的手动转账", "description": "
2025-12-31 10:30:00

订单号:ORD-20251231-001
用户:user@example.com
金额:¥ 3,500.00

请核实资金到账后,点击卡片确认收款。", "url": "https://admin.visafly.com/payment/confirm?payment_id=123&token=secure_token_abc", "btntxt": "立即确认" } }