Hello {{username}},
Great news! We have successfully secured your appointment slot.
If you don't see it in your Inbox, please check your Spam/Junk folder immediately.
import asyncio import json from datetime import datetime from typing import Dict, Any from redis.asyncio import Redis from sqlalchemy.ext.asyncio import AsyncSession from app.services.wechat_service import WechatService from app.services.email_authorizations_service import EmailAuthorizationService from app.utils.redis_utils import redis_qpop THROTTLE_EXPIRY = 1800 async def notification_consumer(session_factory, redis_client: Redis): """ 异步消费 Redis 队列 vas_notification_queue """ queue_name = "vas_notification_queue" while True: try: # 阻塞获取队列消息 message: Dict[str, Any] = await redis_qpop(redis_client, queue_name, timeout=5) if not message: await asyncio.sleep(1) # 队列为空,休眠 continue channel = message.get("channel", "") template_id = message.get("template_id") payload = message.get("payload", {}) # 按渠道发送 if "email" == channel: content = None sender = None subject = None receiver = message.get("receiver", "") if "email_verification_for_bind" == template_id: sender = "donotreply@visafly.top" subject = "Email Verification" content = template_for_bind_email(payload) if "email_verification_for_reset" == template_id: sender = "donotreply@visafly.top" subject = "Reset Password" content = template_for_reset_pwd(payload) if "login_credentials" == template_id: sender = "donotreply@visafly.top" subject = "Your Account Details" content = template_for_login_credentials(payload) if "ticket_created" == template_id: sender = "donotreply@visafly.top" subject = "Ticket Created" content = template_ticket_open(payload) if "appointment_confirmation" == template_id: sender = "donotreply@visafly.top" subject = "Appointment Confirmation" content = template_appointment_confirmation(payload) if content: async with session_factory() as db: # type: AsyncSession auth = await EmailAuthorizationService.get_by_email(db, sender) send_result = await EmailAuthorizationService.send_email(auth, receiver, subject, "html", content) print(f"Email send result: {send_result}") if "wechat" == channel: api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32" if "payment_user_confirmed" == template_id: status = await WechatService.push_payment_template(api_token, payload) print(f"Wechat send status: {status}") if "slot_snapshot" == template_id: # 1. 提取标识字段 country = payload.get("country", "unknown") city = payload.get("city", "unknown") visa_type = payload.get("visa_type", "unknown") earliest_date = payload.get("earliest_date", "N/A") # 2. 生成 Redis 频率限制 Key # 格式: throttle:slot_snapshot:USA:Beijing:B1 throttle_key = f"throttle:slot_snapshot:{country}:{city}:{visa_type}" # 3. 检查是否存在记录(即是否在冷却期内) last_sent_val = await redis_client.get(throttle_key) # 4. 判断是否需要跳过 # 如果记录存在,且 earliest_date 没有变化,则跳过推送 if last_sent_val and last_sent_val == earliest_date: print(f"⏭️ Skipped redundant Wechat notification for {country}-{city} (In Cooling Period)") continue # 5. 执行发送 status = await WechatService.push_slot_snapshot(api_token, payload) print(f"Wechat send status: {status}") # 6. 发送成功后更新 Redis 记录并设置过期时间 # 存储当前的最早日期,下次如果日期变了,即便没过 30 分钟也会再次推送 await redis_client.set(throttle_key, str(earliest_date), ex=THROTTLE_EXPIRY) print(f"✅ Notification sent: {message.get('notification_id')}") except Exception as e: print(f"⚠️ Notification consumer error: {e}") await asyncio.sleep(1) # 避免异常循环过快 def template_for_bind_email(payload): """ 生成绑定邮箱验证码邮件 Args: payload (dict): 包含以下字段: - app_name: 应用名称 (默认 Visafly) - code: 验证码 - expiration_time: 过期时间描述 (如 "10 minutes") """ # 1. 定义 HTML 模板 template = '''
Hello,
You requested to bind this email address to your {{app_name}} account. Please use the verification code below to proceed:
This code will expire in {{expiration_time}}.
If you did not request this change, please ignore this email.
Best regards,
The {{app_name}} Team
Hello,
We received a request to reset the password for your {{app_name}} account. Please use the following code to verify your identity:
This code is valid for {{expiration_time}}.
Best regards,
The {{app_name}} Team
Dear User,
Your account has been successfully set up. Below are your temporary login credentials.
Or copy this link: {{login_url}}
Hello {{username}},
We wanted to let you know that we've received your request. Our team is currently reviewing the details.
We usually reply within 24 hours. You will receive an email notification when our agent replies.
View Ticket DetailsHello {{username}},
Great news! We have successfully secured your appointment slot.
Did not receive the email?
Please check your Spam folder first. If still missing, contact us:
✉️ Email Support | 🌐 Contact Us