import asyncio
import html
import json
from datetime import datetime, timezone
from typing import Dict, Any

from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from app.services.wechat_service import WechatService
from app.services.email_authorizations_service import EmailAuthorizationService
from app.utils.redis_utils import redis_qpop
async def notification_consumer(session_factory, redis_client: Redis):
    """
    Consume the Redis queue ``vas_notification_queue`` forever and dispatch
    each message to its delivery channel.

    A message is read as a dict with the keys ``channel``, ``template_id``,
    ``payload`` and (for e-mail) ``receiver``; ``notification_id`` is only
    used in the log output.

    Args:
        session_factory: Callable returning an async context manager that
            yields the DB session used to look up the e-mail authorization.
        redis_client: Redis connection the queue is popped from.

    The loop never returns. Any ``Exception`` raised while handling one
    message is printed and the loop continues after a one second pause.
    """
    queue_name = "vas_notification_queue"
    email_sender = "donotreply@visafly.top"
    # template_id -> (subject, renderer). One lookup replaces a chain of
    # independent ``if`` statements that were all evaluated for every message.
    email_templates = {
        "email_verification_for_bind": ("Email Verification", template_for_bind_email),
        "email_verification_for_reset": ("Reset Password", template_for_reset_pwd),
        "login_credentials": ("Your Account Details", template_for_login_credentials),
        "ticket_created": ("Ticket Created", template_ticket_open),
    }
    while True:
        try:
            # Pop the next message (the helper is given a 5 second timeout).
            message: Dict[str, Any] = await redis_qpop(redis_client, queue_name, timeout=5)
            if not message:
                await asyncio.sleep(1)  # queue is empty, back off briefly
                continue
            channel = message.get("channel", "")
            template_id = message.get("template_id")
            # ``or {}`` also covers an explicit ``"payload": null`` message.
            payload = message.get("payload") or {}
            handled = False
            if channel == "email":
                entry = email_templates.get(template_id)
                if entry is not None:
                    subject, render = entry
                    content = render(payload)
                    if content:
                        receiver = message.get("receiver", "")
                        async with session_factory() as db:  # type: AsyncSession
                            auth = await EmailAuthorizationService.get_by_email(db, email_sender)
                            send_result = await EmailAuthorizationService.send_email(
                                auth, receiver, subject, "html", content
                            )
                        print(f"Email send result: {send_result}")
                        handled = True
            elif channel == "wechat":
                # NOTE(review): hard-coded credential in source; it should be
                # moved to configuration / secret storage.
                api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
                if template_id == "payment_user_confirmed":
                    status = await WechatService.push_payment_template(api_token, payload)
                    print(f"Wechat send status: {status}")
                    handled = True
            if handled:
                print(f"✅ Notification sent: {message.get('notification_id')}")
            else:
                # Previously an unknown channel/template was still reported as sent.
                print(
                    "⚠️ Notification skipped (unsupported channel/template): "
                    f"{message.get('notification_id')}"
                )
        except Exception as e:
            print(f"⚠️ Notification consumer error: {e}")
            await asyncio.sleep(1)  # avoid a tight loop when errors repeat
def template_for_bind_email(payload):
    """
    Build the body of the "bind e-mail" verification message.

    Args:
        payload (dict | None): Template data. Recognised keys:
            - app_name: application name (default "Visafly")
            - code: verification code (default "")
            - expiration_time: validity description such as "10 minutes"
              (default "10 minutes")
            A missing key, or a key whose value is None, uses the default.
            ``None`` as the whole payload is treated as an empty dict.

    Returns:
        str: The template with all placeholders substituted. Values are
        HTML-escaped because the consumer sends this content as "html".
    """
    template = '''
Email Verification
Verify your email address
Hello,
You requested to bind this email address to your {{app_name}} account. Please use the verification code below to proceed:
{{code}}
This code will expire in {{expiration_time}}.
If you did not request this change, please ignore this email.
Best regards,
The {{app_name}} Team
'''
    data = payload or {}

    def field(key, default):
        # Explicit None must not render as the literal text "None".
        value = data.get(key)
        return html.escape(str(default if value is None else value))

    return (
        template.replace('{{app_name}}', field('app_name', 'Visafly'))
        .replace('{{code}}', field('code', ''))
        .replace('{{expiration_time}}', field('expiration_time', '10 minutes'))
    )
def template_for_reset_pwd(payload):
    """
    Build the body of the "reset password" verification message.

    Args:
        payload (dict | None): Template data. Recognised keys:
            - app_name: application name (default "Visafly")
            - code: verification code (default "")
            - expiration_time: validity description such as "10 minutes"
              (default "10 minutes")
            A missing key, or a key whose value is None, uses the default.
            ``None`` as the whole payload is treated as an empty dict.

    Returns:
        str: The template with all placeholders substituted. Values are
        HTML-escaped because the consumer sends this content as "html".
    """
    template = '''
Reset Password
Hello,
We received a request to reset the password for your {{app_name}} account. Please use the following code to verify your identity:
{{code}}
This code is valid for {{expiration_time}}.
Security Tip: If you did not request a password reset, please ignore this email. No changes will be made to your account.
Best regards,
The {{app_name}} Team
'''
    data = payload or {}

    def field(key, default):
        # Explicit None must not render as the literal text "None".
        value = data.get(key)
        return html.escape(str(default if value is None else value))

    return (
        template.replace('{{app_name}}', field('app_name', 'Visafly'))
        .replace('{{code}}', field('code', ''))
        .replace('{{expiration_time}}', field('expiration_time', '10 minutes'))
    )
def template_for_login_credentials(payload):
    """
    Build the body of the login-credentials message (account + temporary
    password).

    Args:
        payload (dict | None): Template data. Recognised keys:
            - app_name: application name (default "Visafly")
            - username: login account (default "")
            - password: temporary password (default "")
            - login_url: login link (default "#")
            A missing key, or a key whose value is None, uses the default.
            ``None`` as the whole payload is treated as an empty dict.

    Returns:
        str: The template with all placeholders substituted. Values are
        HTML-escaped because the consumer sends this content as "html";
        without escaping a password containing ``<`` or ``&`` would be
        displayed incorrectly.
    """
    template = '''
Your Account Details
Dear User,
Your account has been successfully set up. Below are your temporary login credentials.
Username:
{{username}}
Password:
{{password}}
Important: For your security, please change your password immediately after logging in.
Log In Now
Or copy this link: {{login_url}}
'''
    data = payload or {}

    def field(key, default):
        # Explicit None must not render as the literal text "None".
        value = data.get(key)
        return html.escape(str(default if value is None else value))

    # The template text above has no {{app_name}} placeholder, so that
    # substitution is currently a no-op; it is kept for compatibility.
    return (
        template.replace('{{app_name}}', field('app_name', 'Visafly'))
        .replace('{{username}}', field('username', ''))
        .replace('{{password}}', field('password', ''))
        .replace('{{login_url}}', field('login_url', '#'))
    )
def template_ticket_open(payload):
    """
    Build the body of the "ticket created" notification message.

    Args:
        payload (dict | None): Template data. Recognised keys: username
            (default "User"), ticket_id, ticket_type (default ""),
            created_at, ticket_url (default "#"), app_name (default
            "Visafly"). A missing key, or a key whose value is None, uses
            the default. ``None`` as the whole payload is treated as an
            empty dict.

            ``created_at`` may be a ``datetime`` or an ISO-8601 string
            (a trailing ``Z`` is accepted). Timezone-aware values are
            converted to UTC before formatting. Naive values are formatted
            unchanged and labelled "(UTC)" — assumes producers send UTC;
            confirm against callers. An unparseable string is shown as-is,
            anything else becomes "N/A".

    Returns:
        str: The template with all placeholders substituted. Values are
        HTML-escaped because the consumer sends this content as "html".
    """
    data = payload or {}

    def field(key, default):
        # Explicit None must not render as the literal text "None".
        value = data.get(key)
        return html.escape(str(default if value is None else value))

    # --- 1. Normalise the creation time ---
    raw_time = data.get('created_at')
    parsed = None
    if isinstance(raw_time, datetime):
        parsed = raw_time
    elif isinstance(raw_time, str):
        try:
            # 'Z' is rewritten because fromisoformat() only accepts it on
            # newer Python versions.
            parsed = datetime.fromisoformat(raw_time.replace('Z', '+00:00'))
        except ValueError:
            parsed = None

    if parsed is not None:
        if parsed.utcoffset() is not None:
            # The output is labelled "(UTC)", so the value must really be UTC.
            parsed = parsed.astimezone(timezone.utc)
        formatted_time = parsed.strftime('%Y-%m-%d %H:%M') + " (UTC)"
    elif isinstance(raw_time, str):
        # Parsing failed: show the original text.
        formatted_time = html.escape(raw_time)
    else:
        formatted_time = "N/A"

    # --- 2. Template ---
    template = '''
Ticket Created
Hello {{username}},
We wanted to let you know that we've received your request. Our team is currently reviewing the details.
Ticket ID:
#{{ticket_id}}
Type:
{{ticket_type}}
Created Time:
{{created_at}}
We usually reply within 24 hours. You will receive an email notification when our agent replies.
View Ticket Details
'''
    # --- 3. Substitution ---
    # The template text above has no {{ticket_url}} / {{app_name}}
    # placeholders, so those two substitutions are currently no-ops.
    return (
        template.replace('{{username}}', field('username', 'User'))
        .replace('{{ticket_id}}', field('ticket_id', ''))
        .replace('{{ticket_type}}', field('ticket_type', ''))
        .replace('{{created_at}}', formatted_time)
        .replace('{{ticket_url}}', field('ticket_url', '#'))
        .replace('{{app_name}}', field('app_name', 'Visafly'))
    )
def _format_date_en(dt_obj):
"""Format date to '15 Jan 2024' to avoid US/UK format confusion."""
if not dt_obj:
return "N/A"
if isinstance(dt_obj, str):
return dt_obj
return dt_obj.strftime("%d %b %Y")
def _format_time_en(dt_obj):
if not dt_obj:
return ""
return dt_obj.strftime("%H:%M:%S")
def _parse_slots_summary(availability_data):
"""Convert JSON data to a readable English summary."""
if not availability_data:
return "No specific dates data."
if isinstance(availability_data, list):
# Take first 3 dates only to keep it clean
dates = availability_data[:3]
text = ", ".join(str(d) for d in dates)
if len(availability_data) > 3:
text += f" (+{len(availability_data) - 3} more)"
return text
return str(availability_data)
def _get_display_meta(data):
    """
    Derive the header pieces of a notification from the availability status.

    The headline is meant to carry the most important information so it
    shows up in the notification preview.

    Reads ``earliest_date``, ``availability_status``, ``country`` and
    ``city`` from ``data``.

    Returns:
        tuple: ``(emoji, headline, color, date_str)`` where ``date_str`` is
        ``earliest_date`` formatted by ``_format_date_en``.
    """
    date_str = _format_date_en(data.earliest_date)
    status = data.availability_status
    place = f"{data.country}, {data.city}"

    if status == 'Available':
        # When slots exist the date itself is the headline,
        # e.g. "UK, London: 15 Jan 2024".
        return "🟢", f"{place}: {date_str}", "info", date_str
    if status == 'Waitlist':
        return "🟡", f"Waitlist: {place}", "warning", date_str
    # Every other status is reported as "no slots".
    return "🔴", f"No Slots: {place}", "comment", date_str
def generate_wechat_markdown(data) -> dict:
    """
    Build the WeCom (WeChat Work) markdown message for an availability
    snapshot.

    Reads ``visa_type``, ``availability``, ``website`` and ``snapshot_at``
    from ``data`` in addition to the fields used by ``_get_display_meta``.

    Returns:
        dict: ``{"msgtype": "markdown", "markdown": {"content": <text>}}``.
    """
    # The colour returned by _get_display_meta is not used in this message.
    emoji, headline, _color, date_str = _get_display_meta(data)
    slots_summary = _parse_slots_summary(data.availability)

    # The header line differs per availability status; the rest is fixed layout.
    lines = [
        f"# {emoji} {headline}",
        f"> **Visa Type**: {data.visa_type}",
        f"> **Earliest**: {date_str}",
        f"> **Details**: {slots_summary}",
        "",
        f"👉 [Tap to Book Appointment]({data.website})",
        "",
        f"Updated: {_format_time_en(data.snapshot_at)}",
    ]
    body = {"content": "\n".join(lines)}
    return {"msgtype": "markdown", "markdown": body}
def generate_telegram_message(data) -> str:
    """
    Build the Telegram text message for an availability snapshot.

    Reads ``visa_type``, ``availability`` and ``snapshot_at`` from ``data``
    in addition to the fields used by ``_get_display_meta``.

    Returns:
        str: The multi-line message text.
    """
    emoji, headline, _, date_str = _get_display_meta(data)
    slots_summary = _parse_slots_summary(data.availability)
    separator = "──────────────────"

    # NOTE(review): the "Book Now" line carries no URL and data.website is
    # never read here, unlike the WeCom message — confirm whether a link
    # (and HTML markup) was intended.
    lines = [
        f"{emoji} {headline}",
        separator,
        f"🛂 Visa: {data.visa_type}",
        f"📅 Earliest: {date_str}",
        f"📊 Slots: {slots_summary}",
        separator,
        "🔗 Book Now ➜",
        "",
        f"🕒 Checked at {_format_time_en(data.snapshot_at)}",
    ]
    return "\n".join(lines)