import asyncio
import html
import json
from datetime import datetime, timezone
from typing import Dict, Any

from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from app.services.wechat_service import WechatService
from app.services.email_authorizations_service import EmailAuthorizationService
from app.utils.redis_utils import redis_qpop


async def notification_consumer(session_factory, redis_client: Redis):
    """Consume the Redis queue ``vas_notification_queue`` in an endless loop.

    Each message is expected to be a dict; the fields read here are
    ``channel``, ``template_id``, ``payload``, ``receiver`` (email only) and
    ``notification_id`` (used for logging only).

    Args:
        session_factory: Callable taking no arguments whose result is used as
            an async context manager yielding the DB session handed to
            ``EmailAuthorizationService.get_by_email``.
        redis_client: Redis client passed through to ``redis_qpop``.

    The coroutine never returns. Any ``Exception`` raised while handling a
    message is printed and the loop continues after a one-second pause.
    """
    queue_name = "vas_notification_queue"
    email_sender = "donotreply@visafly.top"

    while True:
        try:
            # Wait for the next queue message (timeout=5 is forwarded to redis_qpop).
            message: Dict[str, Any] = await redis_qpop(redis_client, queue_name, timeout=5)
            if not message:
                await asyncio.sleep(1)  # Queue is empty: back off before polling again.
                continue

            channel = message.get("channel", "")
            template_id = message.get("template_id")
            # `or {}` also covers an explicit null payload, which the template
            # builders could not handle (they call payload.get()).
            payload = message.get("payload") or {}

            # Dispatch by channel.
            if channel == "email":
                receiver = message.get("receiver", "")
                subject, content = _render_email(template_id, payload)
                # Unknown template ids produce no content and are skipped.
                if content:
                    async with session_factory() as db:  # type: AsyncSession
                        auth = await EmailAuthorizationService.get_by_email(db, email_sender)
                        send_result = await EmailAuthorizationService.send_email(
                            auth, receiver, subject, "html", content
                        )
                        print(f"Email send result: {send_result}")
            elif channel == "wechat":
                # NOTE(review): credential is hard-coded in source; consider
                # loading it from configuration / a secret store instead.
                api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
                if template_id == "payment_user_confirmed":
                    status = await WechatService.push_payment_template(api_token, payload)
                    print(f"Wechat send status: {status}")

            # Printed after every processed message, whether or not a
            # channel/template actually matched.
            print(f"✅ Notification sent: {message.get('notification_id')}")
        except Exception as e:
            print(f"⚠️ Notification consumer error: {e}")
            await asyncio.sleep(1)  # Avoid spinning in a tight loop on repeated errors.


def _render_email(template_id, payload):
    """Return ``(subject, html_content)`` for *template_id*, or ``(None, None)`` if unknown."""
    templates = {
        "email_verification_for_bind": ("Email Verification", template_for_bind_email),
        "email_verification_for_reset": ("Reset Password", template_for_reset_pwd),
        "login_credentials": ("Your Account Details", template_for_login_credentials),
        "ticket_created": ("Ticket Created", template_ticket_open),
    }
    entry = templates.get(template_id)
    if entry is None:
        return None, None
    subject, builder = entry
    return subject, builder(payload)


def _fill_template(template, values):
    """Substitute every ``{{key}}`` placeholder in *template*.

    Values are converted with ``str()`` and HTML-escaped, because the result
    is sent as an HTML e-mail body and the payload may carry user-supplied
    text (user names, passwords, URLs with ``&``).
    """
    for key, value in values.items():
        template = template.replace("{{" + key + "}}", html.escape(str(value)))
    return template


def template_for_bind_email(payload):
    """Build the "bind e-mail" verification-code message.

    Args:
        payload (dict): May contain:
            - app_name: application name (default ``Visafly``)
            - code: verification code (default empty string)
            - expiration_time: validity description (default ``10 minutes``)

    Returns:
        str: The template with all placeholders substituted.
    """
    # 1. HTML template
    template = ''' Email Verification

Verify your email address

Hello,

You requested to bind this email address to your {{app_name}} account. Please use the verification code below to proceed:

{{code}}

This code will expire in {{expiration_time}}.

If you did not request this change, please ignore this email.


Best regards,
The {{app_name}} Team

'''
    # 2. Substitute data; .get() defaults guard against missing fields.
    return _fill_template(template, {
        'app_name': payload.get('app_name', 'Visafly'),
        'code': payload.get('code', ''),
        'expiration_time': payload.get('expiration_time', '10 minutes'),
    })


def template_for_reset_pwd(payload):
    """Build the password-reset verification-code message.

    Args:
        payload (dict): May contain:
            - app_name: application name (default ``Visafly``)
            - code: verification code (default empty string)
            - expiration_time: validity description (default ``10 minutes``)

    Returns:
        str: The template with all placeholders substituted.
    """
    # 1. HTML template
    template = ''' Reset Password

Password Reset Request

Hello,

We received a request to reset the password for your {{app_name}} account. Please use the following code to verify your identity:

{{code}}

This code is valid for {{expiration_time}}.

Security Tip: If you did not request a password reset, please ignore this email. No changes will be made to your account.

Best regards,
The {{app_name}} Team

'''
    # 2. Substitute data; .get() defaults guard against missing fields.
    return _fill_template(template, {
        'app_name': payload.get('app_name', 'Visafly'),
        'code': payload.get('code', ''),
        'expiration_time': payload.get('expiration_time', '10 minutes'),
    })


def template_for_login_credentials(payload):
    """Build the login-credentials message (account name + temporary password).

    Args:
        payload (dict): May contain:
            - app_name: application name (default ``Visafly``)
            - username: login account (default empty string)
            - password: temporary password (default empty string)
            - login_url: login link (default ``#``)

    Returns:
        str: The template with all placeholders substituted.
    """
    # 1. HTML template
    template = ''' Your Account Details

Welcome to {{app_name}}

Dear User,

Your account has been successfully set up. Below are your temporary login credentials.

Username: {{username}}
Password: {{password}}
Important: For your security, please change your password immediately after logging in.
Log In Now

Or copy this link: {{login_url}}

'''
    # 2. Substitute data; .get() defaults guard against missing fields.
    return _fill_template(template, {
        'app_name': payload.get('app_name', 'Visafly'),
        'username': payload.get('username', ''),
        'password': payload.get('password', ''),
        'login_url': payload.get('login_url', '#'),
    })


def _format_created_at(raw_time):
    """Render a ticket creation time as ``YYYY-MM-DD HH:MM (UTC)``.

    Accepts a ``datetime`` or an ISO-8601 string (a trailing ``Z`` is
    understood). Timezone-aware values are converted to UTC first so the
    "(UTC)" label is truthful; naive values are assumed to already be UTC.
    Unparseable strings are returned unchanged; any other type gives ``N/A``.
    """
    if isinstance(raw_time, str):
        try:
            # e.g. '2025-12-31T02:33:00Z'; older fromisoformat() rejects 'Z'.
            raw_time = datetime.fromisoformat(raw_time.replace('Z', '+00:00'))
        except ValueError:
            # Parsing failed: show the original string as-is.
            return raw_time
    if not isinstance(raw_time, datetime):
        return "N/A"
    if raw_time.utcoffset() is not None:
        raw_time = raw_time.astimezone(timezone.utc)
    return raw_time.strftime('%Y-%m-%d %H:%M') + " (UTC)"


def template_ticket_open(payload):
    """Build the "ticket created" notification message.

    Args:
        payload (dict): May contain ``username`` (default ``User``),
            ``ticket_id``, ``ticket_type``, ``created_at`` (datetime or ISO
            string), ``ticket_url`` (default ``#``) and ``app_name``
            (default ``Visafly``).

    Returns:
        str: The template with all placeholders substituted.
    """
    # --- 1. Format the creation time ---
    formatted_time = _format_created_at(payload.get('created_at'))

    # --- 2. HTML template ---
    # The {{key}} placeholders are replaced together below.
    template = ''' Ticket Created

Support Request Received

Hello {{username}},

We wanted to let you know that we've received your request. Our team is currently reviewing the details.

Ticket ID: #{{ticket_id}}
Type: {{ticket_type}}
Created Time: {{created_at}}

We usually reply within 24 hours. You will receive an email notification when our agent replies.

View Ticket Details
'''
    # --- 3. Substitute payload data into the placeholders ---
    return _fill_template(template, {
        'username': payload.get('username', 'User'),
        'ticket_id': payload.get('ticket_id', ''),
        'ticket_type': payload.get('ticket_type', ''),
        'created_at': formatted_time,
        'ticket_url': payload.get('ticket_url', '#'),
        'app_name': payload.get('app_name', 'Visafly'),
    })


def _format_date_en(dt_obj):
    """Format date to '15 Jan 2024' to avoid US/UK format confusion.

    Falsy input gives ``"N/A"``; strings are returned unchanged.
    """
    if not dt_obj:
        return "N/A"
    if isinstance(dt_obj, str):
        return dt_obj
    return dt_obj.strftime("%d %b %Y")


def _format_time_en(dt_obj):
    """Format a time as ``HH:MM:SS``.

    Falsy input gives an empty string; strings are returned unchanged,
    matching ``_format_date_en`` (previously a string raised AttributeError).
    """
    if not dt_obj:
        return ""
    if isinstance(dt_obj, str):
        return dt_obj
    return dt_obj.strftime("%H:%M:%S")


def _parse_slots_summary(availability_data):
    """Convert JSON data to a readable English summary.

    Lists are shortened to their first three entries plus a "(+N more)"
    suffix; any other non-empty value is rendered with ``str()``.
    """
    if not availability_data:
        return "No specific dates data."

    if isinstance(availability_data, list):
        # Take first 3 dates only to keep it clean
        text = ", ".join(str(d) for d in availability_data[:3])
        remaining = len(availability_data) - 3
        if remaining > 0:
            text += f" (+{remaining} more)"
        return text

    return str(availability_data)


def _get_display_meta(data):
    """Return ``(emoji, headline, color, date_str)`` for an availability record.

    The header depends on ``data.availability_status`` so that the most
    important information lands in the notification preview. Reads
    ``earliest_date``, ``availability_status``, ``country`` and ``city``
    from *data*.
    """
    date_str = _format_date_en(data.earliest_date)

    if data.availability_status == 'Available':
        # Notification Preview: "🟢 UK London: 15 Jan 2024"
        emoji = "🟢"
        # If available, the DATE is the headline
        headline = f"{data.country}, {data.city}: {date_str}"
        color = "info"  # Green for WeChat
    elif data.availability_status == 'Waitlist':
        emoji = "🟡"
        headline = f"Waitlist: {data.country}, {data.city}"
        color = "warning"  # Orange for WeChat
    else:
        emoji = "🔴"
        headline = f"No Slots: {data.country}, {data.city}"
        color = "comment"  # Grey for WeChat

    return emoji, headline, color, date_str


def generate_wechat_markdown(data) -> dict:
    """Build a ``msgtype: markdown`` payload dict describing *data*.

    Reads ``visa_type``, ``availability``, ``website`` and ``snapshot_at``
    from *data*, in addition to the fields used by ``_get_display_meta``.
    """
    # `color` is returned by the helper but not used in the content below.
    emoji, headline, color, date_str = _get_display_meta(data)
    slots_summary = _parse_slots_summary(data.availability)

    # WeCom uses Markdown.
    # Logic: Keep the header distinct based on availability.
    markdown_content = (
        f"# {emoji} {headline}\n"
        f"> **Visa Type**: {data.visa_type}\n"
        f"> **Earliest**: {date_str}\n"
        f"> **Details**: {slots_summary}\n"
        f"\n"
        f"👉 [Tap to Book Appointment]({data.website})\n"
        f"\n"
        f"Updated: {_format_time_en(data.snapshot_at)}"
    )

    return {
        "msgtype": "markdown",
        "markdown": {
            "content": markdown_content
        }
    }


def generate_telegram_message(data) -> str:
    """Build the Telegram message text describing *data*.

    Reads ``visa_type``, ``availability`` and ``snapshot_at`` from *data*,
    in addition to the fields used by ``_get_display_meta``.
    """
    emoji, headline, _, date_str = _get_display_meta(data)
    slots_summary = _parse_slots_summary(data.availability)

    # NOTE(review): the original comment refers to HTML formatting (tags for
    # headers and for copyable data), but the text below carries no markup and
    # no link to data.website - confirm whether tags were lost.
    msg = (
        f"{emoji} {headline}\n"
        f"──────────────────\n"
        f"🛂 Visa: {data.visa_type}\n"
        f"📅 Earliest: {date_str}\n"
        f"📊 Slots: {slots_summary}\n"
        f"──────────────────\n"
        f"🔗 Book Now ➜\n\n"
        f"🕒 Checked at {_format_time_en(data.snapshot_at)}"
    )
    return msg