jerry 2 місяців тому
батько
коміт
2b95b85146

+ 87 - 9
app/api/router.py

@@ -6,10 +6,9 @@ import stripe
 from typing import List, Dict, Any, Optional
 from typing import List, Dict, Any, Optional
 from app.core.logger import logger
 from app.core.logger import logger
 from app.core.config import settings
 from app.core.config import settings
-from fastapi import APIRouter, Request, Response, Query, Depends, Body, UploadFile, File, HTTPException
+from fastapi import APIRouter, Request, Response, Query, Depends, Body, UploadFile, File
 from fastapi.responses import RedirectResponse
 from fastapi.responses import RedirectResponse
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.ext.asyncio import AsyncSession
-from app.utils.redis_utils import redis_qpush
 from app.utils.validation_utils import validate_user_inputs
 from app.utils.validation_utils import validate_user_inputs
 from app.core.redis import get_redis_client
 from app.core.redis import get_redis_client
 from app.core.database import get_db
 from app.core.database import get_db
@@ -27,6 +26,7 @@ from app.schemas.troov import TroovRate, TroovCheckForbiddenInput, TroovProb
 from app.schemas.sms import ShortMessageDetail
 from app.schemas.sms import ShortMessageDetail
 from app.schemas.configuration import ConfigurationCreate, ConfigurationUpdate, ConfigurationOut
 from app.schemas.configuration import ConfigurationCreate, ConfigurationUpdate, ConfigurationOut
 from app.schemas.email_authorizations import EmailContent, EmailAuthorizationCreate, EmailAuthorizationUpdate, EmailAuthorizationOut
 from app.schemas.email_authorizations import EmailContent, EmailAuthorizationCreate, EmailAuthorizationUpdate, EmailAuthorizationOut
+from app.schemas.emails import VasEmailCreate, VasEmailOut
 from app.schemas.card import CardCreate, CardOut
 from app.schemas.card import CardCreate, CardOut
 from app.schemas.task import TaskCreate, TaskOut, TaskUpdate
 from app.schemas.task import TaskCreate, TaskOut, TaskUpdate
 from app.schemas.short_url import ShortUrlCreate, ShortUrlOut
 from app.schemas.short_url import ShortUrlCreate, ShortUrlOut
@@ -48,6 +48,8 @@ from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut, SlotO
 from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut
 from app.schemas.slot_refresh_status import RefreshBase, RefreshFail, RefreshStatusOut
 from app.schemas.telegram import TelegramIn
 from app.schemas.telegram import TelegramIn
 from app.schemas.wechat import WechatIn
 from app.schemas.wechat import WechatIn
+from app.schemas.whatsapp import WhatsappIn
+from app.schemas.notification_outbox import NotificationOutboxCreate, NotificationOutboxUpdate, NotificationOutboxOut
 from app.schemas.resource import FileUploadOut
 from app.schemas.resource import FileUploadOut
 from app.schemas.statistics import VasStatisticsOverviewOut
 from app.schemas.statistics import VasStatisticsOverviewOut
 from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
 from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
@@ -60,6 +62,7 @@ from app.services.troov_service import TroovService
 from app.services.visametric_service import VisametricService
 from app.services.visametric_service import VisametricService
 from app.services.sms_service import save_short_message, query_short_message
 from app.services.sms_service import save_short_message, query_short_message
 from app.services.email_authorizations_service import EmailAuthorizationService
 from app.services.email_authorizations_service import EmailAuthorizationService
+from app.services.emails_service import EmailsService
 from app.services.short_url_service import ShortUrlService
 from app.services.short_url_service import ShortUrlService
 from app.services.task_service import TaskService
 from app.services.task_service import TaskService
 from app.services.card_service import CardService
 from app.services.card_service import CardService
@@ -81,6 +84,8 @@ from app.services.notification_service import NotificationService
 from app.services.ticket_service import TicketService
 from app.services.ticket_service import TicketService
 from app.services.telegram_service import TelegramService
 from app.services.telegram_service import TelegramService
 from app.services.wechat_service import WechatService
 from app.services.wechat_service import WechatService
+from app.services.whatsapp_service import WhatsappService
+from app.services.notification_outbox_service import NotificationOutboxService
 from app.services.slot_snapshot_service import SlotSnapshotService
 from app.services.slot_snapshot_service import SlotSnapshotService
 from app.services.statistics_service import StatisticsService
 from app.services.statistics_service import StatisticsService
 from app.services.llm_service import LlmService
 from app.services.llm_service import LlmService
@@ -579,6 +584,21 @@ async def email_authorizations_send_email_bulk(
     )
     )
     return success(data={"body": result})
     return success(data={"body": result})
 
 
+@admin_required_router.post("/emails/write", summary="写入邮件", tags=["邮箱接口"], response_model=ApiResponse[VasEmailOut])
+async def emails_write(
+    payload: VasEmailCreate,
+    db: AsyncSession = Depends(get_db)
+):
+    rec = await EmailsService.create(db, payload)
+    return success(data=rec)
+
+@admin_required_router.get("/emails/max_uid", summary="获取最大邮件UID", tags=["邮箱接口"], response_model=ApiResponse[int])
+async def emails_get_max_uid(
+    db: AsyncSession = Depends(get_db)
+):
+    uid = await EmailsService.get_max_uid(db)
+    return success(data=uid)
+
 @admin_required_router.get("/account/list_all", summary="分页查询账号", tags=["账号管理"], response_model=ApiResponse[PageResponse[AccountResponse]])
 @admin_required_router.get("/account/list_all", summary="分页查询账号", tags=["账号管理"], response_model=ApiResponse[PageResponse[AccountResponse]])
 async def account_next(
 async def account_next(
     page: int = Query(0, description="第几页"),
     page: int = Query(0, description="第几页"),
@@ -689,16 +709,76 @@ async def task_pop_task(
 async def tg_send_message(
 async def tg_send_message(
     payload: TelegramIn
     payload: TelegramIn
 ):
 ):
-    await TelegramService.push_to_telegram(payload)
+    await TelegramService.push_text(payload)
     return success()
     return success()
 
 
-@admin_required_router.post("/wechat/send", summary="推送微信消息", tags=["消息推送接口"], response_model=ApiResponse)
+@admin_required_router.post("/wechat/send", summary="推送微信文本消息", tags=["消息推送接口"], response_model=ApiResponse)
 async def wechat_send(
 async def wechat_send(
     payload: WechatIn
     payload: WechatIn
 ):
 ):
-    await WechatService.push_to_wechat(payload)
+    await WechatService.push_text(payload.api_token, payload.message)
+    return success()
+
+@admin_required_router.post("/wechat/send_markdown", summary="推送微信Markdown消息", tags=["消息推送接口"], response_model=ApiResponse)
+async def wechat_send_markdown(
+    payload: WechatIn
+):
+    await WechatService.push_markdown(payload.api_token, payload.message)
     return success()
     return success()
 
 
+@admin_required_router.post("/whatsapp/send", summary="推送WhatsApp消息", tags=["消息推送接口"], response_model=ApiResponse)
+async def whatsapp_send(
+    payload: WhatsappIn
+):
+    api_base_url = payload.api_base_url or "https://waha.visafly.top"
+    await WhatsappService.send_text(
+        api_base_url=api_base_url,
+        session=payload.session,
+        chat_id=payload.chat_id,
+        message=payload.message,
+        api_key=payload.api_key,
+    )
+    return success()
+
+
+@admin_required_router.post("/notification/outbox/create", summary="创建通知消息", tags=["消息推送接口"], response_model=ApiResponse[NotificationOutboxOut])
+async def notification_outbox_create(
+    payload: NotificationOutboxCreate,
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await NotificationOutboxService.create(db, payload)
+    return success(data=obj)
+
+@admin_required_router.post("/notification/outbox/update", summary="更新通知消息", tags=["消息推送接口"], response_model=ApiResponse[NotificationOutboxOut])
+async def notification_outbox_update(
+    id: int,
+    payload: NotificationOutboxUpdate,
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await NotificationOutboxService.update(db, id, payload)
+    return success(data=obj)
+
+@admin_required_router.get("/notification/outbox/detail", summary="获取通知详情", tags=["消息推送接口"], response_model=ApiResponse[NotificationOutboxOut])
+async def notification_outbox_detail(
+    id: int,
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await NotificationOutboxService.get(db, id)
+    return success(data=obj)
+
+@admin_required_router.get("/notification/outbox/list", summary="分页查询通知列表", tags=["消息推送接口"], response_model=ApiResponse[PageResponse[NotificationOutboxOut]])
+async def notification_outbox_list(
+    status: str = Query("", description="状态"),
+    channel: str = Query("", description="渠道"),
+    priority: int = Query(0, description="优先级"),
+    msg_id: str = Query("", description="消息ID"),
+    page: int = Query(1, description="第几页"),
+    size: int = Query(20, description="分页大小"),
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await NotificationOutboxService.list(db, status, channel, priority, msg_id, page, size)
+    return success(data=obj)
+
 @admin_required_router.post("/cards/publish", summary="创建新的消息卡片", tags=["信息卡片接口"], response_model=ApiResponse[CardOut])
 @admin_required_router.post("/cards/publish", summary="创建新的消息卡片", tags=["信息卡片接口"], response_model=ApiResponse[CardOut])
 async def cards_publish(
 async def cards_publish(
     data: CardCreate = Body(...),
     data: CardCreate = Body(...),
@@ -1246,10 +1326,9 @@ async def vas_payment_admin_update_status(
 async def vas_payment_confirm_by_user(
 async def vas_payment_confirm_by_user(
     payload: VasPaymentConfirmationCreate,
     payload: VasPaymentConfirmationCreate,
     current_user: VasUser = Depends(get_current_user),
     current_user: VasUser = Depends(get_current_user),
-    db: AsyncSession = Depends(get_db),
-    redis_client: Redis = Depends(get_redis_client)
+    db: AsyncSession = Depends(get_db)
 ):
 ):
-    res = await PaymentService.confirm_by_user(db, payload, current_user, redis_client)
+    res = await PaymentService.confirm_by_user(db, payload, current_user)
     return success(data=res)
     return success(data=res)
 
 
 @admin_required_router.post("/vas/payment_confirmation/list_all", summary="查询所有待确认支付", tags=["Visafly签证系统"], response_model=ApiResponse[PageResponse[VasPaymentConfirmationOut]])
 @admin_required_router.post("/vas/payment_confirmation/list_all", summary="查询所有待确认支付", tags=["Visafly签证系统"], response_model=ApiResponse[PageResponse[VasPaymentConfirmationOut]])
@@ -1442,4 +1521,3 @@ async def get_ticket_messages(
         size=size
         size=size
     )
     )
     return success(data=msgs)
     return success(data=msgs)
-

+ 1 - 6
app/main.py

@@ -1,4 +1,3 @@
-import asyncio
 import os
 import os
 from fastapi import FastAPI, Depends, Request
 from fastapi import FastAPI, Depends, Request
 from fastapi.responses import JSONResponse
 from fastapi.responses import JSONResponse
@@ -8,14 +7,12 @@ from fastapi.exceptions import RequestValidationError
 from starlette.exceptions import HTTPException as StarletteHTTPException
 from starlette.exceptions import HTTPException as StarletteHTTPException
 
 
 from app.api import router
 from app.api import router
-from app.core.redis import get_redis_client
 from app.core.database import AsyncSessionLocal
 from app.core.database import AsyncSessionLocal
 from app.core.auth import RoleLevel, require_min_role
 from app.core.auth import RoleLevel, require_min_role
 from app.core.config import settings
 from app.core.config import settings
 from app.core.payment import init_stripe
 from app.core.payment import init_stripe
 from app.core.biz_exception import BizException
 from app.core.biz_exception import BizException
 from app.core.logger import logger
 from app.core.logger import logger
-from app.tasks.notification_task import notification_consumer
 
 
 
 
 app = FastAPI(title=settings.app_name)
 app = FastAPI(title=settings.app_name)
@@ -30,9 +27,7 @@ async def startup():
     logger.info("🟢 Stripe config done")
     logger.info("🟢 Stripe config done")
     
     
 
 
-    redis_client = await get_redis_client()
-    asyncio.create_task(notification_consumer(AsyncSessionLocal, redis_client))
-    logger.info("🟢 Notification consumer started")    
+    # Notification processing is handled by an external daemon.
 
 
 # -----------------------
 # -----------------------
 # Exception Handlers
 # Exception Handlers

+ 2 - 0
app/models/__init__.py

@@ -6,3 +6,5 @@ from .payment import VasPayment
 from .card import Card
 from .card import Card
 from .configuration import Configuration
 from .configuration import Configuration
 from .remote_server import RemoteServer
 from .remote_server import RemoteServer
+from .notification_outbox import NotificationOutbox
+from .emails import VasEmail

+ 1 - 3
app/models/emails.py

@@ -1,8 +1,6 @@
 from sqlalchemy import Column, BigInteger, String, Text, TIMESTAMP
 from sqlalchemy import Column, BigInteger, String, Text, TIMESTAMP
 from sqlalchemy.sql import func
 from sqlalchemy.sql import func
-from sqlalchemy.ext.declarative import declarative_base
-
-Base = declarative_base()
+from app.core.database import Base
 
 
 
 
 class VasEmail(Base):
 class VasEmail(Base):

+ 19 - 0
app/models/notification_outbox.py

@@ -0,0 +1,19 @@
+from datetime import datetime
+from sqlalchemy import Column, Integer, String, DateTime, JSON
+from app.core.database import Base
+
+
+class NotificationOutbox(Base):
+    __tablename__ = "notification_outbox"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    msg_id = Column(String(64), nullable=False, unique=True)
+    channel = Column(String(32), nullable=False)
+    payload = Column(JSON, nullable=False)
+    priority = Column(Integer, default=10)
+    status = Column(String(16), default="pending")
+    attempts = Column(Integer, default=0)
+    next_retry_at = Column(DateTime, nullable=True)
+
+    created_at = Column(DateTime, default=datetime.utcnow)
+    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

+ 24 - 0
app/schemas/emails.py

@@ -0,0 +1,24 @@
+from pydantic import BaseModel
+from typing import Optional
+from datetime import datetime
+
+
+class VasEmailBase(BaseModel):
+    uid: int
+    subject: Optional[str] = None
+    sender: Optional[str] = None
+    recipient: Optional[str] = None
+    receive_time: Optional[str] = None
+    body_text: Optional[str] = None
+
+
+class VasEmailCreate(VasEmailBase):
+    pass
+
+
+class VasEmailOut(VasEmailBase):
+    created_at: Optional[datetime] = None
+
+    model_config = {
+        "from_attributes": True
+    }

+ 60 - 0
app/schemas/notification_outbox.py

@@ -0,0 +1,60 @@
+import json
+from datetime import datetime
+from typing import Optional, Any, Dict
+from pydantic import BaseModel, field_validator
+
+
+class NotificationOutboxBase(BaseModel):
+    msg_id: Optional[str] = None
+    channel: Optional[str] = None
+    payload: Optional[Dict[str, Any]] = None
+    priority: Optional[int] = 10
+    status: Optional[str] = "pending"
+    attempts: Optional[int] = 0
+    next_retry_at: Optional[datetime] = None
+
+    @field_validator("payload", mode="before")
+    def normalize_json_field(cls, v):
+        if v is None:
+            return None
+        if isinstance(v, str):
+            try:
+                return json.loads(v)
+            except Exception:
+                return {}
+        return v
+
+
+class NotificationOutboxCreate(NotificationOutboxBase):
+    channel: str
+    payload: Dict[str, Any]
+
+
+class NotificationOutboxUpdate(BaseModel):
+    channel: Optional[str] = None
+    payload: Optional[Dict[str, Any]] = None
+    priority: Optional[int] = None
+    status: Optional[str] = None
+    attempts: Optional[int] = None
+    next_retry_at: Optional[datetime] = None
+
+    @field_validator("payload", mode="before")
+    def normalize_json_field(cls, v):
+        if v is None:
+            return None
+        if isinstance(v, str):
+            try:
+                return json.loads(v)
+            except Exception:
+                return {}
+        return v
+
+
+class NotificationOutboxOut(NotificationOutboxBase):
+    id: int
+    created_at: datetime
+    updated_at: datetime
+
+    model_config = {
+        "from_attributes": True
+    }

+ 0 - 1
app/schemas/wechat.py

@@ -6,4 +6,3 @@ from datetime import datetime
 class WechatIn(BaseModel):
 class WechatIn(BaseModel):
     api_token: str
     api_token: str
     message: str
     message: str
-    image: str    

+ 10 - 0
app/schemas/whatsapp.py

@@ -0,0 +1,10 @@
+from pydantic import BaseModel
+from typing import Optional
+
+
+class WhatsappIn(BaseModel):
+    session: str = "default"
+    chat_id: str
+    message: str
+    api_base_url: Optional[str] = None
+    api_key: Optional[str] = None

+ 32 - 23
app/services/auth_service.py

@@ -113,15 +113,18 @@ class AuthService:
         db.add(record)
         db.add(record)
         await db.commit()
         await db.commit()
 
 
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=payload.email,
-            template_id="email_verification_for_bind",
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "app_name": "Visafly",
-                "code": token,
-                "expiration_time":  "10 minutes"
-            }
+                "template_id": "email_verification_for_bind",
+                "receiver": payload.email,
+                "payload": {
+                    "app_name": "Visafly",
+                    "code": token,
+                    "expiration_time":  "10 minutes"
+                },
+            },
         )
         )
 
 
     # =========================
     # =========================
@@ -151,14 +154,17 @@ class AuthService:
         db.add(record)
         db.add(record)
         await db.commit()
         await db.commit()
 
 
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=payload.email,
-            template_id="email_verification_for_reset",
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "app_name": "Visafly",
-                "code": token,
-                "expiration_time": "10 minutes"
+                "template_id": "email_verification_for_reset",
+                "receiver": payload.email,
+                "payload": {
+                    "app_name": "Visafly",
+                    "code": token,
+                    "expiration_time": "10 minutes"
+                },
             },
             },
         )
         )
 
 
@@ -220,15 +226,18 @@ class AuthService:
         await db.commit()
         await db.commit()
         await db.refresh(user)
         await db.refresh(user)
         
         
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=payload.email,
-            template_id="login_credentials",
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "app_name": "Visafly",
-                "username": payload.email,
-                "password": plain_pwd,
-                "login_url": "http://45.137.220.138:3000/login"
+                "template_id": "login_credentials",
+                "receiver": payload.email,
+                "payload": {
+                    "app_name": "Visafly",
+                    "username": payload.email,
+                    "password": plain_pwd,
+                    "login_url": "https://visafly.top/login"
+                },
             },
             },
         )
         )
 
 

+ 182 - 0
app/services/emails_service.py

@@ -0,0 +1,182 @@
+import re
+import asyncio
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, func
+from app.core.logger import logger
+from app.schemas.order_event import VasOrderEventCreate
+from app.services.order_event_service import OrderEventService
+from app.services.notification_service import NotificationService
+from app.models.order import VasOrder
+from app.models.user import VasUser
+from app.models.emails import VasEmail
+from app.schemas.emails import VasEmailCreate
+
+class EmailsService:
+    # 关键词黑名单:匹配到这些词的邮件不会创建订单事件,也不会通知用户
+    SUBJECT_BLACKLIST = [
+        "验证码",
+        "动态码",
+        "verification code",
+        "otp", 
+        "one-time password",
+        "security code",
+        "dynamic code",
+        "one time password"
+    ]
+
+    @staticmethod
+    def _is_subject_blacklisted(subject: str) -> bool:
+        if not subject: return False
+        lower = subject.lower()
+        return any(kw.lower() in lower for kw in EmailsService.SUBJECT_BLACKLIST)
+
+    @staticmethod
+    def _summarize(text: str, limit: int = 120) -> str:
+        if not text: return ""
+        # 将多个空白字符替换为一个空格
+        clean = re.sub(r"\s+", " ", text).strip()
+        return clean[:limit] + ("..." if len(clean) > limit else "")
+
+    @staticmethod
+    async def create(db: AsyncSession, data: VasEmailCreate) -> VasEmail:
+        # 1. 检查 UID 是否已存在 (幂等性)
+        stmt = select(VasEmail).where(VasEmail.uid == data.uid)
+        existing = (await db.execute(stmt)).scalar_one_or_none()
+        if existing:
+            return existing
+
+        # 2. 存入邮件记录
+        rec = VasEmail(
+            uid=data.uid,
+            subject=data.subject,
+            sender=data.sender,
+            recipient=data.recipient,
+            receive_time=data.receive_time,
+            body_text=data.body_text,
+        )
+        db.add(rec)
+        
+        # 3. 拦截、匹配订单并入库通知 (此时还在同一个事务中)
+        await EmailsService._intercept_and_notify(db, rec)
+        
+        # 4. 统一提交
+        await db.commit()
+        await db.refresh(rec)
+        
+        return rec
+
+    @staticmethod
+    async def get_max_uid(db: AsyncSession) -> int:
+        stmt = select(func.max(VasEmail.uid))
+        return await db.scalar(stmt) or 0
+
+    @staticmethod
+    async def _intercept_and_notify(db: AsyncSession, rec: VasEmail) -> None:
+        """
+        拦截处理逻辑:
+        1. 黑名单过滤
+        2. 创建订单事件 (OrderEventService)
+        3. 获取订单关联的用户联系方式
+        4. post_message 到通知表
+        """
+        if EmailsService._is_subject_blacklisted(rec.subject or ""):
+            logger.info(f"Email {rec.uid} skipped: subject '{rec.subject}' is in blacklist.")
+            return
+
+        # 1. 尝试创建订单事件 (OrderEventService 内部会根据 rec.recipient 匹配订单)
+        try:
+            event = await OrderEventService.create(
+                db,
+                VasOrderEventCreate(
+                    order_no=None, # 由 OrderEventService 内部填充
+                    alias_email=rec.recipient,
+                    event_title=rec.subject or "Email Update",
+                    event_message=rec.body_text,
+                    email_uid=rec.uid,
+                    event_time=None,
+                ),
+            )
+        except Exception as e:
+            logger.warning(f"Failed to create order event for email {rec.uid}: {e}")
+            return
+
+        # 2. 如果邮件没能关联到任何活跃订单,则不通知
+        if not event or not event.order_no:
+            logger.info(f"Email {rec.uid} logged, but no matching active order found for {rec.recipient}.")
+            return
+
+        # 3. 获取订单及用户信息
+        stmt = select(VasOrder).where(VasOrder.id == event.order_no)
+        order = (await db.execute(stmt)).scalar_one_or_none()
+        if not order:
+            return
+
+        user = await db.get(VasUser, order.user_id)
+        user_inputs = order.user_inputs or {}
+
+        # 优先级:数据库用户字段 > 订单输入的联系方式
+        email_to = (user.email if user else None) or user_inputs.get("email")
+        phone = (user.phone if user else None) or user_inputs.get("phone")
+        country_code = str(user_inputs.get("phone_country_code") or "")
+
+        # 4. 构造通知内容
+        summary = EmailsService._summarize(rec.body_text or "")
+        
+        # --- 邮件转发任务 ---
+        if email_to:
+            subject = f"Update: {rec.subject or 'Visa Appointment Progress'}"
+            await NotificationService.post_message(
+                db=db,
+                channel="email",
+                payload={
+                    "template_id": "order_event_update",
+                    "receiver": email_to,
+                    "payload": {
+                        "subject": subject,
+                        "summary": summary,
+                        "order_no": order.id,
+                    },
+                },
+            )
+
+        # --- WhatsApp 推送任务 ---
+        if phone:
+            # 确保国家代码不带 +
+            code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
+            # 去掉手机号开头的 0
+            clean_phone = str(phone).lstrip('0')
+            
+            # 组合纯数字部分
+            digits = f"{code}{clean_phone}"
+            # 再次利用正则确保只剩数字(以防 code 包含特殊字符)
+            digits = re.sub(r"\D", "", digits)
+
+            if "@lid" in str(phone):
+                chat_id = phone
+            elif digits:
+                # 使用不带 + 的纯数字拼接后缀
+                chat_id = f"{digits}@c.us"
+            else:
+                return
+
+            message = (
+                f"🔔 *Progress Update*\n\n"
+                f"We received an update for your appointment:\n"
+                f"_{summary}_\n\n"
+                f"Please log in to your dashboard or check your email for full details."
+            )
+
+            await NotificationService.post_message(
+                db=db,
+                channel="whatsapp",
+                payload={
+                    "template_id": "order_event_update",
+                    "receiver": chat_id,
+                    "payload": {
+                        "message": message,
+                        "order_no": order.id,
+                    },
+                },
+            )
+            
+        logger.info(f"Notification tasks queued for Email {rec.uid} (Order: {order.id})")

+ 87 - 0
app/services/notification_outbox_service.py

@@ -0,0 +1,87 @@
+# app/services/notification_outbox_service.py
+
+import uuid
+from typing import Optional
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+
+from app.core.biz_exception import NotFoundError
+from app.models.notification_outbox import NotificationOutbox
+from app.schemas.notification_outbox import NotificationOutboxCreate, NotificationOutboxUpdate
+from app.utils.pagination import paginate
+
+
+class NotificationOutboxService:
+
+    @staticmethod
+    async def create(
+        db: AsyncSession,
+        data: NotificationOutboxCreate,
+    ) -> NotificationOutbox:
+        msg_id = data.msg_id or f"mid_{uuid.uuid4().hex}"
+        db_obj = NotificationOutbox(
+            msg_id=msg_id,
+            channel=data.channel,
+            payload=data.payload,
+            priority=data.priority if data.priority is not None else 10,
+            status=data.status or "pending",
+            attempts=data.attempts or 0,
+            next_retry_at=data.next_retry_at,
+        )
+        db.add(db_obj)
+        await db.commit()
+        await db.refresh(db_obj)
+        return db_obj
+
+    @staticmethod
+    async def update(
+        db: AsyncSession,
+        id: int,
+        data: NotificationOutboxUpdate,
+    ) -> NotificationOutbox:
+        stmt = select(NotificationOutbox).where(NotificationOutbox.id == id)
+        db_obj = (await db.execute(stmt)).scalar_one_or_none()
+        if not db_obj:
+            raise NotFoundError("Notification outbox not exist")
+
+        for k, v in data.dict(exclude_unset=True).items():
+            setattr(db_obj, k, v)
+
+        await db.commit()
+        await db.refresh(db_obj)
+        return db_obj
+
+    @staticmethod
+    async def get(
+        db: AsyncSession,
+        id: int,
+    ) -> NotificationOutbox:
+        stmt = select(NotificationOutbox).where(NotificationOutbox.id == id)
+        obj = (await db.execute(stmt)).scalar_one_or_none()
+        if not obj:
+            raise NotFoundError("Notification outbox not exist")
+        return obj
+
+    @staticmethod
+    async def list(
+        db: AsyncSession,
+        status: Optional[str] = None,
+        channel: Optional[str] = None,
+        priority: Optional[int] = None,
+        msg_id: Optional[str] = None,
+        page: int = 1,
+        size: int = 20,
+    ):
+        stmt = select(NotificationOutbox)
+
+        if status:
+            stmt = stmt.where(NotificationOutbox.status == status)
+        if channel:
+            stmt = stmt.where(NotificationOutbox.channel == channel)
+        if priority:
+            stmt = stmt.where(NotificationOutbox.priority == priority)
+        if msg_id:
+            stmt = stmt.where(NotificationOutbox.msg_id == msg_id)
+
+        stmt = stmt.order_by(NotificationOutbox.created_at.desc())
+        return await paginate(db, stmt, page, size)

+ 26 - 40
app/services/notification_service.py

@@ -1,52 +1,38 @@
 # app/services/notification_service.py
 # app/services/notification_service.py
 
 
-import uuid
-from typing import List, Dict, Any
+from typing import Dict, Any, Optional
 
 
-from redis.asyncio import Redis
-from app.utils.redis_utils import redis_qpush, redis_qpop
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.schemas.notification_outbox import NotificationOutboxCreate
+from app.services.notification_outbox_service import NotificationOutboxService
 
 
 
 
 class NotificationService:
 class NotificationService:
 
 
     @staticmethod
     @staticmethod
-    async def post_email(
-        redis_client: Redis,
-        receiver: str,
-        template_id: str,
-        payload: Dict[str, Any]
+    async def post_message(
+        db: AsyncSession,
+        channel: str,
+        payload: Dict[str, Any],
+        priority: int = 10,
+        msg_id: Optional[str] = None,
+        status: str = "pending",
+        attempts: int = 0,
+        next_retry_at=None,
     ):
     ):
-        notification_payload = {
-            "notification_id": f"nid_{uuid.uuid4().hex}",
-            "channel": "email",
-            "template_id": template_id,
-            "receiver": receiver,
-            "payload": payload
-        }
-        
-        await redis_qpush(
-            redis_client,
-            "vas_notification_queue",
-            notification_payload
-        )
-        
-    @staticmethod
-    async def post_wechat(
-        redis_client: Redis,
-        template_id: str,
-        payload: Dict[str, Any]
-    ):
-        notification_payload = {
-            "notification_id": f"nid_{uuid.uuid4().hex}",
-            "channel": "wechat",
-            "template_id": template_id,
-            "payload": payload
-        }
-        
-        await redis_qpush(
-            redis_client,
-            "vas_notification_queue",
-            notification_payload
+        await NotificationOutboxService.create(
+            db,
+            NotificationOutboxCreate(
+                msg_id=msg_id,
+                channel=channel,
+                payload=payload,
+                priority=priority,
+                status=status,
+                attempts=attempts,
+                next_retry_at=next_retry_at,
+            ),
         )
         )
 
 
+
     
     

+ 20 - 15
app/services/payment_service.py

@@ -157,8 +157,7 @@ class PaymentService:
     async def confirm_by_user(
     async def confirm_by_user(
         db: AsyncSession,
         db: AsyncSession,
         payload: VasPaymentConfirmationCreate,
         payload: VasPaymentConfirmationCreate,
-        current_user: VasUser,
-        redis_client: Redis
+        current_user: VasUser
     ):
     ):
         """
         """
         用户点击“我已支付”
         用户点击“我已支付”
@@ -194,19 +193,22 @@ class PaymentService:
 
 
         formatted_time = payload.confirmed_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
         formatted_time = payload.confirmed_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
         # 2️⃣ 推送异步通知给管理员
         # 2️⃣ 推送异步通知给管理员
-        await NotificationService.post_wechat(
-            redis_client=redis_client,
-            template_id="payment_user_confirmed",
+        await NotificationService.post_message(
+            db=db,
+            channel="wechat",
             payload={
             payload={
-                "order_id": payment.order_id,
-                "payment_id": payload.payment_id,
-                "user_email": current_user.email,
-                "amount": payload.amount,
-                "currency": payload.currency,
-                "token": "",
-                "confirmed_at": formatted_time,
-                "provider": payment.provider
-            }
+                "template_id": "payment_user_confirmed",
+                "payload": {
+                    "order_id": payment.order_id,
+                    "payment_id": payload.payment_id,
+                    "user_email": current_user.email,
+                    "amount": payload.amount,
+                    "currency": payload.currency,
+                    "token": "",
+                    "confirmed_at": formatted_time,
+                    "provider": payment.provider
+                }
+            },
         )
         )
         return record
         return record
     
     
@@ -629,4 +631,7 @@ class PaymentService:
         id: int,
         id: int,
     ) -> VasPayment:
     ) -> VasPayment:
         stmt = select(VasPayment).where(VasPayment.id == id)
         stmt = select(VasPayment).where(VasPayment.id == id)
-        return (await db.execute(stmt)).scalar_one_or_none()
+        obj = (await db.execute(stmt)).scalar_one_or_none()
+        if not obj:
+            raise NotFoundError(message="Payment not found")
+        return obj

+ 47 - 39
app/services/seaweedfs_service.py

@@ -1,5 +1,5 @@
 import os
 import os
-import httpx
+import aiohttp
 from fastapi import UploadFile
 from fastapi import UploadFile
 from app.core.biz_exception import BizLogicError
 from app.core.biz_exception import BizLogicError
 from app.core.logger import logger
 from app.core.logger import logger
@@ -18,17 +18,18 @@ class SeaweedFSService:
         上传文件到 SeaweedFS
         上传文件到 SeaweedFS
         """
         """
         try:
         try:
-            async with httpx.AsyncClient(timeout=10.0) as client:
+            timeout = aiohttp.ClientTimeout(total=10.0)
+            async with aiohttp.ClientSession(timeout=timeout) as client:
                 # 1️⃣ 向 Master 申请 fid
                 # 1️⃣ 向 Master 申请 fid
                 # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}")
                 # logger.info(f"Connecting to SeaweedFS Master: {cls.MASTER_URL}")
                 try:
                 try:
-                    assign_resp = await client.get(f"{cls.MASTER_URL}/dir/assign")
-                    assign_resp.raise_for_status()
+                    async with client.get(f"{cls.MASTER_URL}/dir/assign") as assign_resp:
+                        if assign_resp.status >= 300:
+                            raise BizLogicError("Storage service unavailable")
+                        assign_data = await assign_resp.json()
                 except Exception as e:
                 except Exception as e:
                     logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
                     logger.error(f"Failed to connect to Master at {cls.MASTER_URL}. Error: {e}")
                     raise BizLogicError("Storage service unavailable")
                     raise BizLogicError("Storage service unavailable")
-
-                assign_data = assign_resp.json()
                 fid = assign_data.get("fid")
                 fid = assign_data.get("fid")
                 
                 
                 # 2️⃣ 获取上传地址
                 # 2️⃣ 获取上传地址
@@ -54,17 +55,26 @@ class SeaweedFSService:
                 }
                 }
 
 
                 # logger.info(f"Uploading to Volume Node: {upload_url}")
                 # logger.info(f"Uploading to Volume Node: {upload_url}")
-                upload_resp = await client.post(upload_url, files=files)
+                form = aiohttp.FormData()
+                form.add_field(
+                    "file",
+                    file_content,
+                    filename=file.filename,
+                    content_type=file.content_type or "application/octet-stream",
+                )
+                async with client.post(upload_url, data=form) as upload_resp:
+                    status = upload_resp.status
+                    resp_text = await upload_resp.text()
 
 
-                if upload_resp.status_code == 201:
+                if status == 201:
                     return {
                     return {
                         "fid": fid,
                         "fid": fid,
                         "url": download_url,
                         "url": download_url,
                         "size": len(file_content)
                         "size": len(file_content)
                     }
                     }
 
 
-                logger.error(f"Upload failed. Status: {upload_resp.status_code}, Body: {upload_resp.text}")
-                raise BizLogicError(f"Storage upload error: {upload_resp.text}")
+                logger.error(f"Upload failed. Status: {status}, Body: {resp_text}")
+                raise BizLogicError(f"Storage upload error: {resp_text}")
 
 
         except BizLogicError as e:
         except BizLogicError as e:
             raise e
             raise e
@@ -84,18 +94,18 @@ class SeaweedFSService:
             # fid 格式通常是 "3,016a...",逗号前是 volumeId
             # fid 格式通常是 "3,016a...",逗号前是 volumeId
             volume_id = fid.split(",")[0]
             volume_id = fid.split(",")[0]
 
 
-            async with httpx.AsyncClient(timeout=10.0) as client:
+            timeout = aiohttp.ClientTimeout(total=10.0)
+            async with aiohttp.ClientSession(timeout=timeout) as client:
                 # 1️⃣ 查询文件位置
                 # 1️⃣ 查询文件位置
-                lookup_resp = await client.get(
+                async with client.get(
                     f"{cls.MASTER_URL}/dir/lookup",
                     f"{cls.MASTER_URL}/dir/lookup",
                     params={"volumeId": volume_id},
                     params={"volumeId": volume_id},
-                )
-                
-                if lookup_resp.status_code != 200:
-                    logger.warning(f"Lookup failed for fid {fid}: {lookup_resp.text}")
-                    return None
-                    
-                data = lookup_resp.json()
+                ) as lookup_resp:
+                    if lookup_resp.status != 200:
+                        text = await lookup_resp.text()
+                        logger.warning(f"Lookup failed for fid {fid}: {text}")
+                        return None
+                    data = await lookup_resp.json()
                 locations = data.get("locations")
                 locations = data.get("locations")
 
 
                 if not locations:
                 if not locations:
@@ -106,13 +116,13 @@ class SeaweedFSService:
                 public_url = locations[0]["publicUrl"]
                 public_url = locations[0]["publicUrl"]
                 file_url = f"http://{public_url}/{fid}"
                 file_url = f"http://{public_url}/{fid}"
 
 
-                file_resp = await client.get(file_url)
-                
-                if file_resp.status_code == 200:
-                    return (
-                        file_resp.content,
-                        file_resp.headers.get("Content-Type", "application/octet-stream"),
-                    )
+                async with client.get(file_url) as file_resp:
+                    if file_resp.status == 200:
+                        content = await file_resp.read()
+                        return (
+                            content,
+                            file_resp.headers.get("Content-Type", "application/octet-stream"),
+                        )
 
 
                 return None
                 return None
 
 
@@ -131,17 +141,16 @@ class SeaweedFSService:
 
 
             volume_id = fid.split(",")[0]
             volume_id = fid.split(",")[0]
 
 
-            async with httpx.AsyncClient(timeout=5.0) as client:
+            timeout = aiohttp.ClientTimeout(total=5.0)
+            async with aiohttp.ClientSession(timeout=timeout) as client:
                 # 1️⃣ 查找位置
                 # 1️⃣ 查找位置
-                lookup_resp = await client.get(
+                async with client.get(
                     f"{cls.MASTER_URL}/dir/lookup",
                     f"{cls.MASTER_URL}/dir/lookup",
                     params={"volumeId": volume_id},
                     params={"volumeId": volume_id},
-                )
-                
-                if lookup_resp.status_code != 200:
-                    return False
-                    
-                data = lookup_resp.json()
+                ) as lookup_resp:
+                    if lookup_resp.status != 200:
+                        return False
+                    data = await lookup_resp.json()
                 locations = data.get("locations")
                 locations = data.get("locations")
 
 
                 if not locations:
                 if not locations:
@@ -151,11 +160,10 @@ class SeaweedFSService:
                 public_url = locations[0]["publicUrl"]
                 public_url = locations[0]["publicUrl"]
                 delete_url = f"http://{public_url}/{fid}"
                 delete_url = f"http://{public_url}/{fid}"
 
 
-                del_resp = await client.delete(delete_url)
-                
-                # SeaweedFS 删除成功通常返回 200 或 202
-                return del_resp.status_code in [200, 202]
+                async with client.delete(delete_url) as del_resp:
+                    # SeaweedFS 删除成功通常返回 200 或 202
+                    return del_resp.status in [200, 202]
 
 
         except Exception as e:
         except Exception as e:
             logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
             logger.error(f"SeaweedFS delete failed for fid {fid}: {e}")
-            return False
+            return False

+ 98 - 16
app/services/slot_snapshot_service.py

@@ -4,12 +4,14 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select, func, desc, and_
 from sqlalchemy import select, func, desc, and_
 from redis.asyncio import Redis
 from redis.asyncio import Redis
 from typing import List, Dict, Any
 from typing import List, Dict, Any
+from datetime import datetime, timedelta
+from app.models.vas_task import VasTask
 from app.models.slot_snapshot import VasSlotSnapshot
 from app.models.slot_snapshot import VasSlotSnapshot
 from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
 from app.schemas.slot_snapshot import SlotSnapshotCreate, SlotSnapshotOut
 from app.services.notification_service import NotificationService
 from app.services.notification_service import NotificationService
 from app.models.slot_snapshot import VasSlotSnapshot
 from app.models.slot_snapshot import VasSlotSnapshot
 from app.models.slot_refresh_status import VasSlotRefreshStatus
 from app.models.slot_refresh_status import VasSlotRefreshStatus
-
+from app.utils.throttler import RedisThrottler, BusinessRateLimiter
 
 
 class SlotSnapshotService:
 class SlotSnapshotService:
 
 
@@ -23,28 +25,108 @@ class SlotSnapshotService:
         db.add(rec)
         db.add(rec)
         await db.commit()
         await db.commit()
         await db.refresh(rec) 
         await db.refresh(rec) 
-        # 修复点在这里:手动将 datetime 对象转为 string
+
+        # 1. 准备序列化数据
         earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
         earliest_date_str = rec.earliest_date.isoformat() if rec.earliest_date else None
         snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
         snapshot_at_str = rec.snapshot_at.isoformat() if rec.snapshot_at else None
 
 
-        await NotificationService.post_wechat(
-            redis_client=redis_client,
-            template_id="slot_snapshot",
-            payload={
+        # 2. 使用通用 Redis 限流器
+        # 只要 状态 或 日期 变了,Signature 就会变,从而触发推送
+        throttle_key = f"throttle:slot_snapshot:{rec.routing_key}"
+        signature = f"{rec.availability_status}|{earliest_date_str or ''}"
+        
+        is_throttled = await RedisThrottler.should_throttle(redis_client, throttle_key, signature, expire_seconds=1800)
+
+        if not is_throttled:
+            await NotificationService.post_message(
+                db=db,
+                channel="wechat",
+                payload={
+                    "template_id": "slot_snapshot",
+                    "payload": {
+                        **data.dict(), # 简化写法
+                        "earliest_date": earliest_date_str,
+                        "snapshot_at": snapshot_at_str
+                    }
+                },
+            )
+
+        # 3. 订阅通知
+        await SlotSnapshotService._notify_subscribers(
+            db=db,
+            rec=rec,
+            earliest_date_str=earliest_date_str
+        )
+        return rec
+
+    @staticmethod
+    async def _notify_subscribers(
+        db: AsyncSession,
+        rec: VasSlotSnapshot,
+        earliest_date_str: str
+    ) -> None:
+        stmt = select(VasTask).where(VasTask.status == "pending", VasTask.routing_key == "sub.slot")
+        tasks = (await db.execute(stmt)).scalars().all()
+
+        email_receivers = []
+        whatsapp_receivers = []
+
+        # 业务逻辑 ID:优先用日期,没有日期用状态
+        current_biz_id = earliest_date_str or rec.availability_status
+
+        for task in tasks:
+            user_inputs = task.user_inputs or {}
+            if user_inputs.get("slot_routing_key") != rec.routing_key:
+                continue
+
+            # 使用通用业务限流器
+            can_notify, updated_meta = BusinessRateLimiter.check_notification_limit(
+                meta=dict(task.meta) if task.meta else {},
+                current_id=current_biz_id,
+                cooldown_hours=8,
+                daily_max=3
+            )
+
+            if not can_notify:
+                continue
+
+            # 更新数据库状态
+            task.meta = updated_meta
+            task.notify_count = updated_meta["notify_count"]
+
+            # 收集接收者
+            message_payload = {
+                "slot_routing_key": rec.routing_key,
                 "country": rec.country,
                 "country": rec.country,
                 "city": rec.city,
                 "city": rec.city,
                 "visa_type": rec.visa_type,
                 "visa_type": rec.visa_type,
-                "routing_key": rec.routing_key,
-                "availability_status": rec.availability_status,
-                # 使用转换后的字符串
                 "earliest_date": earliest_date_str,
                 "earliest_date": earliest_date_str,
-                "availability": rec.availability,
-                "snapshot_source": rec.snapshot_source,
-                "snapshot_at": snapshot_at_str
+                "website": rec.website,
+                "message": f"Slot update: {rec.country}-{rec.city} {rec.visa_type} {earliest_date_str or rec.availability_status}",
             }
             }
-        )
-        return rec
 
 
+            if email := user_inputs.get("email"):
+                email_receivers.append(email)
+            if phone := user_inputs.get("phone"):
+                # 1. 确保国家代码不带 + 
+                code = str(user_inputs.get("phone_country_code") or "").replace("+", "")
+                # 2. 去掉手机号开头的 0 (使用 lstrip)
+                clean_phone = str(phone).lstrip('0')
+                # 3. 组合,不加 + 前缀
+                whatsapp_receivers.append(f"{code}{clean_phone}")
+
+        # 批量发送通知
+        if email_receivers:
+            await NotificationService.post_message(db=db, channel="email", payload={
+                "template_id": "slot_subscription", "receivers": list(set(email_receivers)), "payload": message_payload
+            })
+        if whatsapp_receivers:
+            await NotificationService.post_message(db=db, channel="whatsapp", payload={
+                "template_id": "slot_subscription", "receivers": list(set(whatsapp_receivers)), "payload": message_payload
+            })
+
+        await db.commit()
+            
     @staticmethod
     @staticmethod
     async def latest_for(
     async def latest_for(
         db: AsyncSession,
         db: AsyncSession,
@@ -64,7 +146,7 @@ class SlotSnapshotService:
         )
         )
 
 
         return await db.scalar(stmt)
         return await db.scalar(stmt)
-    
+
     async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
     async def get_slot_overview(db: AsyncSession, city: str) -> List[Dict[str, Any]]:
         """
         """
         异步获取指定城市的最新 Slot 快照 dashboard 数据
         异步获取指定城市的最新 Slot 快照 dashboard 数据
@@ -133,4 +215,4 @@ class SlotSnapshotService:
 
 
             dashboard_data.append(item)
             dashboard_data.append(item)
 
 
-        return dashboard_data
+        return dashboard_data

+ 46 - 37
app/services/task_handlers.py

@@ -144,19 +144,22 @@ async def forward_troov_appointment_letter(db: AsyncSession, redis_client: Redis
     # 4. 发送预约成功通知邮件给用户 (利用 NotificationService + Redis)
     # 4. 发送预约成功通知邮件给用户 (利用 NotificationService + Redis)
     if user and user.email:
     if user and user.email:
         logger.info(f"📧 Sending appointment confirmation email to {user.email}")
         logger.info(f"📧 Sending appointment confirmation email to {user.email}")
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=user.email,
-            template_id="appointment_confirmation", # 对应之前定义的模板ID
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "username": user.nickname or first_name,
-                "order_id": str(order.id),
-                "country": "France",
-                "city": "Dublin", # 根据 routing_key 或 inputs 推断
-                "appointment_date": formatted_date_time,
-                "visa_type": "Short Stay Any Purpose",
-                "user_email": forward_to
-            }
+                "template_id": "appointment_confirmation", # 对应之前定义的模板ID
+                "receiver": user.email,
+                "payload": {
+                    "username": user.nickname or first_name,
+                    "order_id": str(order.id),
+                    "country": "France",
+                    "city": "Dublin", # 根据 routing_key 或 inputs 推断
+                    "appointment_date": formatted_date_time,
+                    "visa_type": "Short Stay Any Purpose",
+                    "user_email": forward_to
+                }
+            },
         )
         )
 
 
 
 
@@ -199,19 +202,22 @@ async def forward_vfs_appointment_letter(db: AsyncSession, redis_client: Redis,
 
 
     # 发送通知
     # 发送通知
     if user and user.email:
     if user and user.email:
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=user.email,
-            template_id="appointment_confirmation",
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "username": user.nickname or 'consumer',
-                "order_id": str(order.id),
-                "country": "Netherlands",
-                "city": "Dublin",
-                "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history
-                "visa_type": "Tourist",
-                "user_email": forward_to
-            }
+                "template_id": "appointment_confirmation",
+                "receiver": user.email,
+                "payload": {
+                    "username": user.nickname or 'consumer',
+                    "order_id": str(order.id),
+                    "country": "Netherlands",
+                    "city": "Dublin",
+                    "appointment_date": "Check attachment", # VFS 可能没有直接的日期在 history
+                    "visa_type": "Tourist",
+                    "user_email": forward_to
+                }
+            },
         )
         )
 
 
 
 
@@ -260,17 +266,20 @@ async def forward_visametric_appointment_letter(db: AsyncSession, redis_client:
     if user and user.email:
     if user and user.email:
         display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed"
         display_date = f"{slot_date} {slot_time}" if slot_date else "Confirmed"
         
         
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=user.email,
-            template_id="appointment_confirmation",
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "username": user.nickname or first_name,
-                "order_id": str(order.id),
-                "country": "Germany",
-                "city": "Dublin",
-                "appointment_date": display_date,
-                "visa_type": "Tourist",
-                "user_email": forward_to
-            }
-        )
+                "template_id": "appointment_confirmation",
+                "receiver": user.email,
+                "payload": {
+                    "username": user.nickname or first_name,
+                    "order_id": str(order.id),
+                    "country": "Germany",
+                    "city": "Dublin",
+                    "appointment_date": display_date,
+                    "visa_type": "Tourist",
+                    "user_email": forward_to
+                }
+            },
+        )

+ 10 - 123
app/services/telegram_service.py

@@ -1,78 +1,13 @@
 # app/services/telegram_service.py
 # app/services/telegram_service.py
 
 
-import httpx
+import aiohttp
 from app.core.biz_exception import BizLogicError
 from app.core.biz_exception import BizLogicError
 from app.schemas.telegram import TelegramIn
 from app.schemas.telegram import TelegramIn
 
 
-
-def _parse_slots_summary(availability_data):
-    """
-    availability:
-    [
-      { "date": "2025-01-05", "times": [] },
-      { "date": "2025-01-06", "times": [{"time": "09:30"}] }
-    ]
-    """
-    if not availability_data:
-        return "No specific slots data."
-    
-    if not isinstance(availability_data, list):
-        return str(availability_data)
-        
-    summaries = []
-    # 限制显示天数,防止消息过长
-    display_limit = 4 
-    
-    for day in availability_data[:display_limit]:
-        date_str = day.get("date")
-        
-        times = day.get("times", [])
-        
-        if not times:
-            # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
-            summaries.append(f"{date_str}")
-        else:
-            # 有具体时间的处理
-            time_list = [t.get("time", "") for t in times[:5]]
-            time_text = ", ".join(filter(None, time_list)) # 过滤空时间
-            if len(times) > 5:
-                time_text += f" (+{len(times) - 5})"
-            summaries.append(f"{date_str}: {time_text}")
-            
-    if len(availability_data) > display_limit:
-        summaries.append(f"(+{len(availability_data) - display_limit} more days)")
-        
-    return " | ".join(summaries)
-
-
-def _get_display_meta(slot_snapshot):
-    """
-    Return dynamic header logic based on status.
-    Goal: Put the most important info in the notification preview.
-    """
-    date_str = slot_snapshot.get("earliest_date")
-    
-    if slot_snapshot.get("availability_status") == 'Available':
-        # Notification Preview: "🟢 UK London: 15 Jan 2024"
-        emoji = "🟢"
-        # If available, the DATE is the headline
-        headline = f'{slot_snapshot.get("country")}, {slot_snapshot.get("city")}: {date_str}'
-        color = "info" # Green for WeChat
-    elif slot_snapshot.get("availability_status") == 'Waitlist':
-        emoji = "🟡"
-        headline = f'Waitlist: {slot_snapshot.get("country")}, {slot_snapshot.get("city")}'
-        color = "warning" # Orange for WeChat
-    else:
-        emoji = "🔴"
-        headline = f'No Slots: {slot_snapshot.get("country")}, {slot_snapshot.get("city")}'
-        color = "comment" # Grey for WeChat
-        
-    return emoji, headline, color, date_str
-
 class TelegramService:
 class TelegramService:
 
 
     @staticmethod
     @staticmethod
-    async def push_to_telegram(payload: TelegramIn):
+    async def push_text(payload: TelegramIn):
         url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
         url = f"https://api.telegram.org/bot{payload.api_token}/sendMessage"
 
 
         body = {
         body = {
@@ -81,59 +16,11 @@ class TelegramService:
             "parse_mode": "HTML",
             "parse_mode": "HTML",
         }
         }
 
 
-        async with httpx.AsyncClient(timeout=10) as client:
-            resp = await client.post(url, json=body)
-
-        if resp.status_code != 200:
-            raise BizLogicError(
-                f"Telegram push failed: {resp.status_code}, {resp.text}"
-            )
-            
-    async def push_slot_snapshot(api_token: str, chat_id: str, slot_snapshot) -> str:
-        emoji, headline, _, date_str = _get_display_meta(slot_snapshot)        
-        # 解析详情
-        slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))
-        
-        # 处理可能的 None 值
-        website = slot_snapshot.get("website") or "#"  # 如果没有网址,给一个空锚点
-        # 格式化更新时间
-        checked_at = slot_snapshot.get("snapshot_at", "")
-        if "T" in str(updated_at):
-             checked_at = str(checked_at).replace("T", " ")
-        
-        TEMPLATE = (
-            "{emoji} <b>{headline}</b>\n"
-            "──────────────────\n"
-            "🛂 <b>Visa:</b> {visa_type}\n"
-            "📅 <b>Earliest:</b> <code>{date_str}</code>\n"
-            "📊 <b>Slots:</b> {slots_summary}\n"
-            "──────────────────\n"
-            "🔗 <a href='{website}'><b>Book Now ➜</b></a>\n\n"
-            "🕒 <i>Checked at {checked_at}</i>"
-        )
-
-        content = TEMPLATE.format_map({
-            "emoji": emoji,
-            "headline": headline,
-            "visa_type": slot_snapshot.get("visa_type", "N/A"),
-            "date_str": date_str,
-            "slots_summary": slots_summary,
-            "website": website,
-            "checked_at": checked_at,
-        })
-        
-        url = f"https://api.telegram.org/bot{api_token}/sendMessage"
-
-        body = {
-            "chat_id": chat_id,
-            "text": content,
-            "parse_mode": "HTML",
-        }
-
-        async with httpx.AsyncClient(timeout=10) as client:
-            resp = await client.post(url, json=body)
-
-        if resp.status_code != 200:
-            raise BizLogicError(
-                f"Telegram push failed: {resp.status_code}, {resp.text}"
-            )
+        timeout = aiohttp.ClientTimeout(total=10)
+        async with aiohttp.ClientSession(timeout=timeout) as client:
+            async with client.post(url, json=body) as resp:
+                if resp.status != 200:
+                    text = await resp.text()
+                    raise BizLogicError(
+                        f"Telegram push failed: {resp.status}, {text}"
+                    )

+ 13 - 10
app/services/ticket_service.py

@@ -40,17 +40,20 @@ class TicketService:
         await db.refresh(ticket)
         await db.refresh(ticket)
         
         
         formatted_time = ticket.created_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
         formatted_time = ticket.created_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
-        await NotificationService.post_email(
-            redis_client=redis_client,
-            receiver=current_user.email,
-            template_id="ticket_created",
+        await NotificationService.post_message(
+            db=db,
+            channel="email",
             payload={
             payload={
-                "app_name": "Visafly",
-                "username": current_user.email,
-                "ticket_id": ticket.id,
-                "ticket_type": ticket.type,
-                "ticket_url": "http://45.137.220.138:3000/dashboard",
-                "created_at": formatted_time
+                "template_id": "ticket_created",
+                "receiver": current_user.email,
+                "payload": {
+                    "app_name": "Visafly",
+                    "username": current_user.email,
+                    "ticket_id": ticket.id,
+                    "ticket_type": ticket.type,
+                    "ticket_url": "https://visafly.top/dashboard",
+                    "created_at": formatted_time
+                }
             },
             },
         )
         )
 
 

+ 18 - 110
app/services/wechat_service.py

@@ -1,75 +1,6 @@
-import httpx
-from typing import Dict, Any, List, Union
-from datetime import datetime
+import aiohttp
+from typing import Dict, Any
 from app.core.biz_exception import BizLogicError
 from app.core.biz_exception import BizLogicError
-from app.schemas.wechat import WechatIn
-
-
-def _parse_slots_summary(availability_data):
-    """
-    availability:
-    [
-      { "date": "2025-01-05", "times": [] },
-      { "date": "2025-01-06", "times": [{"time": "09:30"}] }
-    ]
-    """
-    if not availability_data:
-        return "No specific slots data."
-    
-    if not isinstance(availability_data, list):
-        return str(availability_data)
-        
-    summaries = []
-    # 限制显示天数,防止消息过长
-    display_limit = 4 
-    
-    for day in availability_data[:display_limit]:
-        date_str = day.get("date")
-        
-        times = day.get("times", [])
-        
-        if not times:
-            # 修改点:即使没有具体时间,只要有日期条目,也显示为可用
-            summaries.append(f"{date_str}")
-        else:
-            # 有具体时间的处理
-            time_list = [t.get("time", "") for t in times[:5]]
-            time_text = ", ".join(filter(None, time_list)) # 过滤空时间
-            if len(times) > 5:
-                time_text += f" (+{len(times) - 5})"
-            summaries.append(f"{date_str}: {time_text}")
-            
-    if len(availability_data) > display_limit:
-        summaries.append(f"(+{len(availability_data) - display_limit} more days)")
-        
-    return " | ".join(summaries)
-
-
-def _get_display_meta(slot_snapshot):
-    """
-    Return dynamic header logic based on status.
-    """
-    # 修复:确保这里也调用增强后的日期格式化
-    date_str = slot_snapshot.get("earliest_date")
-    
-    status = slot_snapshot.get("availability_status")
-    country = slot_snapshot.get("country", "Unknown")
-    city = slot_snapshot.get("city", "Unknown")
-
-    if status == 'Available':
-        emoji = "🟢"
-        headline = f'{country}, {city}: {date_str}'
-        color = "info" # Green
-    elif status == 'Waitlist':
-        emoji = "🟡"
-        headline = f'Waitlist: {country}, {city}'
-        color = "warning" # Orange
-    else:
-        emoji = "🔴"
-        headline = f'No Slots: {country}, {city}'
-        color = "comment" # Grey
-        
-    return emoji, headline, color, date_str
 
 
 class WechatService:
 class WechatService:
     
     
@@ -78,16 +9,16 @@ class WechatService:
         """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
         """内部私有方法:发送 HTTP 请求到企业微信 Webhook"""
         url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
         url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
 
 
+        timeout = aiohttp.ClientTimeout(total=10)
         try:
         try:
-            async with httpx.AsyncClient(timeout=10) as client:
-                response = await client.post(url, json=payload)
-        except httpx.RequestError as e:
+            async with aiohttp.ClientSession(timeout=timeout) as client:
+                async with client.post(url, json=payload) as response:
+                    if response.status != 200:
+                        raise BizLogicError(f"Wechat push failed, http_status={response.status}")
+                    data = await response.json()
+        except aiohttp.ClientError as e:
             raise BizLogicError(f"Wechat push request error: {e}")
             raise BizLogicError(f"Wechat push request error: {e}")
 
 
-        if response.status_code != 200:
-            raise BizLogicError(f"Wechat push failed, http_status={response.status_code}")
-
-        data = response.json()
         if data.get("errcode") != 0:
         if data.get("errcode") != 0:
             raise BizLogicError(
             raise BizLogicError(
                 f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
                 f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
@@ -106,35 +37,12 @@ class WechatService:
         return await WechatService._send_webhook(api_token, body)
         return await WechatService._send_webhook(api_token, body)
 
 
     @staticmethod
     @staticmethod
-    async def push_slot_snapshot(api_token: str, slot_snapshot: Dict[str, Any]):
-        # 获取元数据
-        emoji, headline, color, date_str = _get_display_meta(slot_snapshot)
-        
-        # 解析详情
-        slots_summary = _parse_slots_summary(slot_snapshot.get("availability"))
-        
-        # 处理可能的 None 值
-        website = slot_snapshot.get("website") or "#"  # 如果没有网址,给一个空锚点
-        # 格式化更新时间
-        updated_at = slot_snapshot.get("snapshot_at", "")
-        if "T" in str(updated_at):
-             updated_at = str(updated_at).replace("T", " ")
-
-        TEMPLATE = (
-            "# {emoji} {headline}\n"
-            "\n"
-            "👉 [Book now]({website})\n"
-        )
-
-        markdown_content = TEMPLATE.format_map({
-            "emoji": emoji,
-            "headline": headline,
-            "visa_type": slot_snapshot.get("visa_type", "N/A"),
-            "color": color,
-            "date_str": date_str,
-            "slots_summary": slots_summary,
-            "website": website,
-            "updated_at": updated_at,
-        })
-        
-        return await WechatService.push_markdown(api_token, markdown_content)
+    async def push_text(api_token: str, content: str):
+        """发送 Text 消息"""
+        body = {
+            "msgtype": "text",
+            "text": {
+                "content": content
+            }
+        }
+        return await WechatService._send_webhook(api_token, body)

+ 46 - 0
app/services/whatsapp_service.py

@@ -0,0 +1,46 @@
+import aiohttp
+from typing import Optional
+from app.core.biz_exception import BizLogicError
+
+
+class WhatsappService:
+
+    @staticmethod
+    async def send_text(
+        api_base_url: str,
+        session: str,
+        chat_id: str,
+        message: str,
+        api_key: Optional[str] = None,
+    ):
+        if not api_base_url:
+            raise BizLogicError("Whatsapp api_base_url not configured")
+        if not chat_id:
+            raise BizLogicError("Whatsapp chat_id missing")
+
+        headers = {}
+        if api_key:
+            headers["X-Api-Key"] = api_key
+
+        read_url = f"{api_base_url}/api/{session}/chats/{chat_id}/messages/read"
+        send_url = f"{api_base_url}/api/sendText"
+
+        timeout = aiohttp.ClientTimeout(total=10)
+        try:
+            async with aiohttp.ClientSession(timeout=timeout) as session_client:
+                async with session_client.post(read_url, json={}, headers=headers) as read_resp:
+                    if read_resp.status >= 300:
+                        raise BizLogicError(f"Whatsapp read failed, http_status={read_resp.status}")
+
+                payload = {
+                    "session": session,
+                    "chatId": chat_id,
+                    "text": message,
+                }
+                async with session_client.post(send_url, json=payload, headers=headers) as send_resp:
+                    if send_resp.status >= 300:
+                        raise BizLogicError(f"Whatsapp push failed, http_status={send_resp.status}")
+        except aiohttp.ClientError as e:
+            raise BizLogicError(f"Whatsapp push request error: {e}")
+
+        return True

+ 0 - 548
app/tasks/notification_task.py

@@ -1,548 +0,0 @@
-
-import asyncio
-import json
-from datetime import datetime
-from typing import Dict, Any
-from redis.asyncio import Redis
-from sqlalchemy.ext.asyncio import AsyncSession
-from app.services.wechat_service import WechatService
-from app.services.email_authorizations_service import EmailAuthorizationService
-from app.utils.redis_utils import redis_qpop
-
-
-THROTTLE_EXPIRY = 1800 
-
-async def notification_consumer(session_factory, redis_client: Redis):
-    """
-    异步消费 Redis 队列 vas_notification_queue
-    """
-    queue_name = "vas_notification_queue"
-    while True:
-        try:
-            # 阻塞获取队列消息
-            message: Dict[str, Any] = await redis_qpop(redis_client, queue_name, timeout=5)
-            if not message:
-                await asyncio.sleep(1)  # 队列为空,休眠
-                continue
-
-            channel = message.get("channel", "")
-            template_id = message.get("template_id")
-            payload = message.get("payload", {})
-            
-            # 按渠道发送
-            if "email" == channel:
-                content = None
-                sender = None
-                subject = None
-                receiver =  message.get("receiver", "")
-                if "email_verification_for_bind" == template_id:
-                    sender = "donotreply@visafly.top"
-                    subject = "Email Verification"
-                    content = template_for_bind_email(payload)
-                if "email_verification_for_reset" == template_id:
-                    sender = "donotreply@visafly.top"
-                    subject = "Reset Password"
-                    content = template_for_reset_pwd(payload)
-                if "login_credentials" == template_id:
-                    sender = "donotreply@visafly.top"
-                    subject = "Your Account Details"
-                    content = template_for_login_credentials(payload)
-                if "ticket_created" == template_id:
-                    sender = "donotreply@visafly.top"
-                    subject = "Ticket Created"
-                    content = template_ticket_open(payload)
-                if "appointment_confirmation" == template_id:
-                    sender = "donotreply@visafly.top"
-                    subject = "Appointment Confirmation"
-                    content = template_appointment_confirmation(payload)
-                if content:
-                    async with session_factory() as db:  # type: AsyncSession
-                        auth = await EmailAuthorizationService.get_by_email(db, sender)
-                        send_result = await EmailAuthorizationService.send_email(auth, receiver, subject, "html", content)
-                        print(f"Email send result: {send_result}")
-
-            if "wechat" == channel:
-                api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
-                
-                if "payment_user_confirmed" == template_id:
-                    status = await WechatService.push_payment_template(api_token, payload)
-                    print(f"Wechat send status: {status}")
-                
-                if "slot_snapshot" == template_id:
-                    # 1. 提取标识字段
-                    country = payload.get("country", "unknown")
-                    city = payload.get("city", "unknown")
-                    visa_type = payload.get("visa_type", "unknown")
-                    earliest_date = payload.get("earliest_date", "N/A")
-                    
-                    # 2. 生成 Redis 频率限制 Key
-                    # 格式: throttle:slot_snapshot:USA:Beijing:B1
-                    throttle_key = f"throttle:slot_snapshot:{country}:{city}:{visa_type}"
-                    
-                    # 3. 检查是否存在记录(即是否在冷却期内)
-                    last_sent_val = await redis_client.get(throttle_key)
-                    
-                    # 4. 判断是否需要跳过
-                    # 如果记录存在,且 earliest_date 没有变化,则跳过推送
-                    if last_sent_val and last_sent_val == earliest_date:
-                        print(f"⏭️  Skipped redundant Wechat notification for {country}-{city} (In Cooling Period)")
-                        continue
-                    
-                    # 5. 执行发送
-                    status = await WechatService.push_slot_snapshot(api_token, payload)
-                    print(f"Wechat send status: {status}")
-                    
-                    # 6. 发送成功后更新 Redis 记录并设置过期时间
-                    # 存储当前的最早日期,下次如果日期变了,即便没过 30 分钟也会再次推送
-                    await redis_client.set(throttle_key, str(earliest_date), ex=THROTTLE_EXPIRY)
-            print(f"✅ Notification sent: {message.get('notification_id')}")
-
-        except Exception as e:
-            print(f"⚠️ Notification consumer error: {e}")
-            await asyncio.sleep(1)  # 避免异常循环过快
-
-def template_for_bind_email(payload):
-    """
-    生成绑定邮箱验证码邮件
-    
-    Args:
-        payload (dict): 包含以下字段:
-            - app_name: 应用名称 (默认 Visafly)
-            - code: 验证码
-            - expiration_time: 过期时间描述 (如 "10 minutes")
-    """
-    
-    # 1. 定义 HTML 模板
-    template = '''
-    <!DOCTYPE html>
-    <html>
-    <head>
-        <meta charset="UTF-8">
-        <title>Email Verification</title>
-        <style>
-            body { font-family: Arial, sans-serif; background-color: #f4f4f4; margin: 0; padding: 0; }
-            .container { max-width: 600px; margin: 20px auto; background-color: #ffffff; padding: 30px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
-            .code { font-size: 24px; font-weight: bold; letter-spacing: 5px; color: #333; background-color: #f0f0f0; padding: 15px; text-align: center; border-radius: 4px; margin: 20px 0; }
-            .footer { font-size: 12px; color: #888; margin-top: 30px; text-align: center; }
-        </style>
-    </head>
-    <body>
-        <div class="container">
-            <h2>Verify your email address</h2>
-            <p>Hello,</p>
-            <p>You requested to bind this email address to your <strong>{{app_name}}</strong> account. Please use the verification code below to proceed:</p>
-            
-            <div class="code">{{code}}</div>
-            
-            <p>This code will expire in <strong>{{expiration_time}}</strong>.</p>
-            <p>If you did not request this change, please ignore this email.</p>
-            
-            <br>
-            <p>Best regards,<br>The {{app_name}} Team</p>
-            
-            <div class="footer">
-                &copy; 2026 {{app_name}}. All rights reserved.
-            </div>
-        </div>
-    </body>
-    </html>
-    '''
-
-    # 2. 执行数据替换
-    # 使用 .get() 提供默认值,防止缺少字段导致报错
-    # 使用 str() 确保数据是字符串类型
-    app_name = str(payload.get('app_name', 'Visafly'))
-    code = str(payload.get('code', ''))
-    expiration_time = str(payload.get('expiration_time', '10 minutes'))
-
-    # 链式替换所有占位符
-    html_content = template.replace('{{app_name}}', app_name) \
-                           .replace('{{code}}', code) \
-                           .replace('{{expiration_time}}', expiration_time)
-
-    return html_content
-
-def template_for_reset_pwd(payload):
-    """
-    生成重置密码验证码邮件
-    
-    Args:
-        payload (dict): 包含以下字段:
-            - app_name: 应用名称 (默认 Visafly)
-            - code: 验证码
-            - expiration_time: 过期时间描述 (如 "10 minutes")
-    """
-    
-    # 1. 定义 HTML 模板
-    template = '''
-    <!DOCTYPE html>
-    <html>
-    <head>
-        <meta charset="UTF-8">
-        <title>Reset Password</title>
-        <style>
-            body { font-family: 'Helvetica Neue', Arial, sans-serif; background-color: #f9f9f9; margin: 0; padding: 0; color: #333; }
-            .container { max-width: 600px; margin: 40px auto; background-color: #ffffff; padding: 40px; border-radius: 8px; box-shadow: 0 4px 10px rgba(0,0,0,0.05); }
-            .header { border-bottom: 1px solid #eee; padding-bottom: 20px; margin-bottom: 30px; }
-            .header h2 { margin: 0; color: #333; }
-            .code { font-size: 32px; font-weight: bold; letter-spacing: 5px; color: #2563eb; background-color: #eff6ff; padding: 20px; text-align: center; border-radius: 8px; margin: 30px 0; border: 1px solid #dbeafe; }
-            .warning { background-color: #fff7ed; border-left: 4px solid #f97316; padding: 15px; font-size: 14px; color: #c2410c; margin-top: 30px; }
-            .footer { font-size: 12px; color: #999; margin-top: 40px; text-align: center; border-top: 1px solid #eee; padding-top: 20px; }
-        </style>
-    </head>
-    <body>
-        <div class="container">
-            <div class="header">
-                <h2>Password Reset Request</h2>
-            </div>
-            
-            <p>Hello,</p>
-            <p>We received a request to reset the password for your <strong>{{app_name}}</strong> account. Please use the following code to verify your identity:</p>
-            
-            <div class="code">{{code}}</div>
-            
-            <p>This code is valid for <strong>{{expiration_time}}</strong>.</p>
-            
-            <div class="warning">
-                <strong>Security Tip:</strong> If you did not request a password reset, please ignore this email. No changes will be made to your account.
-            </div>
-            
-            <br>
-            <p>Best regards,<br>The {{app_name}} Team</p>
-            
-            <div class="footer">
-                &copy; 2026 {{app_name}}. All rights reserved.<br>
-                This is an automated message, please do not reply.
-            </div>
-        </div>
-    </body>
-    </html>
-    '''
-
-    # 2. 执行数据替换
-    # 使用 .get() 提供默认值,防止缺少字段导致报错
-    # 使用 str() 确保数据是字符串类型,防止 replace 报错
-    app_name = str(payload.get('app_name', 'Visafly'))
-    code = str(payload.get('code', ''))
-    expiration_time = str(payload.get('expiration_time', '10 minutes'))
-
-    html_content = template.replace('{{app_name}}', app_name) \
-                           .replace('{{code}}', code) \
-                           .replace('{{expiration_time}}', expiration_time)
-
-    return html_content
-    
-def template_for_login_credentials(payload):
-    """
-    生成用户登录凭证邮件 (账号 + 临时密码)
-    
-    Args:
-        payload (dict): 包含以下字段:
-            - app_name: 应用名称
-            - username: 登录账号
-            - password: 临时密码
-            - login_url: 登录链接
-    """
-    
-    # 1. 定义 HTML 模板
-    template = '''
-    <!DOCTYPE html>
-    <html>
-    <head>
-        <meta charset="UTF-8">
-        <title>Your Account Details</title>
-        <style>
-            body { font-family: 'Helvetica Neue', Arial, sans-serif; background-color: #f4f6f8; margin: 0; padding: 0; color: #333; }
-            .container { max-width: 600px; margin: 40px auto; background-color: #ffffff; padding: 40px; border-radius: 8px; box-shadow: 0 4px 12px rgba(0,0,0,0.05); }
-            .header { text-align: center; border-bottom: 1px solid #eee; padding-bottom: 20px; margin-bottom: 30px; }
-            .header h1 { font-size: 24px; color: #1a1a1a; margin: 0; }
-            .creds-box { background-color: #f0f7ff; border: 1px solid #dbeafe; border-radius: 8px; padding: 20px; margin: 25px 0; }
-            .creds-item { margin-bottom: 10px; font-size: 16px; }
-            .creds-label { font-weight: bold; color: #555; width: 100px; display: inline-block; }
-            .creds-value { font-family: 'Courier New', Courier, monospace; font-weight: bold; color: #2563eb; }
-            .btn { display: block; width: 200px; margin: 30px auto; background-color: #2563eb; color: #ffffff !important; text-align: center; padding: 12px 0; border-radius: 6px; text-decoration: none; font-weight: bold; }
-            .note { font-size: 13px; color: #666; background-color: #fff4e5; padding: 10px; border-radius: 4px; border-left: 4px solid #f97316; }
-            .footer { font-size: 12px; color: #999; margin-top: 40px; text-align: center; }
-        </style>
-    </head>
-    <body>
-        <div class="container">
-            <div class="header">
-                <h1>Welcome to {{app_name}}</h1>
-            </div>
-            
-            <p>Dear User,</p>
-            <p>Your account has been successfully set up. Below are your temporary login credentials.</p>
-            
-            <div class="creds-box">
-                <div class="creds-item">
-                    <span class="creds-label">Username:</span>
-                    <span class="creds-value">{{username}}</span>
-                </div>
-                <div class="creds-item">
-                    <span class="creds-label">Password:</span>
-                    <span class="creds-value">{{password}}</span>
-                </div>
-            </div>
-
-            <div class="note">
-                <strong>Important:</strong> For your security, please change your password immediately after logging in.
-            </div>
-
-            <a href="{{login_url}}" class="btn">Log In Now</a>
-            
-            <p style="text-align: center; font-size: 14px;">
-                Or copy this link: <a href="{{login_url}}">{{login_url}}</a>
-            </p>
-
-            <div class="footer">
-                &copy; 2026 {{app_name}}. All rights reserved.<br>
-                If you did not request this account, please contact support.
-            </div>
-        </div>
-    </body>
-    </html>
-    '''
-
-    # 2. 执行数据替换
-    # 使用 payload.get() 提供默认值,防止缺少字段导致报错
-    app_name = str(payload.get('app_name', 'Visafly'))
-    username = str(payload.get('username', ''))
-    password = str(payload.get('password', ''))
-    login_url = str(payload.get('login_url', '#'))
-
-    # 链式替换所有占位符
-    html_content = template.replace('{{app_name}}', app_name) \
-                           .replace('{{username}}', username) \
-                           .replace('{{password}}', password) \
-                           .replace('{{login_url}}', login_url)
-
-    return html_content
-    
-def template_ticket_open(payload):
-    """
-    生成工单创建通知邮件
-    payload 需包含: username, ticket_id, ticket_type, created_at, ticket_url, app_name
-    """
-    
-    # --- 1. 处理时间格式化逻辑 ---
-    raw_time = payload.get('created_at')
-    formatted_time = ""
-
-    if isinstance(raw_time, datetime):
-        # 如果传入的是 datetime 对象
-        formatted_time = raw_time.strftime('%Y-%m-%d %H:%M') + " (UTC)"
-    elif isinstance(raw_time, str):
-        try:
-            # 如果传入的是 ISO 字符串 (例如 '2025-12-31T02:33:00Z')
-            # 截取前19位通常能兼容大部分 ISO 格式
-            dt_obj = datetime.fromisoformat(raw_time.replace('Z', '+00:00'))
-            formatted_time = dt_obj.strftime('%Y-%m-%d %H:%M') + " (UTC)"
-        except ValueError:
-            # 如果解析失败,直接显示原字符串
-            formatted_time = raw_time
-    else:
-        formatted_time = "N/A"
-
-    # --- 2. HTML 模板 ---
-    # 注意:这里保持了 {{key}} 占位符,下面会统一替换
-    template = '''
-    <!DOCTYPE html>
-    <html>
-    <head>
-        <meta charset="UTF-8">
-        <title>Ticket Created</title>
-        <style>
-            body { font-family: 'Helvetica Neue', Arial, sans-serif; background-color: #f4f6f8; margin: 0; padding: 0; color: #333; }
-            .container { max-width: 600px; margin: 40px auto; background-color: #ffffff; padding: 40px; border-radius: 8px; box-shadow: 0 4px 12px rgba(0,0,0,0.05); }
-            .header { border-bottom: 1px solid #eee; padding-bottom: 20px; margin-bottom: 30px; }
-            .header h1 { font-size: 22px; color: #1a1a1a; margin: 0; }
-            .ticket-info { background-color: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px; padding: 20px; margin: 20px 0; }
-            .info-row { margin-bottom: 10px; display: flex; justify-content: space-between; }
-            .info-label { color: #64748b; font-size: 14px; }
-            .info-value { font-weight: bold; color: #0f172a; font-size: 14px; }
-            .btn { display: block; width: 200px; margin: 30px auto; background-color: #2563eb; color: #ffffff !important; text-align: center; padding: 12px 0; border-radius: 6px; text-decoration: none; font-weight: bold; font-size: 14px; }
-            .footer { font-size: 12px; color: #94a3b8; margin-top: 40px; text-align: center; }
-        </style>
-    </head>
-    <body>
-        <div class="container">
-            <div class="header">
-                <h1>Support Request Received</h1>
-            </div>
-            
-            <p>Hello {{username}},</p>
-            <p>We wanted to let you know that we've received your request. Our team is currently reviewing the details.</p>
-            
-            <div class="ticket-info">
-                <div class="info-row">
-                    <span class="info-label">Ticket ID:</span>
-                    <span class="info-value">#{{ticket_id}}</span>
-                </div>
-                <div class="info-row">
-                    <span class="info-label">Type:</span>
-                    <span class="info-value">{{ticket_type}}</span>
-                </div>
-                <div class="info-row" style="margin-bottom: 0;">
-                    <span class="info-label">Created Time:</span>
-                    <!-- 使用格式化后的时间 -->
-                    <span class="info-value">{{created_at}}</span>
-                </div>
-            </div>
-
-            <p>We usually reply within 24 hours. You will receive an email notification when our agent replies.</p>
-
-            <a href="{{ticket_url}}" class="btn">View Ticket Details</a>
-            
-            <div class="footer">
-                &copy; 2025 {{app_name}}. All rights reserved.<br>
-                Please do not reply to this email directly.
-            </div>
-        </div>
-    </body>
-    </html>
-    '''
-
-    # --- 3. 执行替换 ---
-    # 使用 payload 中的数据替换模板占位符
-    html_content = template.replace('{{username}}', str(payload.get('username', 'User'))) \
-                           .replace('{{ticket_id}}', str(payload.get('ticket_id', ''))) \
-                           .replace('{{ticket_type}}', str(payload.get('ticket_type', ''))) \
-                           .replace('{{created_at}}', formatted_time) \
-                           .replace('{{ticket_url}}', str(payload.get('ticket_url', '#'))) \
-                           .replace('{{app_name}}', str(payload.get('app_name', 'Visafly')))
-
-    return html_content
-
-def template_appointment_confirmation(payload):
-    """
-    生成预约成功确认邮件 (VisaFly)
-    
-    payload 需包含: 
-    - username: 用户名
-    - order_id: 订单号 (新增)
-    - country: 国家
-    - city: 城市
-    - appointment_date: 预约时间 (字符串, 例如 "2026-03-15 09:00")
-    - visa_type: 签证类型
-    - user_email: 用户邮箱 (用于提示信件已发往此处)
-    """
-    
-    # --- 1. 基础配置 (VisaFly) ---
-    company_name = "VisaFly"
-    support_email = "support@visafly.top"
-    website_home = "https://visafly.top"
-    website_contact = "https://visafly.top/refund-policy"
-
-    # --- 2. HTML 模板 ---
-    template = '''
-    <!DOCTYPE html>
-    <html>
-    <head>
-        <meta charset="UTF-8">
-        <title>Appointment Confirmed</title>
-        <style>
-            body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif; background-color: #f4f6f8; margin: 0; padding: 0; color: #333; }
-            .container { max-width: 600px; margin: 30px auto; background-color: #ffffff; border-radius: 8px; overflow: hidden; box-shadow: 0 4px 12px rgba(0,0,0,0.05); }
-            
-            /* 头部: 品牌蓝 */
-            .header { background-color: #0056b3; padding: 30px 20px; text-align: center; color: white; }
-            .header h1 { margin: 0; font-size: 24px; font-weight: bold; }
-            .header .subtitle { margin-top: 5px; opacity: 0.9; font-size: 14px; }
-            
-            /* 正文区域 */
-            .content { padding: 30px 25px; line-height: 1.6; }
-            
-            /* 核心警告框 (黄色高亮 - 强调查收邮件) */
-            .alert-box { background-color: #fff8e1; border-left: 5px solid #ffc107; padding: 15px 20px; margin: 25px 0; border-radius: 4px; }
-            .alert-title { font-weight: bold; color: #b00020; display: block; margin-bottom: 5px; font-size: 16px; }
-            
-            /* 订单详情卡片 */
-            .details-box { background-color: #f8fafc; border: 1px solid #e2e8f0; border-radius: 6px; padding: 15px; margin-bottom: 25px; }
-            .info-row { display: flex; justify-content: space-between; margin-bottom: 10px; border-bottom: 1px dashed #e2e8f0; padding-bottom: 8px; }
-            .info-row:last-child { border-bottom: none; margin-bottom: 0; padding-bottom: 0; }
-            .label { color: #64748b; font-size: 13px; font-weight: 500; }
-            .value { font-weight: bold; color: #0f172a; font-size: 14px; text-align: right; }
-            
-            /* 底部联系方式 */
-            .help-section { background-color: #f1f5f9; padding: 20px; text-align: center; font-size: 14px; color: #475569; border-top: 1px solid #e2e8f0; }
-            .contact-link { color: #0056b3; font-weight: bold; text-decoration: none; margin: 0 5px; }
-            
-            .footer { text-align: center; padding: 20px; font-size: 12px; color: #94a3b8; }
-            a { color: #0056b3; text-decoration: none; }
-        </style>
-    </head>
-    <body>
-        <div class="container">
-            <div class="header">
-                <h1>✅ Booking Successful!</h1>
-                <div class="subtitle">Appointment Secured by {{company_name}}</div>
-            </div>
-            
-            <div class="content">
-                <p>Hello <b>{{username}}</b>,</p>
-                <p>Great news! We have successfully secured your appointment slot.</p>
-                
-                <!-- 详情卡片 -->
-                <div class="details-box">
-                    <div class="info-row">
-                        <span class="label">Order ID:</span>
-                        <span class="value">#{{order_id}}</span>
-                    </div>
-                    <div class="info-row">
-                        <span class="label">Country / City:</span>
-                        <span class="value">{{country}} - {{city}}</span>
-                    </div>
-                    <div class="info-row">
-                        <span class="label">Appointment Date:</span>
-                        <span class="value">{{appointment_date}}</span>
-                    </div>
-                    <div class="info-row">
-                        <span class="label">Visa Type:</span>
-                        <span class="value">{{visa_type}}</span>
-                    </div>
-                </div>
-
-                <!-- 核心:检查邮件提示 -->
-                <div class="alert-box">
-                    <span class="alert-title">📩 Important: Check Your Email</span>
-                    We have sent the official confirmation letter to <b>{{user_email}}</b>.<br><br>
-                    <span style="font-size: 13px;">If you don't see it in your Inbox, please check your <b>Spam/Junk</b> folder immediately.</span>
-                </div>
-            </div>
-
-            <!-- 兜底联系方式 -->
-            <div class="help-section">
-                <p style="margin-top: 0; font-weight: bold;">Did not receive the email?</p>
-                <p style="margin-bottom: 15px;">Please check your Spam folder first. If still missing, contact us:</p>
-                
-                <!-- 方式1: 邮件 -->
-                <a href="mailto:{{support_email}}" class="contact-link">✉️ Email Support</a>
-                <span style="color: #cbd5e1;">|</span>
-                <!-- 方式2: 官网 -->
-                <a href="{{website_contact}}" class="contact-link">🌐 Contact Us</a>
-            </div>
-            
-            <div class="footer">
-                &copy; 2026 {{company_name}}. All rights reserved.<br>
-                <a href="{{website_home}}">{{website_home}}</a>
-            </div>
-        </div>
-    </body>
-    </html>
-    '''
-
-    # --- 3. 执行替换 ---
-    html_content = template.replace('{{username}}', str(payload.get('username', 'Customer'))) \
-                           .replace('{{order_id}}', str(payload.get('order_id', 'N/A'))) \
-                           .replace('{{country}}', str(payload.get('country', ''))) \
-                           .replace('{{city}}', str(payload.get('city', ''))) \
-                           .replace('{{appointment_date}}', str(payload.get('appointment_date', 'Confirmed'))) \
-                           .replace('{{user_email}}', str(payload.get('user_email', 'your email'))) \
-                           .replace('{{visa_type}}', str(payload.get('visa_type', 'Standard'))) \
-                           .replace('{{company_name}}', company_name) \
-                           .replace('{{support_email}}', support_email) \
-                           .replace('{{website_contact}}', website_contact) \
-                           .replace('{{website_home}}', website_home)
-
-    return html_content

+ 74 - 0
app/utils/throttler.py

@@ -0,0 +1,74 @@
+# app/utils/throttler.py
+from redis.asyncio import Redis
+from datetime import datetime, timedelta
+from typing import Optional, Any, Tuple
+
+class RedisThrottler:
+    """基于 Redis 的变化感知限流器"""
+    @staticmethod
+    async def should_throttle(
+        redis: Redis, 
+        key: str, 
+        current_signature: str, 
+        expire_seconds: int = 1800
+    ) -> bool:
+        """
+        如果 signature 没变且未过期,则建议限流 (Return True)
+        如果 signature 变了,则更新 Redis 并允许通过 (Return False)
+        """
+        try:
+            last_val = await redis.get(key)
+            if last_val and last_val.decode('utf-8') == current_signature:
+                return True
+            
+            # 记录新状态并设置过期时间
+            await redis.set(key, current_signature, ex=expire_seconds)
+            return False
+        except Exception:
+            # Redis 异常时,默认不限流(保证业务可用性)
+            return False
+
+class BusinessRateLimiter:
+    """基于业务元数据的复杂限流器(冷却时间、每日上限、状态变更检查)"""
+    
+    @staticmethod
+    def check_notification_limit(
+        meta: dict,
+        current_id: str, # 例如 earliest_date 或 status
+        cooldown_hours: int = 8,
+        daily_max: int = 3
+    ) -> Tuple[bool, dict]:
+        """
+        返回: (是否允许发送, 更新后的 meta)
+        """
+        now = datetime.utcnow()
+        today = now.date().isoformat()
+        
+        last_notify_at = meta.get("last_notify_at")
+        last_id = meta.get("last_id")
+        daily_count_date = meta.get("daily_count_date")
+        daily_count = int(meta.get("daily_count") or 0)
+
+        # 1. 内容变化检查: 如果 ID 没变,直接拦截
+        if last_id == current_id:
+            return False, meta
+
+        # 2. 冷却时间检查
+        if last_notify_at:
+            last_dt = datetime.fromisoformat(last_notify_at)
+            if (now - last_dt) < timedelta(hours=cooldown_hours):
+                return False, meta
+
+        # 3. 每日上限检查
+        if daily_count_date == today and daily_count >= daily_max:
+            return False, meta
+
+        # 更新元数据
+        new_meta = meta.copy()
+        new_meta["last_notify_at"] = now.isoformat()
+        new_meta["last_id"] = current_id
+        new_meta["daily_count_date"] = today
+        new_meta["daily_count"] = daily_count + 1 if daily_count_date == today else 1
+        new_meta["notify_count"] = int(new_meta.get("notify_count") or 0) + 1
+        
+        return True, new_meta

+ 241 - 0
scripts/iris.py

@@ -0,0 +1,241 @@
+import asyncio
+import aiohttp
+import logging
+import json
+import traceback
+from datetime import datetime, timedelta
+from typing import Dict, List, Any, Optional
+from notification_templates import *
+
+# ==========================================
+# 1. 配置信息
+# ==========================================
+API_BASE_URL = "http://localhost:8888"
+BEARER_TOKEN = "tok_e946329a60ff45ba807f3f41b0e8b7fc"
+MAX_ATTEMPTS = 5
+
+DEFAULT_TG_TOKEN = "6771183256:AAEd0Tenq4z6hk5toUGrCpEVPfP00bpYT1s"
+DEFAULT_WECHAT_TOKEN = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
+DEFAULT_WA_API_KEY = "51fc877539064f5882fae0f6f0661123"
+DEFAULT_WA_SESSION = "default"
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
+logger = logging.getLogger("Iris")
+
+# ==========================================
+# 2. Iris 通知调度引擎
+# ==========================================
+class IrisWorker:
+    def __init__(self):
+        self.headers = {
+            "Authorization": f"Bearer {BEARER_TOKEN}",
+            "Accept": "application/json"
+        }
+        self.session: Optional[aiohttp.ClientSession] = None
+        # 核心:频道锁字典,确保同类频道不并发
+        self.channel_locks: Dict[str, asyncio.Lock] = {
+            "email": asyncio.Lock(),
+            "wechat": asyncio.Lock(),
+            "whatsapp": asyncio.Lock(),
+            "telegram": asyncio.Lock(),
+            "tg": asyncio.Lock()
+        }
+
+    async def __aenter__(self):
+        timeout = aiohttp.ClientTimeout(total=45) # 邮件发送可能较慢,稍微延长超时
+        self.session = aiohttp.ClientSession(headers=self.headers, timeout=timeout)
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self.session:
+            await self.session.close()
+
+    def _get_next_retry_time(self, attempts: int) -> str:
+        wait_seconds = 2 ** attempts
+        next_time = datetime.utcnow() + timedelta(seconds=wait_seconds)
+        return next_time.isoformat(timespec='milliseconds') + 'Z'
+
+    async def _log_response_error(self, resp: aiohttp.ClientResponse, context: str):
+        try:
+            body = await resp.text()
+            logger.error(f"❌ {context} | Status: {resp.status} | Response: {body}")
+        except Exception as e:
+            logger.error(f"❌ {context} | Status: {resp.status} | (Error reading body: {e})")
+
+    async def fetch_pending_tasks(self) -> List[Dict]:
+        url = f"{API_BASE_URL}/api/notification/outbox/list"
+        params = {"status": "pending", "size": 50}
+        try:
+            async with self.session.get(url, params=params) as resp:
+                if resp.status == 200:
+                    result = await resp.json()
+                    return result["data"]["items"]
+                await self._log_response_error(resp, "Fetch Tasks")
+        except Exception as e:
+            logger.error(f"🔥 Fetch Tasks Exception: {e}")
+        return []
+
+    async def update_task_result(self, task_id: int, success: bool, current_attempts: int):
+        url = f"{API_BASE_URL}/api/notification/outbox/update"
+        params = {"id": task_id}
+        new_attempts = current_attempts + 1
+        
+        if success:
+            data = {"status": "sent", "attempts": new_attempts, "next_retry_at": None}
+        else:
+            if new_attempts >= MAX_ATTEMPTS:
+                data = {"status": "failed", "attempts": new_attempts, "next_retry_at": None}
+            else:
+                next_retry = self._get_next_retry_time(new_attempts)
+                data = {"status": "pending", "attempts": new_attempts, "next_retry_at": next_retry}
+        
+        try:
+            async with self.session.post(url, params=params, json=data) as resp:
+                if resp.status != 200:
+                    await self._log_response_error(resp, f"Update Status (ID: {task_id})")
+        except Exception as e:
+            logger.error(f"🔥 Update Result Exception (ID: {task_id}): {e}")
+
+    # --- 渠道发送适配器 ---
+
+    async def send_email(self, task_payload: Dict) -> bool:
+        try:
+            tid = task_payload.get("template_id")
+            template_data = task_payload.get("payload", {})
+            receivers = task_payload.get("receivers") or ([task_payload.get("receiver")] if task_payload.get("receiver") else [])
+
+            if not receivers:
+                logger.error(f"📧 [Email] No receivers found: {task_payload}")
+                return False
+            
+            body = render_email(tid, template_data)
+            sender, subject = get_email_meta(tid)
+            is_bulk = len(receivers) > 1
+            path = "/api/email-authorizations/sendmail-bulk" if is_bulk else "/api/email-authorizations/sendmail"
+            
+            params = {
+                "emailAccount": sender,
+                "sendTo": ",".join(receivers) if is_bulk else receivers[0],
+                "subject": subject,
+                "contentType": "html"
+            }
+            
+            async with self.session.post(f"{API_BASE_URL}{path}", params=params, json={"body": body}) as resp:
+                if resp.status == 200: return True
+                await self._log_response_error(resp, "Email Dispatch")
+                return False
+        except Exception as e:
+            logger.error(f"🔥 [Email] Exception: {str(e)}")
+            return False
+
+    async def send_wechat(self, task_payload: Dict) -> bool:
+        try:
+            content = render_wechat_markdown(task_payload.get("template_id"), task_payload.get("payload", {}))
+            api_token = task_payload.get("api_token") or DEFAULT_WECHAT_TOKEN
+            async with self.session.post(f"{API_BASE_URL}/api/wechat/send_markdown", 
+                                       json={"api_token": api_token, "message": content}) as resp:
+                if resp.status == 200: return True
+                await self._log_response_error(resp, "WeChat Dispatch")
+                return False
+        except Exception as e:
+            logger.error(f"🔥 [WeChat] Exception: {str(e)}")
+            return False
+
+    async def send_whatsapp(self, task_payload: Dict) -> bool:
+        try:
+            content = render_whatsapp_text(task_payload.get("template_id"), task_payload.get("payload", {}))
+            api_key = task_payload.get("api_key") or DEFAULT_WA_API_KEY
+            recs = task_payload.get("receivers", [])
+            success = True
+            for r in recs:
+                data = {"session": DEFAULT_WA_SESSION, "chat_id": r, "message": content, "api_key": api_key}
+                async with self.session.post(f"{API_BASE_URL}/api/whatsapp/send", json=data) as resp:
+                    if resp.status != 200:
+                        await self._log_response_error(resp, f"WhatsApp to {r}")
+                        success = False
+            return success
+        except Exception as e:
+            logger.error(f"🔥 [WhatsApp] Exception: {str(e)}")
+            return False
+
+    async def send_telegram(self, task_payload: Dict) -> bool:
+        try:
+            data = task_payload.get("payload", {})
+            content = render_telegram_html(task_payload.get("template_id"), data)
+            api_token = task_payload.get("api_token") or DEFAULT_TG_TOKEN
+            chat_id = task_payload.get("chat_id") or data.get("chat_id")
+            if not chat_id: return False
+            async with self.session.post(f"{API_BASE_URL}/api/tg/send_message", 
+                                       json={"chat_id": str(chat_id), "api_token": api_token, "message": content}) as resp:
+                if resp.status == 200: return True
+                await self._log_response_error(resp, "Telegram Dispatch")
+                return False
+        except Exception as e:
+            logger.error(f"🔥 [Telegram] Exception: {str(e)}")
+            return False
+
+    # --- 核心调度逻辑 ---
+
+    async def process_task(self, item: Dict):
+        # 1. 指数退避时间检查
+        next_retry = item.get("next_retry_at")
+        if next_retry:
+            try:
+                dt_str = next_retry.replace('Z', '+00:00')
+                if datetime.fromisoformat(dt_str).replace(tzinfo=None) > datetime.utcnow():
+                    return
+            except Exception: pass
+
+        task_id = item["id"]
+        channel = (item.get("channel") or "").lower()
+        task_payload = item.get("payload", {}) 
+        attempts = item.get("attempts", 0)
+
+        # 2. 获取该频道的锁 (如果没有定义则动态创建一个)
+        if channel not in self.channel_locks:
+            self.channel_locks[channel] = asyncio.Lock()
+        
+        # 3. 在锁保护下执行发送逻辑(同频道串行,跨频道并行)
+        async with self.channel_locks[channel]:
+            logger.info(f"🚚 [Task {task_id}] Delivering {channel.upper()} (Attempt: {attempts})")
+            
+            success = False
+            if channel == "email": success = await self.send_email(task_payload)
+            elif channel == "wechat": success = await self.send_wechat(task_payload)
+            elif channel == "whatsapp": success = await self.send_whatsapp(task_payload)
+            elif channel in ["telegram", "tg"]: success = await self.send_telegram(task_payload)
+            else:
+                logger.warning(f"❓ [Task {task_id}] Unknown channel: {channel}")
+                return
+
+            await self.update_task_result(task_id, success, attempts)
+            
+            if success:
+                logger.info(f"✨ [Task {task_id}] Sent Successfully.")
+            
+            # 可以在此处增加一个小间隔,避免请求过于密集
+            await asyncio.sleep(0.5)
+
+    async def run_loop(self):
+        logger.info("🌈 Iris Messenger Engine is running with Channel-Locking...")
+        while True:
+            tasks = await self.fetch_pending_tasks()
+            if tasks:
+                # 依然使用 gather,但内部受 Lock 控制,达到“同类串行、异类并行”
+                await asyncio.gather(*(self.process_task(t) for t in tasks))
+            await asyncio.sleep(2)
+
+# ==========================================
+# 4. 运行入口
+# ==========================================
+async def main():
+    async with IrisWorker() as worker:
+        await worker.run_loop()
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        logger.info("Iris stopped by user.")
+    except Exception:
+        logger.critical(f"Iris Engine Crashed: {traceback.format_exc()}")

+ 356 - 0
scripts/notification_templates.py

@@ -0,0 +1,356 @@
+from typing import Dict, Tuple, Any, List
+
+# ==========================================
+# 0. Internal Helpers
+# ==========================================
+
+def _format_slot_summary(availability: Any) -> str:
+    if not isinstance(availability, list) or not availability:
+        return "No specific time slots recorded."
+    items = []
+    for day in availability[:4]:
+        date = day.get("date", "")
+        # Minimalist text for anti-spam
+        items.append(f"- {date}")
+    return "\n".join(items)
+
+def _format_currency(amount_in_cents: Any) -> str:
+    """将分转换为元,保留两位小数"""
+    try:
+        # 确保是数字类型
+        value = float(amount_in_cents) / 100.0
+        return f"{value:.2f}"
+    except (ValueError, TypeError):
+        return str(amount_in_cents)
+
+def _get_status_meta(status: str) -> Tuple[str, str, str]:
+    if status == 'Available':
+        return "READY", "#16a34a", "Available"
+    if status == 'Waitlist':
+        return "WAIT", "#ca8a04", "Waitlist Only"
+    return "NONE", "#dc2626", "No Slots"
+
+# ==========================================
+# 1. EMAIL TEMPLATES (Anti-Spam Optimized)
+# ==========================================
+
+def _email_base(content_html: str, app_name: str = "Visafly") -> str:
+    """
+    Ultra-clean HTML structure. 
+    Uses standard fonts and minimal CSS to pass through ESP filters.
+    """
+    return f"""
+    <!DOCTYPE html>
+    <html lang="en">
+    <head>
+        <meta charset="UTF-8">
+        <title>Notification</title>
+        <style>
+            /* Standard fonts only */
+            body {{ font-family: Arial, sans-serif; -webkit-font-smoothing: antialiased; font-size: 16px; line-height: 1.5; color: #333333; margin: 0; padding: 0; }}
+            .wrapper {{ background-color: #f6f9fc; padding: 20px; }}
+            .container {{ max-width: 600px; margin: 0 auto; background-color: #ffffff; border: 1px solid #e1e4e8; border-radius: 4px; }}
+            .content {{ padding: 30px; }}
+            .footer {{ padding: 20px; text-align: center; font-size: 12px; color: #777777; }}
+            .button {{ display: inline-block; padding: 10px 20px; background-color: #0052cc; color: #ffffff !important; text-decoration: none; border-radius: 3px; }}
+            .code {{ font-family: monospace; font-size: 24px; font-weight: bold; background: #f4f4f4; padding: 10px; border-radius: 4px; display: inline-block; letter-spacing: 2px; }}
+            hr {{ border: 0; border-top: 1px solid #eeeeee; margin: 20px 0; }}
+        </style>
+    </head>
+    <body>
+        <div class="wrapper">
+            <div class="container">
+                <div class="content">
+                    {content_html}
+                    <hr>
+                    <p style="font-size: 13px; color: #888;">
+                        Regards,<br>
+                        <strong>{app_name} Team</strong>
+                    </p>
+                </div>
+            </div>
+            <div class="footer">
+                You received this because you are a registered user of {app_name}.<br>
+                To manage your notification settings, please visit our website.<br>
+                © 2026 {app_name}. Support: contact@visafly.top
+            </div>
+        </div>
+    </body>
+    </html>
+    """
+
+def template_email_verification_bind(payload: Dict) -> str:
+    # Use neutral language: "Verification code" instead of "URGENT ACTION"
+    html = f"""
+        <h2 style="font-size: 20px; color: #111;">Email Verification</h2>
+        <p>You requested to link this email address to your {payload.get('app_name', 'Visafly')} account.</p>
+        <p>Your verification code is:</p>
+        <div class="code">{payload.get('code')}</div>
+        <p>This code will expire in {payload.get('expiration_time', '10 minutes')}.</p>
+        <p>If you did not make this request, you can safely ignore this email.</p>
+    """
+    return _email_base(html, payload.get('app_name', 'Visafly'))
+
+def template_email_password_reset(payload: Dict) -> str:
+    html = f"""
+        <h2 style="font-size: 20px; color: #111;">Password Reset Code</h2>
+        <p>We received a request to reset your password. Please enter the following code to continue:</p>
+        <div class="code">{payload.get('code')}</div>
+        <p>Valid for {payload.get('expiration_time', '10 minutes')}. For security reasons, do not share this code.</p>
+    """
+    return _email_base(html, payload.get('app_name', 'Visafly'))
+
+def template_email_login_credentials(payload: Dict) -> str:
+    html = f"""
+        <h2 style="font-size: 20px; color: #111;">Your Account Credentials</h2>
+        <p>Your account has been successfully created. Here are your temporary login details:</p>
+        <table style="width: 100%; background: #f9f9f9; padding: 15px; border-radius: 4px;">
+            <tr><td><strong>User:</strong></td><td>{payload.get('username')}</td></tr>
+            <tr><td><strong>Pass:</strong></td><td><code>{payload.get('password')}</code></td></tr>
+        </table>
+        <p><a href="{payload.get('login_url')}" class="button">Log in to your account</a></p>
+    """
+    return _email_base(html, payload.get('app_name', 'Visafly'))
+
+def template_email_slot_subscription(payload: Dict) -> str:
+    # Change "Alert" to "Update" to sound less like spam
+    html = f"""
+        <h2 style="font-size: 20px; color: #111;">Visa Slot Update</h2>
+        <p>This is an automated update regarding your visa slot subscription.</p>
+        <div style="border-left: 3px solid #0052cc; padding-left: 15px; margin: 20px 0;">
+            <strong>Location:</strong> {payload.get('country')}, {payload.get('city')}<br>
+            <strong>Visa Type:</strong> {payload.get('visa_type')}<br>
+            <strong>Earliest Date:</strong> <span style="color:#d93025;">{payload.get('earliest_date')}</span>
+        </div>
+        <p><a href="{payload.get('website')}" class="button">Check Official Website</a></p>
+    """
+    return _email_base(html)
+
+def template_email_appointment_confirmation(payload: Dict) -> str:
+    """预约成功确认邮件 (Anti-Spam Optimized)"""
+    html = f"""
+        <h2 style="font-size: 20px; color: #111;">Appointment Confirmed</h2>
+        <p>Dear {payload.get('username', 'Customer')},</p>
+        <p>We are pleased to inform you that your visa appointment has been successfully booked.</p>
+        
+        <table style="width: 100%; background: #f9f9f9; padding: 15px; border-radius: 4px; border-collapse: separate; border-spacing: 0 10px; text-align: left;">
+            <tr>
+                <td style="width: 35%; color: #555;"><strong>Order ID:</strong></td>
+                <td><code>{payload.get('order_id', 'N/A')}</code></td>
+            </tr>
+            <tr>
+                <td style="color: #555;"><strong>Location:</strong></td>
+                <td>{payload.get('country', '')}, {payload.get('city', '')}</td>
+            </tr>
+            <tr>
+                <td style="color: #555;"><strong>Visa Type:</strong></td>
+                <td>{payload.get('visa_type', 'N/A')}</td>
+            </tr>
+            <tr>
+                <td style="color: #555;"><strong>Appointment Date:</strong></td>
+                <td><strong style="color: #0052cc;">{payload.get('appointment_date', 'N/A')}</strong></td>
+            </tr>
+            <tr>
+                <td style="color: #555;"><strong>Account Email:</strong></td>
+                <td>{payload.get('user_email', 'N/A')}</td>
+            </tr>
+        </table>
+        
+        <p>Please ensure you prepare all required documents well in advance of your appointment date. If you have any questions, feel free to contact our support team.</p>
+    """
+    return _email_base(html, payload.get('app_name', 'Visafly'))
+
+def template_email_order_event_update(payload: Dict) -> str:
+    """
+    Template for visa progress updates. 
+    Focuses on neutral language to ensure inbox delivery.
+    """
+    order_no = payload.get('order_no', 'N/A')
+    summary = payload.get('summary', 'No summary available.')
+    # Neutral subject line used in get_email_meta: "Notification: Visa Appointment Update"
+    
+    html = f"""
+        <h2 style="font-size: 20px; color: #111;">Visa Appointment Update</h2>
+        <p>We have received a new update regarding your visa process for <strong>Order #{order_no}</strong>.</p>
+        
+        <div style="border-left: 3px solid #0052cc; padding: 15px; margin: 20px 0; background-color: #f9fafb; border-radius: 0 4px 4px 0;">
+            <strong style="color: #555; font-size: 13px; text-transform: uppercase;">Update Summary:</strong><br>
+            <p style="margin-top: 5px; color: #333;">{summary}</p>
+        </div>
+        
+        <p>For your security and privacy, please log in to your dashboard to view the full message and any relevant documents.</p>
+        
+        <p style="margin-top: 30px;">
+            <a href="https://visafly.top/dashboard" class="button">Access Dashboard</a>
+        </p>
+    """
+    return _email_base(html, "Visafly")
+
+# ==========================================
+# 2. WECHAT / TELEGRAM / WHATSAPP (Keeping the styles)
+# ==========================================
+# [Note: These channels don't have spam filters based on HTML structure, 
+# so we keep the rich formatting we designed previously.]
+
+def template_wechat_payment_confirmed(payload: Dict) -> str:
+    """支付成功通知模板"""
+    amount_decimal = _format_currency(payload.get('amount'))
+    return (
+        f"# ✅ Payment Success\n"
+        f"--- \n"
+        f"💰 **Amount:** {amount_decimal} {payload.get('currency')}\n"
+        f"🆔 **Order ID:** `{payload.get('order_id')}`\n"
+        f"📧 **User Account:** {payload.get('user_email')}\n"
+        f"🕒 **Time:** {payload.get('confirmed_at')}\n\n"
+        f"Your service has been activated. Please stay tuned for further updates."
+    )
+    
+def template_wechat_appointment_confirmation(payload: Dict) -> str:
+    """预约成功微信通知 (Markdown)"""
+    return (
+        f"# 🎉 Appointment Confirmed\n"
+        f"--- \n"
+        f"👤 **User:** {payload.get('username', 'Customer')}\n"
+        f"📍 **Location:** {payload.get('country')}, {payload.get('city')}\n"
+        f"🛂 **Visa Type:** {payload.get('visa_type')}\n"
+        f"📅 **Date:** `{payload.get('appointment_date')}`\n"
+        f"🆔 **Order ID:** `{payload.get('order_id')}`\n\n"
+        f"Your appointment is successfully booked. Please prepare your documents."
+    )
+
+def template_wechat_slot_snapshot(payload: Dict) -> str:
+    emoji, _, status_text = _get_status_meta(payload.get("availability_status"))
+    summary = _format_slot_summary(payload.get("availability"))
+    return (
+        f"# {emoji} {payload.get('country')} {payload.get('city')}\n"
+        f"> {status_text} | {payload.get('visa_type')}\n\n"
+        f"📅 **Earliest Date:** `{payload.get('earliest_date', 'N/A')}`\n\n"
+        f"🔍 **Slots Overview:**\n{summary}\n\n"
+        f"🔗 [Book Now]({payload.get('website', '#')})"
+    )
+
+def template_telegram_slot_snapshot(payload: Dict) -> str:
+    emoji, _, status_text = _get_status_meta(payload.get("availability_status"))
+    summary = _format_slot_summary(payload.get("availability"))
+    return (
+        f"{emoji} <b>{payload.get('country')}, {payload.get('city')}</b>\n"
+        f"<i>{status_text} · {payload.get('visa_type')}</i>\n"
+        f"──────────────────\n"
+        f"📅 <b>Earliest Date:</b> <code>{payload.get('earliest_date', 'N/A')}</code>\n\n"
+        f"📊 <b>Availability:</b>\n{summary}\n"
+        f"──────────────────\n"
+        f"🔗 <a href='{payload.get('website', '#')}'><b>Official Website ➜</b></a>\n\n"
+    )
+
+def template_telegram_appointment_confirmation(payload: Dict) -> str:
+    """预约成功 TG 通知 (HTML)"""
+    return (
+        f"🎉 <b>Appointment Confirmed</b>\n"
+        f"──────────────────\n"
+        f"👤 <b>Name:</b> {payload.get('username', 'Customer')}\n"
+        f"📍 <b>Location:</b> {payload.get('country')}, {payload.get('city')}\n"
+        f"🛂 <b>Visa Type:</b> {payload.get('visa_type')}\n"
+        f"📅 <b>Date:</b> <code>{payload.get('appointment_date')}</code>\n"
+        f"──────────────────\n"
+        f"🆔 <i>Order: {payload.get('order_id')}</i>\n"
+        f"📧 <i>Email: {payload.get('user_email')}</i>"
+    )
+    
+def template_telegram_order_event_update(payload: Dict) -> str:
+    summary = payload.get('summary', 'New status update received.')
+    return (
+        f"🔔 <b>Visa Progress Update</b>\n"
+        f"──────────────────\n"
+        f"🆔 <b>Order:</b> #{payload.get('order_no')}\n"
+        f"📊 <b>Summary:</b>\n<i>{summary}</i>\n"
+        f"──────────────────\n"
+        f"🔗 <a href='https://visafly.top/dashboard'><b>View Dashboard ➜</b></a>"
+    )
+
+def template_whatsapp_slot_snapshot(payload: Dict) -> str:
+    emoji, _, _ = _get_status_meta(payload.get("availability_status"))
+    return (
+        f"{emoji} *VISA SLOT UPDATE*\n\n"
+        f"📍 *Location:* {payload.get('country')}, {payload.get('city')}\n"
+        f"🛂 *Type:* {payload.get('visa_type')}\n"
+        f"📅 *Earliest:* {payload.get('earliest_date')}\n\n"
+        f"🌐 *Book Now:* {payload.get('website')}"
+    )
+    
+def template_whatsapp_appointment_confirmation(payload: Dict) -> str:
+    """预约成功 WhatsApp 通知 (Clean Text)"""
+    return (
+        f"✅ *APPOINTMENT CONFIRMED*\n\n"
+        f"Dear {payload.get('username', 'Customer')},\n"
+        f"Your visa appointment has been successfully booked.\n\n"
+        f"📍 *Location:* {payload.get('country')}, {payload.get('city')}\n"
+        f"🛂 *Visa Type:* {payload.get('visa_type')}\n"
+        f"📅 *Date:* {payload.get('appointment_date')}\n"
+        f"🆔 *Order ID:* {payload.get('order_id')}\n\n"
+        f"Please ensure all required documents are prepared. Thank you for choosing Visafly."
+    )
+    
+def template_whatsapp_order_event_update(payload: Dict) -> str:
+    """
+    WhatsApp template for progress updates.
+    """
+    order_no = payload.get('order_no', 'N/A')
+    # If the service pre-generated a message, use it, otherwise build one
+    message = payload.get('message')
+    if not message:
+        summary = payload.get('summary', 'New status update received.')
+        message = (
+            f"🔔 *VISA PROGRESS UPDATE*\n\n"
+            f"Order: #{order_no}\n\n"
+            f"Latest Update:\n"
+            f"_{summary}_\n\n"
+            f"Please log in to your dashboard or check your email for full details."
+        )
+    return message
+
+# ==========================================
+# Dispatchers (保持不变)
+# ==========================================
+
+EMAIL_MAP = {
+    "email_verification_for_bind": (template_email_verification_bind, "Verification code for your account"),
+    "email_verification_for_reset": (template_email_password_reset, "Password reset code"),
+    "login_credentials": (template_email_login_credentials, "Your login credentials"),
+    "slot_subscription": (template_email_slot_subscription, "Update: Visa availability change"),
+    "appointment_confirmation": (template_email_appointment_confirmation, "Your Visa Appointment Confirmation"),
+    "order_event_update": (template_email_order_event_update, "Notification: Visa Appointment Update"),
+}
+
+WECHAT_MAP = {
+    "slot_snapshot": template_wechat_slot_snapshot,
+    "payment_user_confirmed": template_wechat_payment_confirmed,
+    "appointment_confirmation": template_wechat_appointment_confirmation,
+}
+
+WHATSAPP_MAP = {
+    "slot_snapshot": template_whatsapp_slot_snapshot,
+    "appointment_confirmation": template_whatsapp_appointment_confirmation,
+    "order_event_update": template_whatsapp_order_event_update,
+}
+
+TELEGRAM_MAP = {
+    "slot_snapshot": template_telegram_slot_snapshot,
+    "appointment_confirmation": template_telegram_appointment_confirmation,
+    "order_event_update": template_telegram_order_event_update,
+}
+
+def render_email(tid: str, payload: Dict) -> str:
+    return EMAIL_MAP[tid][0](payload) if tid in EMAIL_MAP else "Notification from Visafly"
+
+def get_email_meta(tid: str) -> Tuple[str, str]:
+    sender = "donotreply@visafly.top"
+    return (sender, EMAIL_MAP[tid][1]) if tid in EMAIL_MAP else (sender, "Notification")
+
+def render_wechat_markdown(tid: str, payload: Dict) -> str:
+    return WECHAT_MAP[tid](payload) if tid in WECHAT_MAP else str(payload)
+
+def render_whatsapp_text(tid: str, payload: Dict) -> str:
+    return WHATSAPP_MAP[tid](payload) if tid in WHATSAPP_MAP else str(payload)
+
+def render_telegram_html(tid: str, payload: Dict) -> str:
+    return TELEGRAM_MAP[tid](payload) if tid in TELEGRAM_MAP else str(payload)