jerry 4 mesiacov pred
rodič
commit
35d1ba1661

+ 67 - 8
app/api/router.py

@@ -36,7 +36,7 @@ from app.schemas.product import VasProductCreate, VasProductUpdate, VasProductOu
 from app.schemas.product_routing import VasProductRoutingCreate, VasProductRoutingOut
 from app.schemas.schema import VasSchemaCreate, VasSchemaUpdate, VasSchemaOut
 from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs, VasOrderOut
-from app.schemas.payment import VasPaymentCreate, VasPaymentOut
+from app.schemas.payment import VasPaymentCreate, AdminUpdateStatusPayload, VasPaymentOut
 from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate, VasPaymentConfirmationOut
 from app.schemas.payment_qr import VasPaymentQrCreate, VasPaymentQrSetEnableIn, VasPaymentQrOut
 from app.schemas.payment_provider import VasPaymentProviderCreate, VasPaymentProviderUpdate, VasPaymentProviderOut
@@ -48,6 +48,7 @@ from app.schemas.telegram import TelegramIn
 from app.schemas.wechat import WechatIn
 from app.schemas.resource import FileUploadOut
 from app.schemas.statistics import VasStatisticsOverviewOut
+from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
 from app.services.configuration_service import ConfigurationService
 from app.services.troov_service import get_rate_by_date
 from app.services.sms_service import save_short_message, query_short_message
@@ -75,6 +76,8 @@ from app.services.telegram_service import TelegramService
 from app.services.wechat_service import WechatService
 from app.services.slot_snapshot_service import SlotSnapshotService
 from app.services.statistics_service import StatisticsService
+from app.services.queue_service import QueueService
+from app.services.llm_service import LlmService
 
 
 # 公共路由
@@ -384,6 +387,22 @@ async def task_update_by_id(task_id: int, data: TaskUpdate, db: Session = Depend
     updated = await TaskService.update(db, task_id, data)
     return success(data=updated)
 
+@admin_required_router.get("/task/pop", summary="任务出队(pop)", tags=["任务管理接口"], response_model=ApiResponse[TaskOut])
+async def task_pop_task(
+    queue_name: str,
+    db: Session = Depends(get_db),
+):
+    task = await QueueService.pop_task(db, queue_name)
+    return success(data=task)
+    
+@admin_required_router.get("/queues", summary="调试队列数据", tags=["测试接口"], response_model=ApiResponse)
+async def dump_all_queues():
+    """
+    调试:查看所有内存队列
+    """
+    data = await QueueService.dump_all()
+    return success(data=data)
+
 @admin_required_router.post("/tg/send_message", summary="推送电报消息", tags=["消息推送接口"], response_model=ApiResponse)
 async def tg_send_message(
     payload: TelegramIn
@@ -655,6 +674,14 @@ async def vas_product_routing_list_by_product(
     product_routings = await ProductRoutingService.list_by_product(db, product_id)
     return success(data=product_routings)
 
+@admin_required_router.post("/vas/llm/data_parsing", summary="llm数据解析", tags=["Visafly签证系统"], response_model=ApiResponse[ParseUserInputsOut])
+async def vas_llm_data_parsing(
+    payload: ParseUserInputsPayload,
+    db: Session = Depends(get_db)
+):
+    out = await LlmService.handle_parse(db, payload)
+    return success(data=out)
+
 @public_router.get("/vas/schema/detail", summary="获取schema", tags=["Visafly签证系统"], response_model=ApiResponse[VasSchemaOut])
 async def vas_schema_get(
     schema_id: int,
@@ -711,9 +738,24 @@ async def vas_order_create(
         user_inputs=payload.user_inputs,
     )
     created_order = await OrderService.create(db, payload, product, current_user, redis_client)
-    if current_user.role == "admin":
-        await OrderService.mark_as_admin_paid(db, created_order, current_user)
-        await OrderService.create_tasks_for_order(db, created_order)
+    return success(data=created_order)
+
+@admin_required_router.post("/vas/order/create_by_admin", summary="管理员创建订单", tags=["Visafly签证系统"], response_model=ApiResponse[VasOrderOut])
+async def vas_order_create_by_admin(
+    payload: VasOrderCreate,
+    current_user: VasUser = Depends(get_current_user),
+    db: Session = Depends(get_db),
+    redis_client: Redis = Depends(get_redis_client)
+):
+    product = await ProductService.get(db, payload.product_id)
+    # ① 获取产品绑定的 schema
+    schema = await SchemaService.get(db, product.schema_id)
+    # ② 校验 user_inputs
+    validate_user_inputs(
+        schema_json=schema.schema_json,
+        user_inputs=payload.user_inputs,
+    )
+    created_order = await OrderService.create_by_admin(db, payload, product, current_user, redis_client)
     return success(data=created_order)
 
 @admin_required_router.get("/vas/order/detail", summary="查询订单", tags=["Visafly签证系统"], response_model=ApiResponse[VasOrderOut])
@@ -824,14 +866,23 @@ async def vas_payment_create(
     res = await PaymentService.get_by_id(db, payment_id)
     return success(data=res)
 
-@admin_required_router.get("/vas/payment/confirm_by_admin", summary="管理员确认支付", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentConfirmationOut])
+@admin_required_router.post("/vas/payment/confirm_by_admin", summary="管理员确认支付", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentConfirmationOut])
 async def vas_payment_confirm_by_admin(
     id: int,
     payload: VasPaymentConfirmationUpdate,
     current_user: VasUser = Depends(get_current_user),
     db: Session = Depends(get_db)
 ):
-    res = await PaymentService.confirm_by_admin(db, payload, current_user)
+    res = await PaymentService.confirm_by_admin(db, id, payload, current_user)
+    return success(data=res)
+
+@admin_required_router.post("/vas/payment/admin_update_status", summary="管理员更新支付状态", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentOut])
+async def vas_payment_admin_update_status(
+    payment_id: int,
+    payload: AdminUpdateStatusPayload,
+    db: Session = Depends(get_db)
+):
+    res = await PaymentService.admin_update_status(db, payment_id, payload)
     return success(data=res)
 
 @protected_router.post("/vas/payment/confirm_by_user", summary="用户确认支付", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentConfirmationOut])
@@ -864,7 +915,7 @@ async def vas_payment_list_by_order(
 
 @admin_required_router.post("/vas/payment_qr/create", summary="新增收款码", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentQrOut])
 async def vas_payment_qr_create(payload: VasPaymentQrCreate, db: Session = Depends(get_db)):
-    qr = PaymentQrService.create(db, payload)
+    qr = await PaymentQrService.create(db, payload)
     return success(data=qr)
 
 @protected_router.get("/vas/payment_qr/list_by_provider", summary="获取某个服务商的所有付款码", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasPaymentQrOut]])
@@ -925,7 +976,7 @@ async def vas_task_pending(
     tasks = await VasTaskService.get_active_task_by_order_id(db, order_id)
     return success(data=tasks)
 
-@admin_required_router.post("/vas/task/return_to_queue", summary="重新放回队列", tags=["Visafly签证系统"], response_model=ApiResponse[VasTaskOut])
+@admin_required_router.post("/vas/task/return_to_queue", summary="重新进入等候状态", tags=["Visafly签证系统"], response_model=ApiResponse[VasTaskOut])
 async def vas_task_return_to_queue(task_id:int, db: Session = Depends(get_db)):
     obj = await VasTaskService.return_to_queue(db, task_id)
     return success(data=obj)
@@ -935,6 +986,14 @@ async def vas_task_manual_confirm(task_id:int, db: Session = Depends(get_db)):
     obj = await VasTaskService.manual_confirm(db, task_id)
     return success(data=obj)
 
+@admin_required_router.get("/vas/task/pop", summary="任务出队(pop)", tags=["Visafly签证系统"], response_model=ApiResponse[VasTaskOut])
+async def vas_task_pop_task(
+    queue_name: str,
+    db: Session = Depends(get_db),
+):
+    task = await QueueService.pop_vas_task(db, queue_name)
+    return success(data=task)
+
 @protected_router.post("/vas/ticket/create", summary="创建工单", tags=["Visafly签证系统"], response_model=ApiResponse[VasTicketOut])
 async def vas_ticket_create(
     data:VasTicketCreate,

+ 3 - 3
app/core/auth.py

@@ -4,7 +4,7 @@ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.core.database import get_db
-from app.core.biz_exception import PermissionDeniedError
+from app.core.biz_exception import PermissionDeniedError, SessionExpiredOrInvalid
 from app.services.session_service import SessionService
 
 security = HTTPBearer(auto_error=False)
@@ -26,13 +26,13 @@ async def get_current_user(
     db: AsyncSession = Depends(get_db),
 ):
     if not credentials:
-        raise PermissionDeniedError("Missing token")
+        raise SessionExpiredOrInvalid("Missing token")
 
     token = credentials.credentials
     user = await SessionService().get_user_by_token(db, token)
 
     if not user:
-        raise PermissionDeniedError("Invalid or expired token")
+        raise SessionExpiredOrInvalid("Invalid or expired token")
 
     return user
 

+ 4 - 0
app/core/biz_exception.py

@@ -21,6 +21,10 @@ class NotFoundError(BizException):
 class PermissionDeniedError(BizException):
     def __init__(self, message="Permission denied"):
         super().__init__(code=40301, message=message, http_status=403)
+        
+class SessionExpiredOrInvalid(BizException):
+    def __init__(self, message="Session expired or invalid"):
+        super().__init__(code=40101, message=message, http_status=401)
 
 class BizLogicError(BizException):
     def __init__(self, message="Business logic error"):

+ 5 - 0
app/core/queue_manager.py

@@ -0,0 +1,5 @@
+# app/core/queue_manager.py
+
+from app.utils.priority_queue_utils import NamedQueueManager
+
+queue_manager = NamedQueueManager()

+ 57 - 14
app/main.py

@@ -4,9 +4,12 @@ from fastapi import FastAPI, Depends, Request
 from fastapi.responses import JSONResponse
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.openapi.utils import get_openapi
+from fastapi.exceptions import RequestValidationError
+from starlette.exceptions import HTTPException as StarletteHTTPException
 
 from app.api import router
 from app.core.redis import get_redis_client
+from app.core.database import AsyncSessionLocal
 from app.core.auth import RoleLevel, require_min_role
 from app.core.config import settings
 from app.core.payment import init_stripe
@@ -22,23 +25,14 @@ app = FastAPI(title=settings.app_name)
 # -----------------------
 @app.on_event("startup")
 async def startup():
-    # 如果 init_stripe 是 async
+    # 支付配置
     init_stripe()
+    logger.info("🟢 Stripe config done")
     
- 
-# 全局 Redis 客户端
-
-
-@app.on_event("startup")
-async def startup_event():
-    """
-    FastAPI 启动时执行
-    """
-    # 启动后台消费任务
+    # 通知服务启动
     redis_client = await get_redis_client()
-    asyncio.create_task(notification_consumer(redis_client))
-    print("🟢 Notification consumer started")   
-
+    asyncio.create_task(notification_consumer(AsyncSessionLocal, redis_client))
+    logger.info("🟢 Notification consumer started")    
 
 # -----------------------
 # Exception Handlers
@@ -68,6 +62,55 @@ async def unhandled_exception_handler(request: Request, exc: Exception):
         },
     )
 
+@app.exception_handler(RequestValidationError)
+async def validation_exception_handler(request: Request, exc: RequestValidationError):
+    """
+    接管 FastAPI 默认的 422 数据校验错误
+    """
+    # 1. 获取所有的错误详情
+    errors = exc.errors()
+    
+    # 2. 提取第一个错误作为主要提示信息(通常用户只关心第一个报错)
+    if errors:
+        first_error = errors[0]
+        # 处理路径:去掉 'body', 'query' 等前缀,只保留字段名,用点连接
+        # 例如: ['body', 'user', 'age'] -> 'user.age'
+        # 如果路径只有 ['body'] (比如传了空JSON),则通过 [1:] 过滤可能为空,要做个保护
+        raw_path = first_error.get("loc", [])
+        path_parts = [str(p) for p in raw_path if p not in ('body', 'query', 'path')]
+        field_path = ".".join(path_parts) if path_parts else "request_body"
+        
+        err_msg = first_error.get("msg")
+        
+        # 优化提示文案
+        message = f"Invalid parameter [{field_path}]: {err_msg}"
+    else:
+        message = "Invalid request parameters"
+
+    # 3. 返回标准格式
+    return JSONResponse(
+        status_code=422,
+        content={
+            "code": 42200,            # 或者是你们约定的参数错误码
+            "message": message,       # 转换后的人类可读提示
+            "data": errors            # 保留原始错误详情,方便前端开发排查
+        },
+    )
+    
+@app.exception_handler(StarletteHTTPException)
+async def http_exception_handler(request: Request, exc: StarletteHTTPException):
+    """
+    接管所有 HTTP 错误,包括 404 Not Found 和 405 Method Not Allowed
+    """
+    return JSONResponse(
+        status_code=exc.status_code,
+        content={
+            # 这里你可以根据 status_code 生成你的业务 code,比如 404 -> 40400
+            "code": exc.status_code*100, 
+            "message": exc.detail,  # 这里通常是 "Not Found"
+            "data": None,
+        },
+    )
 
 # -----------------------
 # CORS

+ 3 - 2
app/schemas/card.py

@@ -24,5 +24,6 @@ class CardOut(CardBase):
     created_at: datetime
     updated_at: datetime
 
-    class Config:
-        orm_mode = True
+    model_config = {
+        "from_attributes": True
+    }

+ 3 - 2
app/schemas/configuration.py

@@ -26,5 +26,6 @@ class ConfigurationOut(ConfigurationBase):
     created_at: datetime
     updated_at: datetime
 
-    class Config:
-        orm_mode = True
+    model_config = {
+        "from_attributes": True
+    }

+ 3 - 2
app/schemas/email_authorizations.py

@@ -34,5 +34,6 @@ class EmailAuthorizationOut(EmailAuthorizationBase):
     created_at: datetime
     updated_at: datetime
 
-    class Config:
-        orm_mode = True
+    model_config = {
+        "from_attributes": True
+    }

+ 3 - 2
app/schemas/http_session.py

@@ -19,5 +19,6 @@ class HttpSessionOut(HttpSessionBase):
     session_id: str
     create_at: datetime
 
-    class Config:
-        orm_mode = True
+    model_config = {
+        "from_attributes": True
+    }

+ 12 - 0
app/schemas/llm.py

@@ -0,0 +1,12 @@
+from pydantic import BaseModel
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+
+class ParseUserInputsPayload(BaseModel):
+    input_raw_str: str
+    schema_id: int
+    
+class ParseUserInputsOut(BaseModel):
+    parsed_obj: Optional[Dict[str, Any]]
+

+ 7 - 2
app/schemas/payment.py

@@ -27,6 +27,10 @@ class VasPaymentBase(BaseModel):
 class VasPaymentCreate(BaseModel):
     order_id: str
     provider: Literal['stripe', 'wechat', 'alipay']
+    
+class AdminUpdateStatusPayload(BaseModel):
+    remark: Optional[str] = None
+    status: Literal['succeeded', 'failed']
 
 class VasPaymentOut(VasPaymentBase):
     id: int
@@ -53,5 +57,6 @@ class VasPaymentOut(VasPaymentBase):
     created_at: datetime
     updated_at: datetime
 
-    class Config:
-        orm_mode = True
+    model_config = {
+        "from_attributes": True
+    }

+ 41 - 19
app/services/auth_service.py

@@ -39,6 +39,23 @@ def _random_password(length: int = 16) -> str:
     )
 
 
+ADJECTIVES = [
+    "Silent", "Lucky", "Happy", "Brave", "Quick", "Bright",
+    "Quiet", "Crazy", "Cool", "Swift", "Blue", "Red", "Golden"
+]
+
+NOUNS = [
+    "Fox", "Wolf", "Tiger", "Eagle", "Lion", "Bear",
+    "River", "Cloud", "Storm", "Nova", "Shadow"
+]
+
+def human_like_nickname(max_number: int = 999) -> str:
+    adj = random.choice(ADJECTIVES)
+    noun = random.choice(NOUNS)
+    number = random.randint(1, max_number)
+
+    return f"{adj}{noun}{number}"
+
 class AuthService:
     # =========================
     # 自动注册(游客)
@@ -55,7 +72,7 @@ class AuthService:
         user = VasUser(
             id=uid,
             role="user",
-            nickname="anonymous visitor",
+            nickname=human_like_nickname(),
             preferred_language="en",
             timezone="Asia/Shanghai",
             register_ip=ip or '',
@@ -88,21 +105,23 @@ class AuthService:
         redis_client: Redis,
     ):
         token = uuid.uuid4().hex[:6]
-
+        expiration_time = datetime.utcnow() + timedelta(minutes=10)
         record = VasVerificationToken(
             token=token,
-            expire_at=datetime.utcnow() + timedelta(minutes=30),
+            expire_at=expiration_time,
         )
         db.add(record)
         await db.commit()
 
-        await NotificationService.create(
+        await NotificationService.post_email(
             redis_client=redis_client,
-            ntype="email_verification",
-            user_id=auth_user.id,
-            channels=["email"],
+            receiver=payload.email,
             template_id="email_verification_for_bind",
-            payload={"token": token},
+            payload={
+                "app_name": "Visafly",
+                "code": token,
+                "expiration_time":  "10 minutes"
+            }
         )
 
     # =========================
@@ -123,21 +142,24 @@ class AuthService:
         if not user:
             raise BizLogicError("User not exist")
 
+        expiration_time = datetime.utcnow() + timedelta(minutes=10)
         token = uuid.uuid4().hex[:6]
         record = VasVerificationToken(
             token=token,
-            expire_at=datetime.utcnow() + timedelta(minutes=30),
+            expire_at=expiration_time,
         )
         db.add(record)
         await db.commit()
 
-        await NotificationService.create(
+        await NotificationService.post_email(
             redis_client=redis_client,
-            ntype="email_verification",
-            user_id=user.id,
-            channels=["email"],
+            receiver=payload.email,
             template_id="email_verification_for_reset",
-            payload={"token": token},
+            payload={
+                "app_name": "Visafly",
+                "code": token,
+                "expiration_time": "10 minutes"
+            },
         )
 
     # =========================
@@ -197,16 +219,16 @@ class AuthService:
 
         await db.commit()
         await db.refresh(user)
-
-        await NotificationService.create(
+        
+        await NotificationService.post_email(
             redis_client=redis_client,
-            ntype="login_credentials",
-            user_id=user.id,
-            channels=["email"],
+            receiver=payload.email,
             template_id="login_credentials",
             payload={
+                "app_name": "Visafly",
                 "username": payload.email,
                 "password": plain_pwd,
+                "login_url": "http://45.137.220.138:3000/login"
             },
         )
 

+ 95 - 0
app/services/llm_service.py

@@ -0,0 +1,95 @@
+import aiohttp
+import asyncio
+import json
+import os
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from app.core.config import settings
+from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
+from app.models.schema import VasSchema
+
+# --- 配置区 ---
+# 请换成你新生成的 API Key
+API_KEY = settings.openai_api_key
+API_URL = "https://api.openai.com/v1/chat/completions"
+
+
+class LlmService:
+
+    async def handle_parse(db: AsyncSession, payload: ParseUserInputsPayload):
+        stmt = select(VasSchema).where(VasSchema.id == payload.schema_id)
+        obj = (await db.execute(stmt)).scalar_one_or_none()
+        if not obj:
+            raise NotFoundError("Schema not exist")
+        parsed_obj = await LlmService.parse_data_async(payload.input_raw_str, obj.schema_json)
+        out = ParseUserInputsOut(parsed_obj=parsed_obj)
+        return out
+    
+    @staticmethod
+    async def parse_data_async(user_text: str, json_schema: dict):
+        """
+        [异步版本] 调用 LLM 解析数据
+        """
+        headers = {
+            "Authorization": f"Bearer {API_KEY}",
+            "Content-Type": "application/json"
+        }
+
+        # 构造 Prompt
+        system_instruction = "You are a specialized data extraction API. Output valid JSON only."
+        
+        user_prompt = f"""
+        Extract data from the text strictly based on the provided JSON Schema.
+        
+        [JSON Schema]
+        {json.dumps(json_schema)}
+        
+        [User Text]
+        {user_text}
+        """
+
+        payload = {
+            "model": "gpt-4o",  # 或 gpt-3.5-turbo
+            "messages": [
+                {"role": "system", "content": system_instruction},
+                {"role": "user", "content": user_prompt}
+            ],
+            "temperature": 0,
+            "response_format": {"type": "json_object"} # 强制 JSON 模式
+        }
+
+  
+        async with aiohttp.ClientSession() as session:
+            async with session.post(API_URL, headers=headers, json=payload, timeout=30) as response:
+                
+                # 1. 检查 HTTP 状态码
+                if response.status != 200:
+                    error_text = await response.text()
+                    return {"error": f"HTTP {response.status}", "detail": error_text}
+                
+                # 2. 获取响应体
+                result = await response.json()
+                
+                # 3. 提取并解析内容
+                content_str = result['choices'][0]['message']['content']
+                return json.loads(content_str)
+
+# --- 测试运行 ---
+if __name__ == "__main__":
+    # 定义你的 Schema
+    my_schema = {
+        "type": "object",
+        "properties": {
+            "full_name": {"type": "string"},
+            "budget": {"type": "integer"},
+            "items": {"type": "array", "items": {"type": "string"}}
+        },
+        "required": ["full_name", "budget"]
+    }
+
+    # 模拟用户输入
+    user_input = "我是张三,打算花2000块钱买个耳机和键盘。"
+    
+    print("正在解析...")
+    result = parse_data_api(user_input, my_schema)
+    print(json.dumps(result, ensure_ascii=False, indent=2))

+ 25 - 9
app/services/notification_service.py

@@ -10,23 +10,39 @@ from app.utils.redis_utils import redis_qpush, redis_qpop
 class NotificationService:
 
     @staticmethod
-    async def create(
+    async def post_email(
         redis_client: Redis,
-        ntype: str,
-        user_id: str,
-        channels: List[str],
+        receiver: str,
         template_id: str,
         payload: Dict[str, Any]
-    ) -> None:
+    ):
         notification_payload = {
             "notification_id": f"nid_{uuid.uuid4().hex}",
-            "type": ntype,
-            "user_id": user_id,
-            "channels": channels,
+            "channel": "email",
             "template_id": template_id,
+            "receiver": receiver,
             "payload": payload
         }
-
+        
+        await redis_qpush(
+            redis_client,
+            "vas_notification_queue",
+            notification_payload
+        )
+        
+    @staticmethod
+    async def post_wechat(
+        redis_client: Redis,
+        template_id: str,
+        payload: Dict[str, Any]
+    ):
+        notification_payload = {
+            "notification_id": f"nid_{uuid.uuid4().hex}",
+            "channel": "wechat",
+            "template_id": template_id,
+            "payload": payload
+        }
+        
         await redis_qpush(
             redis_client,
             "vas_notification_queue",

+ 76 - 90
app/services/order_service.py

@@ -22,129 +22,115 @@ from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
 class OrderService:
 
     # --------------------------------------------------
-    # 管理员强制标记为已支付
+    # 创建订单
     # --------------------------------------------------
     @staticmethod
-    async def mark_as_admin_paid(
+    async def create(
         db: AsyncSession,
-        order: VasOrder,
-        admin_user: VasUser,
+        data: VasOrderCreate,
+        product: VasProduct,
+        auth_user: VasUser,
+        redis_client: Redis,
     ) -> VasOrder:
 
-        if order.status == "paid":
-            return order
-
-        order.status = "paid"
-
-        # ===== user_inputs 安全修复 =====
-        raw_inputs = order.user_inputs
+        if not auth_user.email:
+            raise BizLogicError(
+                "Your account must be linked to an email address before you can place an order."
+            )
 
-        if isinstance(raw_inputs, str):
-            try:
-                order.user_inputs = json.loads(raw_inputs)
-            except Exception:
-                order.user_inputs = {}
-        elif raw_inputs is None or not isinstance(raw_inputs, dict):
-            order.user_inputs = {}
+        order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
 
-        order.user_inputs["_admin_bypass"] = {
-            "enabled": True,
-            "by": admin_user.id,
-            "at": datetime.utcnow().isoformat(),
-            "reason": "admin manual order",
-        }
+        order = VasOrder(
+            id=order_id,
+            **data.dict(),
+            product_name=product.title,
+            base_amount=product.price_amount,
+            base_currency=product.price_currency,
+            user_id=auth_user.id,
+        )
 
         db.add(order)
         await db.commit()
         await db.refresh(order)
 
         return order
-
-    # --------------------------------------------------
-    # 为订单创建任务(幂等)
-    # --------------------------------------------------
-    @staticmethod
-    async def create_tasks_for_order(
-        db: AsyncSession,
-        order: VasOrder
-    ) -> List[VasTask]:
-
-        if order.status != "paid":
-            return []
-
-        stmt = select(VasProductRouting).where(
-            VasProductRouting.product_id == order.product_id,
-            VasProductRouting.is_active == 1
-        )
-        result = await db.execute(stmt)
-        routings = result.scalars().all()
-
-        if not routings:
-            return []
-
-        created_tasks: List[VasTask] = []
-
-        for routing in routings:
-            exists_stmt = select(VasTask).where(
-                VasTask.order_id == order.id,
-                VasTask.routing_key == routing.routing_key,
-                VasTask.script_version == routing.script_version,
-            )
-            exists = (await db.execute(exists_stmt)).scalar_one_or_none()
-            if exists:
-                continue
-
-            task = VasTask(
-                order_id=order.id,
-                routing_key=routing.routing_key,
-                script_version=routing.script_version,
-                priority=routing.priority,
-                status="pending",
-                user_inputs=order.user_inputs,
-                config=routing.config,
-                attempt_count=0,
-                notify_count=0,
-                expire_at=datetime.utcnow() + timedelta(days=7),
-                created_at=datetime.utcnow(),
-            )
-            db.add(task)
-            created_tasks.append(task)
-
-        await db.commit()
-        return created_tasks
-
-    # --------------------------------------------------
-    # 创建订单
-    # --------------------------------------------------
+    
     @staticmethod
-    async def create(
+    async def create_by_admin(
         db: AsyncSession,
         data: VasOrderCreate,
         product: VasProduct,
         auth_user: VasUser,
         redis_client: Redis,
     ) -> VasOrder:
-
-        if not auth_user.email:
-            raise BizLogicError(
-                "Your account must be linked to an email address before you can place an order."
-            )
-
         order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
-
         order = VasOrder(
             id=order_id,
             **data.dict(),
+            status="paid",
             product_name=product.title,
             base_amount=product.price_amount,
             base_currency=product.price_currency,
             user_id=auth_user.id,
         )
+        # ===== user_inputs 安全修复 =====
+        raw_inputs = order.user_inputs
 
+        if isinstance(raw_inputs, str):
+            try:
+                order.user_inputs = json.loads(raw_inputs)
+            except Exception:
+                order.user_inputs = {}
+        elif raw_inputs is None or not isinstance(raw_inputs, dict):
+            order.user_inputs = {}
+
+        order.user_inputs["_admin_bypass"] = {
+            "enabled": True,
+            "by": auth_user.id,
+            "at": datetime.utcnow().isoformat(),
+            "reason": "admin manual order",
+        }
+        
         db.add(order)
+        
+        stmt = select(VasProductRouting).where(
+            VasProductRouting.product_id == order.product_id,
+            VasProductRouting.is_active == 1
+        )
+        result = await db.execute(stmt)
+        routings = result.scalars().all()
+
+        if routings:
+            created_tasks: List[VasTask] = []
+
+            for routing in routings:
+                exists_stmt = select(VasTask).where(
+                    VasTask.order_id == order.id,
+                    VasTask.routing_key == routing.routing_key,
+                    VasTask.script_version == routing.script_version,
+                )
+                exists = (await db.execute(exists_stmt)).scalar_one_or_none()
+                if exists:
+                    continue
+
+                task = VasTask(
+                    order_id=order.id,
+                    routing_key=routing.routing_key,
+                    script_version=routing.script_version,
+                    priority=routing.priority,
+                    status="pending",
+                    user_inputs=order.user_inputs,
+                    config=routing.config,
+                    attempt_count=0,
+                    notify_count=0,
+                    expire_at=datetime.utcnow() + timedelta(days=60),
+                    created_at=datetime.utcnow(),
+                )
+                db.add(task)
+                created_tasks.append(task)
+        
         await db.commit()
         await db.refresh(order)
-
         return order
 
     # --------------------------------------------------

+ 59 - 73
app/services/payment_service.py

@@ -26,10 +26,10 @@ from app.models.verification_token import VasVerificationToken
 from app.models.payment_provider import VasPaymentProvider
 from app.models.payment_qr import VasPaymentQR
 from app.models.payment_confirmation import VasPaymentConfirmation
-from app.schemas.payment import VasPaymentCreate
+from app.schemas.payment import VasPaymentCreate, AdminUpdateStatusPayload
 from app.schemas.payment_confirmation import VasPaymentConfirmationCreate, VasPaymentConfirmationUpdate
 from app.services.notification_service import NotificationService
-
+from app.services.webhook_service import WebhookService
 
 
 class PaymentService:
@@ -122,21 +122,29 @@ class PaymentService:
             db.add(record)
             await db.commit()
             await db.refresh(record)
+            
+        
+        stmt = select(VasPayment).where(
+            VasPayment.id == payload.payment_id
+        )
+        payment = (await db.execute(stmt)).scalar_one_or_none()
 
+        formatted_time = payload.confirmed_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
         # 2️⃣ 推送异步通知给管理员
-        # await NotificationService.create(
-        #     redis_client=redis_client,
-        #     ntype="payment_user_confirmed",
-        #     user_id=current_user.id,
-        #     channels=["wechat"],
-        #     template_id="payment_user_confirmed",
-        #     payload={
-        #         "payment_id": payload.payment_id,
-        #         "user_id": current_user.id,
-        #         "confirmed_at": record.confirmed_at.isoformat()
-        #     }
-        # )
-
+        await NotificationService.post_wechat(
+            redis_client=redis_client,
+            template_id="payment_user_confirmed",
+            payload={
+                "order_id": payment.order_id,
+                "payment_id": payload.payment_id,
+                "user_email": current_user.email,
+                "amount": payload.amount,
+                "currency": payload.currency,
+                "token": "",
+                "confirmed_at": formatted_time,
+                "provider": payment.provider
+            }
+        )
         return record
     
     @staticmethod
@@ -163,13 +171,42 @@ class PaymentService:
         record.admin_id = current_user.id
         record.admin_confirmed_at = datetime.utcnow()
         record.status = 'confirmed'
-        await PaymentService._confirm_payment_action(db, record.payment_id)
+        await PaymentService._confirm_payment_action(db, record.payment_id, 'confirmed by admin')
         
         await db.commit()
         await db.refresh(record)
 
         return record
     
+    @staticmethod
+    async def admin_update_status(
+        db: AsyncSession,
+        payment_id: int,
+        payload: AdminUpdateStatusPayload
+    ):
+        """
+        管理员确认用户的支付
+        """
+        if payload.status == "succeeded":
+            payment = await PaymentService._confirm_payment_action(db, payment_id, payload.remark)
+        else:
+            pay_stmt = (
+                select(VasPayment)
+                .where(VasPayment.id == payment_id)
+                .order_by(VasPayment.created_at.desc())
+            )
+            pay_result = await db.execute(pay_stmt)
+            payment = pay_result.scalar_one_or_none()
+
+            if not payment:
+                raise BizLogicError("Payment not found")
+            
+            payment.status = 'failed'
+        await db.commit()
+        await db.refresh(payment)
+        await db.refresh(payment)
+        return payment
+    
     @staticmethod
     async def list_payment_confirmation(
         db: AsyncSession,
@@ -190,54 +227,6 @@ class PaymentService:
 
         return await paginate(db, stmt, page, size)
     
-    @staticmethod
-    async def _create_task_if_not_exists(
-        db: AsyncSession,
-        order: VasOrder,
-    ) -> List[VasTask]:
-
-        stmt = select(VasProductRouting).where(
-            VasProductRouting.product_id == order.product_id,
-            VasProductRouting.is_active == 1,
-        )
-        result = await db.execute(stmt)
-        routings = result.scalars().all()
-
-        if not routings:
-            return []
-
-        created_tasks: List[VasTask] = []
-
-        for routing in routings:
-            exists_stmt = select(VasTask).where(
-                VasTask.order_id == order.id,
-                VasTask.routing_key == routing.routing_key,
-                VasTask.script_version == routing.script_version,
-            )
-            exists_result = await db.execute(exists_stmt)
-            exists = exists_result.scalar_one_or_none()
-
-            if exists:
-                continue
-
-            task = VasTask(
-                order_id=order.id,
-                routing_key=routing.routing_key,
-                script_version=routing.script_version,
-                priority=routing.priority,
-                status="pending",
-                user_inputs=order.user_inputs,
-                config=routing.config,
-                attempt_count=0,
-                notify_count=0,
-                expire_at=datetime.utcnow() + timedelta(days=60),
-                created_at=datetime.utcnow(),
-            )
-            db.add(task)
-            created_tasks.append(task)
-
-        return created_tasks
-    
     @staticmethod
     async def confirm_payment(
         db: AsyncSession,
@@ -256,20 +245,17 @@ class PaymentService:
         if token_obj.expire_at < datetime.utcnow():
             raise BizLogicError("Token expired")
         
-        payment = await PaymentService._confirm_payment_action(db, payment_id)
+        payment = await PaymentService._confirm_payment_action(db, payment_id, 'confirmed by admin')
         token_obj.used = 1
         await db.commit()
         return payment
     
     @staticmethod
-    async def _confirm_payment_action(db: AsyncSession, payment_id: int):
+    async def _confirm_payment_action(db: AsyncSession, payment_id: int, remark:str):
         # ---------- 查找 payment ----------
         pay_stmt = (
             select(VasPayment)
-            .where(
-                VasPayment.id == payment_id,
-                VasPayment.status == "pending",
-            )
+            .where(VasPayment.id == payment_id)
             .order_by(VasPayment.created_at.desc())
         )
         pay_result = await db.execute(pay_stmt)
@@ -297,13 +283,13 @@ class PaymentService:
             event.matched_payment_id = payment.id
             event.matched_order_id = payment.order_id
             await db.commit()
-            return None
+            raise BizLogicError("Payment has been confirmed")
 
         now = datetime.utcnow()
         payment.status = "late_paid" if payment.expire_at and now > payment.expire_at else "succeeded"
 
         payment.provider_payload = {
-            "title": "confirm by admin",
+            "title": remark,
             "received_at": now.isoformat(),
         }
 
@@ -314,7 +300,7 @@ class PaymentService:
         if order and order.status != "paid":
             order.status = "paid"
 
-        await PaymentService._create_task_if_not_exists(db, order)
+        await WebhookService._create_task_if_not_exists(db, order)
 
         event.status = "applied"
         event.matched_payment_id = payment.id

+ 110 - 0
app/services/queue_service.py

@@ -0,0 +1,110 @@
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from app.core.logger import logger
+from app.core.biz_exception import NotFoundError, BizLogicError
+from app.core.queue_manager import queue_manager
+from app.models.task import Task
+from app.models.vas_task import VasTask
+
+class QueueService:
+    
+    async def rebuild_task_queue(db: AsyncSession, task_name:str, queue_name: str):
+        if queue_manager.is_initialized(queue_name):
+            return
+        
+        queue = queue_manager.get_queue(queue_name)
+        queue.clear()
+        
+        stmt = (
+            select(Task)
+            .where(
+                Task.command == queue_name,
+                Task.status == 0
+            )
+        )
+        tasks = (await db.execute(stmt)).scalars().all()
+
+        for task in tasks:
+            queue_manager.put(
+                queue_name=queue_name,
+                task_id=task.id,
+                priority=0,
+            )
+
+        queue_manager.mark_initialized(queue_name)
+        logger.info(f"[Queue] rebuilt: {queue_name}")
+            
+    async def rebuild_vas_task_queue(db: AsyncSession, queue_name: str):
+        if queue_manager.is_initialized(queue_name):
+            return
+        
+        queue = queue_manager.get_queue(queue_name)
+        queue.clear()
+        
+        stmt = (
+            select(VasTask)
+            .where(
+                VasTask.routing_key == queue_name,
+                VasTask.status == 'pending'
+            )
+        )
+        tasks = (await db.execute(stmt)).scalars().all()
+
+        for task in tasks:
+            queue_manager.put(
+                queue_name=task.routing_key,
+                task_id=task.id,
+                priority=task.priority,
+            )
+
+        queue_manager.mark_initialized(queue_name)
+        logger.info(f"[Queue] rebuilt: {queue_name}")
+        
+    async def pop_task(db: AsyncSession, queue_name: str):
+        """
+        从指定队列出队一个任务,并标记为 RUNNING
+        """
+        await QueueService.rebuild_task_queue(db, queue_name)
+        task_id = queue_manager.pop(queue_name)
+        if not task_id:
+            raise NotFoundError(f'{queue_name} is empty')
+
+        stmt = select(Task).where(Task.id == task_id)
+        task = (await db.execute(stmt)).scalar_one_or_none()
+
+        if task.status != 0:
+            raise BizLogicError(f'task not READY, skipped')
+        
+        task.status = 1
+
+        await db.commit()
+        await db.refresh(task)
+
+        return task
+        
+        
+    async def pop_vas_task(db: AsyncSession, queue_name: str):
+        """
+        从指定队列出队一个任务,并标记为 RUNNING
+        """
+        await QueueService.rebuild_vas_task_queue(db, queue_name)
+        task_id = queue_manager.pop(queue_name)
+        if not task_id:
+            raise NotFoundError(f'{queue_name} is empty')
+        
+        stmt = select(VasTask).where(VasTask.id == task_id)
+        task = (await db.execute(stmt)).scalar_one_or_none()
+        
+        if task.status != "pending":
+            raise BizLogicError(f'task not READY, skipped')
+        
+        task.status = "running"
+        
+        await db.commit()
+        await db.refresh(task)
+
+        return task
+    
+    
+    async def dump_all():
+        return queue_manager.dump_all()

+ 17 - 0
app/services/task_service.py

@@ -5,6 +5,7 @@ from typing import List, Optional
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select, update
 
+from app.core.queue_manager import queue_manager
 from app.core.biz_exception import NotFoundError
 from app.models.task import Task
 from app.schemas.task import TaskCreate, TaskUpdate
@@ -25,6 +26,14 @@ class TaskService:
         db.add(db_obj)
         await db.commit()
         await db.refresh(db_obj)
+        
+        # 👇 只在 READY 状态才入队
+        if db_obj.status == 0:
+            queue_manager.put(
+                queue_name=db_obj.command,
+                task_id=db_obj.id,
+                priority=0,   # 先用默认优先级
+            )
         return db_obj
 
     # ======================
@@ -61,6 +70,14 @@ class TaskService:
 
         await db.commit()
         await db.refresh(db_obj)
+        
+        # 👇 核心:从非 READY → READY,才重新入队
+        if old_status != 0 and db_obj.status == 0:
+            queue_manager.put(
+                queue_name=db_obj.command,
+                task_id=db_obj.id,
+                priority=0,
+            )
         return db_obj
 
     # ======================

+ 12 - 42
app/services/ticket_service.py

@@ -16,7 +16,7 @@ from app.models.ticket import VasTicket
 from app.models.ticket_message import VasTicketMessage
 from app.schemas.ticket import VasTicketCreate
 from app.services.notification_service import NotificationService
-
+from app.services.webhook_service import WebhookService
 
 class TicketService:
 
@@ -38,15 +38,19 @@ class TicketService:
         db.add(ticket)
         await db.commit()
         await db.refresh(ticket)
-
-        await NotificationService.create(
+        
+        formatted_time = ticket.created_at.strftime('%Y-%m-%d %H:%M') + " (UTC)"
+        await NotificationService.post_email(
             redis_client=redis_client,
-            ntype="ticket created",
-            user_id=current_user.id,
-            channels=["email"],
+            receiver=current_user.email,
             template_id="ticket_created",
             payload={
+                "app_name": "Visafly",
+                "username": current_user.email,
                 "ticket_id": ticket.id,
+                "ticket_type": ticket.type,
+                "ticket_url": "http://45.137.220.138:3000/dashboard",
+                "created_at": formatted_time
             },
         )
 
@@ -154,42 +158,8 @@ class TicketService:
             )
             for task in task_res.scalars().all():
                 task.status = "cancelled"
-
-            # 2️⃣ 重新生成任务(幂等)
-            routing_res = await db.execute(
-                select(VasProductRouting).where(
-                    VasProductRouting.product_id == order.product_id,
-                    VasProductRouting.is_active == 1,
-                )
-            )
-            routings = routing_res.scalars().all()
-
-            for routing in routings:
-                exists_res = await db.execute(
-                    select(VasTask).where(
-                        VasTask.order_id == order.id,
-                        VasTask.routing_key == routing.routing_key,
-                        VasTask.script_version == routing.script_version,
-                    )
-                )
-                if exists_res.scalar_one_or_none():
-                    continue
-
-                db.add(
-                    VasTask(
-                        order_id=order.id,
-                        routing_key=routing.routing_key,
-                        script_version=routing.script_version,
-                        priority=10,
-                        status="pending",
-                        user_inputs=order.user_inputs,
-                        config=routing.config,
-                        attempt_count=0,
-                        notify_count=0,
-                        expire_at=datetime.utcnow() + timedelta(days=60),
-                        created_at=datetime.utcnow(),
-                    )
-                )
+                
+            await WebhookService._create_task_if_not_exists(db, order)
 
     # =========================
     # 工单拒绝逻辑

+ 21 - 3
app/services/vas_task_service.py

@@ -6,9 +6,11 @@ from typing import List, Optional
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select
 
+
 from app.utils.search import apply_keyword_search_stmt
 from app.utils.pagination import paginate
-from app.core.biz_exception import NotFoundError
+from app.core.queue_manager import queue_manager
+from app.core.biz_exception import NotFoundError,BizLogicError
 from app.models.vas_task import VasTask
 from app.models.order import VasOrder
 from app.schemas.vas_task import VasTaskCreate, VasTaskUpdate
@@ -26,6 +28,12 @@ class VasTaskService:
         db.add(rec)
         await db.commit()
         await db.refresh(rec)
+        
+        queue_manager.put(
+            queue_name=rec.routing_key,
+            task_id=task.id,
+            priority=task.priority
+        )
         return rec
 
     @staticmethod
@@ -105,12 +113,22 @@ class VasTaskService:
 
         if not rec:
             raise NotFoundError("Task not exist")
-
+        
+        if rec.status == "pending":
+            raise BizLogicError("Task is in queue already")
+        
         rec.status = "pending"
-        rec.attempt_count = (rec.attempt_count or 0) + 1
+        if rec.status == "grabbed":
+            rec.attempt_count = (rec.attempt_count or 0) + 1
 
         await db.commit()
         await db.refresh(rec)
+        
+        queue_manager.put(
+            queue_name=rec.routing_key,
+            task_id=rec.id,
+            priority= max(0, rec.priority - rec.attempt_count),
+        )
         return rec
 
     @staticmethod

+ 1 - 1
app/services/webhook_service.py

@@ -34,7 +34,7 @@ class WebhookService:
         )
         result = await db.execute(stmt)
         routings = result.scalars().all()
-
+        print(f'routings = {routings}')
         if not routings:
             return []
 

+ 66 - 19
app/services/wechat_service.py

@@ -1,42 +1,89 @@
 import httpx
+from typing import Dict, Any
 from app.core.biz_exception import BizLogicError
 from app.schemas.wechat import WechatIn
 
-
 class WechatService:
+    
     @staticmethod
-    async def push_to_wechat(payload: WechatIn):
+    async def _send_webhook(api_token: str, payload: Dict[str, Any]):
         """
-        企业微信 WebHook 推送(Async 版)
-
-        WebHook 格式:
-        https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY
+        内部私有方法:发送 HTTP 请求到企业微信 Webhook
         """
-        url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={payload.api_token}"
-
-        body = {
-            "msgtype": "text",
-            "text": {
-                "content": payload.message
-            }
-        }
+        url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={api_token}"
 
         try:
             async with httpx.AsyncClient(timeout=10) as client:
-                response = await client.post(url, json=body)
+                response = await client.post(url, json=payload)
 
         except httpx.RequestError as e:
             raise BizLogicError(f"Wechat push request error: {e}")
 
         if response.status_code != 200:
-            raise BizLogicError(
-                f"Wechat push failed, http_status={response.status_code}"
-            )
+            raise BizLogicError(f"Wechat push failed, http_status={response.status_code}")
 
         data = response.json()
         if data.get("errcode") != 0:
             raise BizLogicError(
                 f"Wechat push failed, errcode={data.get('errcode')}, errmsg={data.get('errmsg')}"
             )
-
         return True
+
+    @staticmethod
+    async def push_to_wechat(payload: WechatIn):
+        """
+        [保留原函数兼容] 发送纯文本消息
+        """
+        body = {
+            "msgtype": "text",
+            "text": {
+                "content": payload.message
+                # "mentioned_mobile_list":["13800006789","@all"] # 可选:如需艾特所有人
+            }
+        }
+        return await WechatService._send_webhook(payload.api_token, body)
+
+    @staticmethod
+    async def push_markdown(api_token: str, content: str):
+        """
+        发送 Markdown 消息 (支持标题、加粗、链接、颜色)
+        """
+        body = {
+            "msgtype": "markdown",
+            "markdown": {
+                "content": content
+            }
+        }
+        return await WechatService._send_webhook(api_token, body)
+
+    @staticmethod
+    async def push_payment_template(api_token: str, data: Dict[str, Any]):
+        """
+        专门用于发送【支付确认】的模板消息
+        适配之前的业务逻辑,将其转换为 Webhook 支持的 Markdown 格式
+        
+        Args:
+            api_token: Webhook Key
+            data: 包含 order_id, amount, currency, user_email, confirm_url 等字段的字典
+        """
+        
+        # 1. 格式化金额
+        amount_fen = data.get('amount', 0)
+        amount_str = f"{amount_fen / 100:,.2f}"
+        currency = data.get('currency', 'CNY')
+        
+        # 2. 构造 Markdown 内容
+        # <font color="warning"> 红色/橙色
+        # <font color="info"> 绿色
+        # <font color="comment"> 灰色
+        markdown_content = f"""**💰 新增待确认支付**
+> 订单号:<font color="comment">{data.get('order_id', 'N/A')}</font>
+> 用户:<font color="comment">{data.get('user_email', 'Unknown')}</font>
+> 金额:<font color="warning">{amount_str} {currency}</font>
+> 渠道:{data.get('provider', 'Manual')}
+> 时间:{data.get('time_str', '')}
+
+请核实资金到账情况。
+👉 [点击此处进行系统确认]({data.get('confirm_url')})"""
+
+        return await WechatService.push_markdown(api_token, markdown_content)

+ 247 - 29
app/tasks/notification_task.py

@@ -1,17 +1,19 @@
 
 import asyncio
+import json
+from datetime import datetime
 from typing import Dict, Any
 from redis.asyncio import Redis
+from sqlalchemy.ext.asyncio import AsyncSession
 from app.services.wechat_service import WechatService
 from app.services.email_authorizations_service import EmailAuthorizationService
 from app.utils.redis_utils import redis_qpop
 
-async def notification_consumer(redis_client: Redis):
+async def notification_consumer(session_factory, redis_client: Redis):
     """
     异步消费 Redis 队列 vas_notification_queue
     """
     queue_name = "vas_notification_queue"
-    return
     while True:
         try:
             # 阻塞获取队列消息
@@ -20,22 +22,43 @@ async def notification_consumer(redis_client: Redis):
                 await asyncio.sleep(1)  # 队列为空,休眠
                 continue
 
-            channels = message.get("channels", [])
+            channel = message.get("channel", "")
             template_id = message.get("template_id")
             payload = message.get("payload", {})
-            user_id = message.get("user_id")
-
+            
             # 按渠道发送
-            if "email" in channels:
-                # EmailService.create(user_id, template_id, payload) 是你自己实现的发送逻辑
-                await EmailAuthorizationService.send(user_id=user_id, template_id=template_id, payload=payload)
-
-            if "wechat" in channels:
-                api_token = payload.get("api_token")
-                content = payload.get("message") or payload.get("content")
-                if api_token and content:
-                    await WechatService.push_to_wechat({"api_token": api_token, "message": content})
+            if "email" == channel:
+                content = None
+                sender = None
+                subject = None
+                receiver =  message.get("receiver", "")
+                if "email_verification_for_bind" == template_id:
+                    sender = "donotreply@visafly.top"
+                    subject = "Email Verification"
+                    content = template_for_bind_email(payload)
+                if "email_verification_for_reset" == template_id:
+                    sender = "donotreply@visafly.top"
+                    subject = "Reset Password"
+                    content = template_for_reset_pwd(payload)
+                if "login_credentials" == template_id:
+                    sender = "donotreply@visafly.top"
+                    subject = "Your Account Details"
+                    content = template_for_login_credentials(payload)
+                if "ticket_created" == template_id:
+                    sender = "donotreply@visafly.top"
+                    subject = "Ticket Created"
+                    content = template_ticket_open(payload)
+                if content:
+                    async with session_factory() as db:  # type: AsyncSession
+                        auth = await EmailAuthorizationService.get_by_email(db, sender)
+                        send_result = await EmailAuthorizationService.send_email(auth, receiver, subject, "html", content)
+                        print(f"Email send result: {send_result}")
 
+            if "wechat" == channel:
+                api_token = "a8f79817-e18b-4739-8459-adb2ed5e2e32"
+                if "payment_user_confirmed" == template_id:
+                    status = await WechatService.push_payment_template(api_token, payload)
+                    print(f"Wechat send status: {status}")
             print(f"✅ Notification sent: {message.get('notification_id')}")
 
         except Exception as e:
@@ -43,7 +66,17 @@ async def notification_consumer(redis_client: Redis):
             await asyncio.sleep(1)  # 避免异常循环过快
 
 def template_for_bind_email(payload):
+    """
+    生成绑定邮箱验证码邮件
+    
+    Args:
+        payload (dict): 包含以下字段:
+            - app_name: 应用名称 (默认 Visafly)
+            - code: 验证码
+            - expiration_time: 过期时间描述 (如 "10 minutes")
+    """
     
+    # 1. 定义 HTML 模板
     template = '''
     <!DOCTYPE html>
     <html>
@@ -72,14 +105,39 @@ def template_for_bind_email(payload):
             <p>Best regards,<br>The {{app_name}} Team</p>
             
             <div class="footer">
-                &copy; 2025 {{app_name}}. All rights reserved.
+                &copy; 2026 {{app_name}}. All rights reserved.
             </div>
         </div>
     </body>
     </html>
     '''
 
+    # 2. 执行数据替换
+    # 使用 .get() 提供默认值,防止缺少字段导致报错
+    # 使用 str() 确保数据是字符串类型
+    app_name = str(payload.get('app_name', 'Visafly'))
+    code = str(payload.get('code', ''))
+    expiration_time = str(payload.get('expiration_time', '10 minutes'))
+
+    # 链式替换所有占位符
+    html_content = template.replace('{{app_name}}', app_name) \
+                           .replace('{{code}}', code) \
+                           .replace('{{expiration_time}}', expiration_time)
+
+    return html_content
+
 def template_for_reset_pwd(payload):
+    """
+    生成重置密码验证码邮件
+    
+    Args:
+        payload (dict): 包含以下字段:
+            - app_name: 应用名称 (默认 Visafly)
+            - code: 验证码
+            - expiration_time: 过期时间描述 (如 "10 minutes")
+    """
+    
+    # 1. 定义 HTML 模板
     template = '''
     <!DOCTYPE html>
     <html>
@@ -117,15 +175,40 @@ def template_for_reset_pwd(payload):
             <p>Best regards,<br>The {{app_name}} Team</p>
             
             <div class="footer">
-                &copy; 2025 {{app_name}}. All rights reserved.<br>
+                &copy; 2026 {{app_name}}. All rights reserved.<br>
                 This is an automated message, please do not reply.
             </div>
         </div>
     </body>
     </html>
     '''
+
+    # 2. 执行数据替换
+    # 使用 .get() 提供默认值,防止缺少字段导致报错
+    # 使用 str() 确保数据是字符串类型,防止 replace 报错
+    app_name = str(payload.get('app_name', 'Visafly'))
+    code = str(payload.get('code', ''))
+    expiration_time = str(payload.get('expiration_time', '10 minutes'))
+
+    html_content = template.replace('{{app_name}}', app_name) \
+                           .replace('{{code}}', code) \
+                           .replace('{{expiration_time}}', expiration_time)
+
+    return html_content
     
 def template_for_login_credentials(payload):
+    """
+    生成用户登录凭证邮件 (账号 + 临时密码)
+    
+    Args:
+        payload (dict): 包含以下字段:
+            - app_name: 应用名称
+            - username: 登录账号
+            - password: 临时密码
+            - login_url: 登录链接
+    """
+    
+    # 1. 定义 HTML 模板
     template = '''
     <!DOCTYPE html>
     <html>
@@ -177,15 +260,56 @@ def template_for_login_credentials(payload):
             </p>
 
             <div class="footer">
-                &copy; 2025 {{app_name}}. All rights reserved.<br>
+                &copy; 2026 {{app_name}}. All rights reserved.<br>
                 If you did not request this account, please contact support.
             </div>
         </div>
     </body>
     </html>
     '''
+
+    # 2. 执行数据替换
+    # 使用 payload.get() 提供默认值,防止缺少字段导致报错
+    app_name = str(payload.get('app_name', 'Visafly'))
+    username = str(payload.get('username', ''))
+    password = str(payload.get('password', ''))
+    login_url = str(payload.get('login_url', '#'))
+
+    # 链式替换所有占位符
+    html_content = template.replace('{{app_name}}', app_name) \
+                           .replace('{{username}}', username) \
+                           .replace('{{password}}', password) \
+                           .replace('{{login_url}}', login_url)
+
+    return html_content
     
 def template_ticket_open(payload):
+    """
+    生成工单创建通知邮件
+    payload 需包含: username, ticket_id, ticket_type, created_at, ticket_url, app_name
+    """
+    
+    # --- 1. 处理时间格式化逻辑 ---
+    raw_time = payload.get('created_at')
+    formatted_time = ""
+
+    if isinstance(raw_time, datetime):
+        # 如果传入的是 datetime 对象
+        formatted_time = raw_time.strftime('%Y-%m-%d %H:%M') + " (UTC)"
+    elif isinstance(raw_time, str):
+        try:
+            # 如果传入的是 ISO 字符串 (例如 '2025-12-31T02:33:00Z')
+            # 截取前19位通常能兼容大部分 ISO 格式
+            dt_obj = datetime.fromisoformat(raw_time.replace('Z', '+00:00'))
+            formatted_time = dt_obj.strftime('%Y-%m-%d %H:%M') + " (UTC)"
+        except ValueError:
+            # 如果解析失败,直接显示原字符串
+            formatted_time = raw_time
+    else:
+        formatted_time = "N/A"
+
+    # --- 2. HTML 模板 ---
+    # 注意:这里保持了 {{key}} 占位符,下面会统一替换
     template = '''
     <!DOCTYPE html>
     <html>
@@ -224,7 +348,8 @@ def template_ticket_open(payload):
                     <span class="info-value">{{ticket_type}}</span>
                 </div>
                 <div class="info-row" style="margin-bottom: 0;">
-                    <span class="info-label">Time:</span>
+                    <span class="info-label">Created Time:</span>
+                    <!-- 使用格式化后的时间 -->
                     <span class="info-value">{{created_at}}</span>
                 </div>
             </div>
@@ -241,16 +366,109 @@ def template_ticket_open(payload):
     </body>
     </html>
     '''
+
+    # --- 3. 执行替换 ---
+    # 使用 payload 中的数据替换模板占位符
+    html_content = template.replace('{{username}}', str(payload.get('username', 'User'))) \
+                           .replace('{{ticket_id}}', str(payload.get('ticket_id', ''))) \
+                           .replace('{{ticket_type}}', str(payload.get('ticket_type', ''))) \
+                           .replace('{{created_at}}', formatted_time) \
+                           .replace('{{ticket_url}}', str(payload.get('ticket_url', '#'))) \
+                           .replace('{{app_name}}', str(payload.get('app_name', 'Visafly')))
+
+    return html_content
+
+def _format_date_en(dt_obj):
+    """Format date to '15 Jan 2024' to avoid US/UK format confusion."""
+    if not dt_obj:
+        return "N/A"
+    if isinstance(dt_obj, str):
+        return dt_obj
+    return dt_obj.strftime("%d %b %Y")
+
+def _format_time_en(dt_obj):
+    if not dt_obj:
+        return ""
+    return dt_obj.strftime("%H:%M:%S")
+
+def _parse_slots_summary(availability_data):
+    """Convert JSON data to a readable English summary."""
+    if not availability_data:
+        return "No specific dates data."
     
-def template_confirm_payment(payload):
-    template = {
-        "touser": "ADMIN_USER_ID",
-        "msgtype": "textcard",
-        "agentid": 1000001,
-        "textcard": {
-            "title": "💰 待确认:收到新的手动转账",
-            "description": "<div class=\"gray\">2025-12-31 10:30:00</div> <br>订单号:ORD-20251231-001<br>用户:user@example.com<br><div class=\"highlight\">金额:¥ 3,500.00</div><br>请核实资金到账后,点击卡片确认收款。",
-            "url": "https://admin.visafly.com/payment/confirm?payment_id=123&token=secure_token_abc",
-            "btntxt": "立即确认"
+    if isinstance(availability_data, list):
+        # Take first 3 dates only to keep it clean
+        dates = availability_data[:3]
+        text = ", ".join(str(d) for d in dates)
+        if len(availability_data) > 3:
+            text += f" (+{len(availability_data) - 3} more)"
+        return text
+    return str(availability_data)
+
+def _get_display_meta(data):
+    """
+    Return dynamic header logic based on status.
+    Goal: Put the most important info in the notification preview.
+    """
+    date_str = _format_date_en(data.earliest_date)
+    
+    if data.availability_status == 'Available':
+        # Notification Preview: "🟢 UK London: 15 Jan 2024"
+        emoji = "🟢"
+        # If available, the DATE is the headline
+        headline = f"{data.country}, {data.city}: {date_str}" 
+        color = "info" # Green for WeChat
+    elif data.availability_status == 'Waitlist':
+        emoji = "🟡"
+        headline = f"Waitlist: {data.country}, {data.city}"
+        color = "warning" # Orange for WeChat
+    else:
+        emoji = "🔴"
+        headline = f"No Slots: {data.country}, {data.city}"
+        color = "comment" # Grey for WeChat
+        
+    return emoji, headline, color, date_str
+
+
+def generate_wechat_markdown(data) -> dict:
+    emoji, headline, color, date_str = _get_display_meta(data)
+    slots_summary = _parse_slots_summary(data.availability)
+    
+    # WeCom uses Markdown. 
+    # Logic: Keep the header distinct based on availability.
+    
+    markdown_content = (
+        f"# {emoji} {headline}\n"
+        f"> **Visa Type**: <font color=\"comment\">{data.visa_type}</font>\n"
+        f"> **Earliest**: <font color=\"{color}\">{date_str}</font>\n"
+        f"> **Details**: <font color=\"comment\">{slots_summary}</font>\n"
+        f"\n"
+        f"👉 [Tap to Book Appointment]({data.website})\n"
+        f"\n"
+        f"<font color=\"comment\">Updated: {_format_time_en(data.snapshot_at)}</font>"
+    )
+
+    return {
+        "msgtype": "markdown",
+        "markdown": {
+            "content": markdown_content
         }
-    }
+    }
+    
+def generate_telegram_message(data) -> str:
+    emoji, headline, _, date_str = _get_display_meta(data)
+    slots_summary = _parse_slots_summary(data.availability)
+    
+    # HTML formatting
+    # <b> for headers, <code> for copying/highlighting data
+    msg = (
+        f"{emoji} <b>{headline}</b>\n" 
+        f"──────────────────\n"
+        f"🛂 <b>Visa:</b> {data.visa_type}\n"
+        f"📅 <b>Earliest:</b> <code>{date_str}</code>\n"
+        f"📊 <b>Slots:</b> {slots_summary}\n"
+        f"──────────────────\n"
+        f"🔗 <a href='{data.website}'><b>Book Now ➜</b></a>\n\n"
+        f"🕒 <i>Checked at {_format_time_en(data.snapshot_at)}</i>"
+    )
+    return msg

+ 108 - 0
app/utils/priority_queue_utils.py

@@ -0,0 +1,108 @@
+import heapq
+import itertools
+
+REMOVED = object()
+
+
+class SimplePQ:
+    def __init__(self):
+        self.heap = []
+        self.entries = {}
+        self.counter = itertools.count()
+
+    def size(self):
+        return len(self.entries)
+
+    def put(self, task_id, priority):
+        """
+        priority: 数值越大,优先级越高
+        """
+        if task_id in self.entries:
+            self.remove(task_id)
+
+        # 👇 关键:使用 -priority
+        entry = [-priority, next(self.counter), task_id]
+        self.entries[task_id] = entry
+        heapq.heappush(self.heap, entry)
+
+    def remove(self, task_id):
+        entry = self.entries.pop(task_id, None)
+        if entry:
+            entry[2] = REMOVED
+
+    def pop(self):
+        """
+        弹出 priority 最大、且最早入队的 task_id
+        """
+        while self.heap:
+            _, _, task_id = heapq.heappop(self.heap)
+            if task_id is not REMOVED:
+                self.entries.pop(task_id, None)
+                return task_id
+        return None
+    
+    def clear(self):
+        self.heap.clear()
+        self.entries.clear()
+
+    def dump(self):
+        """
+        以 pop 顺序返回队列内容(不破坏原队列)
+        """
+        valid = [
+            (-priority, count, task_id)
+            for priority, count, task_id in self.heap
+            if task_id is not REMOVED
+        ]
+
+        # priority 大的排前面;同 priority 按 FIFO
+        valid.sort(key=lambda x: (-x[0], x[1]))
+
+        return [
+            {
+                "task_id": task_id,
+                "priority": priority,
+                "order": idx,
+            }
+            for idx, (priority, _, task_id) in enumerate(valid)
+        ]
+
+
+class NamedQueueManager:
+    def __init__(self):
+        self.queues = {}
+        self.initialized = set()
+
+    def get_queue(self, name):
+        if name not in self.queues:
+            self.queues[name] = SimplePQ()
+        return self.queues[name]
+
+    def mark_initialized(self, name):
+        self.initialized.add(name)
+
+    def is_initialized(self, name) -> bool:
+        return name in self.initialized
+
+    def put(self, queue_name, task_id, priority):
+        self.get_queue(queue_name).put(task_id, priority)
+
+    def pop(self, queue_name):
+        return self.get_queue(queue_name).pop()
+
+    def remove(self, queue_name, task_id):
+        q = self.queues.get(queue_name)
+        if q:
+            q.remove(task_id)
+
+    def list_queues(self):
+        return list(self.queues.keys())
+
+    def dump_all(self):
+        data = {}
+        for name, queue in self.queues.items():
+            data[name] = {
+                "size": queue.size(),
+                "tasks": queue.dump(),
+            }
+        return data

+ 20 - 2
app/utils/validation_utils.py

@@ -1,9 +1,27 @@
 from jsonschema import validate, ValidationError
 from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
 
+from jsonschema import validate, ValidationError
+
 def validate_user_inputs(schema_json: dict, user_inputs: dict):
-    # print(f'schema_json={schema_json}, user_inputs={user_inputs}')
     try:
         validate(instance=user_inputs, schema=schema_json)
     except ValidationError as e:
-        raise BizLogicError(f"inputs validation failed, error: {e.message}, path: {list(e.path)}")
+        # 1. 获取报错字段的路径 (例如: "user.phone")
+        field = ".".join(map(str, e.path)) if e.path else "root"
+        
+        # 2. 优先尝试从 Schema 定义中获取人类可读的 'description' 或 'title'
+        # e.schema 是当前校验失败的那个具体字段的 schema 定义片段
+        custom_msg = e.schema.get("description") or e.schema.get("title")
+        
+        # 3. 构建错误信息
+        if e.validator == 'pattern':
+            # 如果是正则错误 (pattern),且我们定义了 description,就用 description
+            # 如果没定义,就给一个通用的 "格式不正确"
+            reason = custom_msg if custom_msg else "format is invalid"
+        else:
+            # 其他类型的错误(如类型不对、必填项缺失),保留原报错或也尝试用 description
+            reason = custom_msg if custom_msg else e.message
+
+        # 4. 抛出最终的人话提示
+        raise BizLogicError(f"Invalid parameter [{field}]: {reason}")