import asyncio import json from datetime import datetime from typing import Dict, Any from redis.asyncio import Redis from sqlalchemy.ext.asyncio import AsyncSession from app.services.wechat_service import WechatService from app.services.email_authorizations_service import EmailAuthorizationService from app.utils.redis_utils import redis_qpop THROTTLE_EXPIRY = 1800 async def notification_consumer(session_factory, redis_client: Redis): """ 异步消费 Redis 队列 vas_notification_queue """ queue_name = "vas_notification_queue" while True: try: # 阻塞获取队列消息 message: Dict[str, Any] = await redis_qpop(redis_client, queue_name, timeout=5) if not message: await asyncio.sleep(1) # 队列为空,休眠 continue channel = message.get("channel", "") template_id = message.get("template_id") payload = message.get("payload", {}) # 按渠道发送 if "email" == channel: content = None sender = None subject = None receiver = message.get("receiver", "") if "email_verification_for_bind" == template_id: sender = "donotreply@visafly.top" subject = "Email Verification" content = template_for_bind_email(payload) if "email_verification_for_reset" == template_id: sender = "donotreply@visafly.top" subject = "Reset Password" content = template_for_reset_pwd(payload) if "login_credentials" == template_id: sender = "donotreply@visafly.top" subject = "Your Account Details" content = template_for_login_credentials(payload) if "ticket_created" == template_id: sender = "donotreply@visafly.top" subject = "Ticket Created" content = template_ticket_open(payload) if "appointment_confirmation" == template_id: sender = "donotreply@visafly.top" subject = "Appointment Confirmation" content = template_appointment_confirmation(payload) if content: async with session_factory() as db: # type: AsyncSession auth = await EmailAuthorizationService.get_by_email(db, sender) send_result = await EmailAuthorizationService.send_email(auth, receiver, subject, "html", content) print(f"Email send result: {send_result}") if "wechat" == channel: api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32" if "payment_user_confirmed" == template_id: status = await WechatService.push_payment_template(api_token, payload) print(f"Wechat send status: {status}") if "slot_snapshot" == template_id: # 1. 提取标识字段 country = payload.get("country", "unknown") city = payload.get("city", "unknown") visa_type = payload.get("visa_type", "unknown") earliest_date = payload.get("earliest_date", "N/A") # 2. 生成 Redis 频率限制 Key # 格式: throttle:slot_snapshot:USA:Beijing:B1 throttle_key = f"throttle:slot_snapshot:{country}:{city}:{visa_type}" # 3. 检查是否存在记录(即是否在冷却期内) last_sent_val = await redis_client.get(throttle_key) # 4. 判断是否需要跳过 # 如果记录存在,且 earliest_date 没有变化,则跳过推送 if last_sent_val and last_sent_val == earliest_date: print(f"⏭️ Skipped redundant Wechat notification for {country}-{city} (In Cooling Period)") continue # 5. 执行发送 status = await WechatService.push_slot_snapshot(api_token, payload) print(f"Wechat send status: {status}") # 6. 发送成功后更新 Redis 记录并设置过期时间 # 存储当前的最早日期,下次如果日期变了,即便没过 30 分钟也会再次推送 await redis_client.set(throttle_key, str(earliest_date), ex=THROTTLE_EXPIRY) print(f"✅ Notification sent: {message.get('notification_id')}") except Exception as e: print(f"⚠️ Notification consumer error: {e}") await asyncio.sleep(1) # 避免异常循环过快 def template_for_bind_email(payload): """ 生成绑定邮箱验证码邮件 Args: payload (dict): 包含以下字段: - app_name: 应用名称 (默认 Visafly) - code: 验证码 - expiration_time: 过期时间描述 (如 "10 minutes") """ # 1. 定义 HTML 模板 template = ''' Email Verification

Verify your email address

Hello,

You requested to bind this email address to your {{app_name}} account. Please use the verification code below to proceed:

{{code}}

This code will expire in {{expiration_time}}.

If you did not request this change, please ignore this email.


Best regards,
The {{app_name}} Team

''' # 2. 执行数据替换 # 使用 .get() 提供默认值,防止缺少字段导致报错 # 使用 str() 确保数据是字符串类型 app_name = str(payload.get('app_name', 'Visafly')) code = str(payload.get('code', '')) expiration_time = str(payload.get('expiration_time', '10 minutes')) # 链式替换所有占位符 html_content = template.replace('{{app_name}}', app_name) \ .replace('{{code}}', code) \ .replace('{{expiration_time}}', expiration_time) return html_content def template_for_reset_pwd(payload): """ 生成重置密码验证码邮件 Args: payload (dict): 包含以下字段: - app_name: 应用名称 (默认 Visafly) - code: 验证码 - expiration_time: 过期时间描述 (如 "10 minutes") """ # 1. 定义 HTML 模板 template = ''' Reset Password

Password Reset Request

Hello,

We received a request to reset the password for your {{app_name}} account. Please use the following code to verify your identity:

{{code}}

This code is valid for {{expiration_time}}.

Security Tip: If you did not request a password reset, please ignore this email. No changes will be made to your account.

Best regards,
The {{app_name}} Team

''' # 2. 执行数据替换 # 使用 .get() 提供默认值,防止缺少字段导致报错 # 使用 str() 确保数据是字符串类型,防止 replace 报错 app_name = str(payload.get('app_name', 'Visafly')) code = str(payload.get('code', '')) expiration_time = str(payload.get('expiration_time', '10 minutes')) html_content = template.replace('{{app_name}}', app_name) \ .replace('{{code}}', code) \ .replace('{{expiration_time}}', expiration_time) return html_content def template_for_login_credentials(payload): """ 生成用户登录凭证邮件 (账号 + 临时密码) Args: payload (dict): 包含以下字段: - app_name: 应用名称 - username: 登录账号 - password: 临时密码 - login_url: 登录链接 """ # 1. 定义 HTML 模板 template = ''' Your Account Details

Welcome to {{app_name}}

Dear User,

Your account has been successfully set up. Below are your temporary login credentials.

Username: {{username}}
Password: {{password}}
Important: For your security, please change your password immediately after logging in.
Log In Now

Or copy this link: {{login_url}}

''' # 2. 执行数据替换 # 使用 payload.get() 提供默认值,防止缺少字段导致报错 app_name = str(payload.get('app_name', 'Visafly')) username = str(payload.get('username', '')) password = str(payload.get('password', '')) login_url = str(payload.get('login_url', '#')) # 链式替换所有占位符 html_content = template.replace('{{app_name}}', app_name) \ .replace('{{username}}', username) \ .replace('{{password}}', password) \ .replace('{{login_url}}', login_url) return html_content def template_ticket_open(payload): """ 生成工单创建通知邮件 payload 需包含: username, ticket_id, ticket_type, created_at, ticket_url, app_name """ # --- 1. 处理时间格式化逻辑 --- raw_time = payload.get('created_at') formatted_time = "" if isinstance(raw_time, datetime): # 如果传入的是 datetime 对象 formatted_time = raw_time.strftime('%Y-%m-%d %H:%M') + " (UTC)" elif isinstance(raw_time, str): try: # 如果传入的是 ISO 字符串 (例如 '2025-12-31T02:33:00Z') # 截取前19位通常能兼容大部分 ISO 格式 dt_obj = datetime.fromisoformat(raw_time.replace('Z', '+00:00')) formatted_time = dt_obj.strftime('%Y-%m-%d %H:%M') + " (UTC)" except ValueError: # 如果解析失败,直接显示原字符串 formatted_time = raw_time else: formatted_time = "N/A" # --- 2. HTML 模板 --- # 注意:这里保持了 {{key}} 占位符,下面会统一替换 template = ''' Ticket Created

Support Request Received

Hello {{username}},

We wanted to let you know that we've received your request. Our team is currently reviewing the details.

Ticket ID: #{{ticket_id}}
Type: {{ticket_type}}
Created Time: {{created_at}}

We usually reply within 24 hours. You will receive an email notification when our agent replies.

View Ticket Details
''' # --- 3. 执行替换 --- # 使用 payload 中的数据替换模板占位符 html_content = template.replace('{{username}}', str(payload.get('username', 'User'))) \ .replace('{{ticket_id}}', str(payload.get('ticket_id', ''))) \ .replace('{{ticket_type}}', str(payload.get('ticket_type', ''))) \ .replace('{{created_at}}', formatted_time) \ .replace('{{ticket_url}}', str(payload.get('ticket_url', '#'))) \ .replace('{{app_name}}', str(payload.get('app_name', 'Visafly'))) return html_content def template_appointment_confirmation(payload): """ 生成预约成功确认邮件 (VisaFly) payload 需包含: - username: 用户名 - order_id: 订单号 (新增) - country: 国家 - city: 城市 - appointment_date: 预约时间 (字符串, 例如 "2026-03-15 09:00") - visa_type: 签证类型 - user_email: 用户邮箱 (用于提示信件已发往此处) """ # --- 1. 基础配置 (VisaFly) --- company_name = "VisaFly" support_email = "support@visafly.top" website_home = "https://visafly.top" website_contact = "https://visafly.top/refund-policy" # --- 2. HTML 模板 --- template = ''' Appointment Confirmed

✅ Booking Successful!

Appointment Secured by {{company_name}}

Hello {{username}},

Great news! We have successfully secured your appointment slot.

Order ID: #{{order_id}}
Country / City: {{country}} - {{city}}
Appointment Date: {{appointment_date}}
Visa Type: {{visa_type}}
📩 Important: Check Your Email We have sent the official confirmation letter to {{user_email}}.

If you don't see it in your Inbox, please check your Spam/Junk folder immediately.

Did not receive the email?

Please check your Spam folder first. If still missing, contact us:

✉️ Email Support | 🌐 Contact Us
''' # --- 3. 执行替换 --- html_content = template.replace('{{username}}', str(payload.get('username', 'Customer'))) \ .replace('{{order_id}}', str(payload.get('order_id', 'N/A'))) \ .replace('{{country}}', str(payload.get('country', ''))) \ .replace('{{city}}', str(payload.get('city', ''))) \ .replace('{{appointment_date}}', str(payload.get('appointment_date', 'Confirmed'))) \ .replace('{{user_email}}', str(payload.get('user_email', 'your email'))) \ .replace('{{visa_type}}', str(payload.get('visa_type', 'Standard'))) \ .replace('{{company_name}}', company_name) \ .replace('{{support_email}}', support_email) \ .replace('{{website_contact}}', website_contact) \ .replace('{{website_home}}', website_home) return html_content