import time import uuid import json import requests from typing import List from app.core.logger import logger from fastapi import APIRouter, Request, Query, Depends, Body, UploadFile, File, HTTPException from fastapi.responses import RedirectResponse from sqlalchemy.orm import Session from app.utils.redis_utils import redis_qpush from app.utils.validation_utils import validate_user_inputs from app.core.redis import get_redis_client from app.core.database import get_db from app.core.auth import get_current_user from redis.asyncio import Redis from app.utils.response import success, fail from app.models.user import VasUser from app.models.order import VasOrder from app.models.schema import VasSchema from app.models.product import VasProduct from app.models.payment import VasPayment from app.schemas.common import ApiResponse from app.schemas.troov import TroovRate from app.schemas.sms import ShortMessageDetail from app.schemas.configuration import ConfigurationCreate, ConfigurationUpdate, ConfigurationOut from app.schemas.email_authorizations import EmailContent, EmailAuthorizationCreate, EmailAuthorizationUpdate, EmailAuthorizationOut from app.schemas.card import CardCreate, CardOut from app.schemas.task import TaskCreate, TaskOut, TaskUpdate from app.schemas.short_url import ShortUrlCreate, ShortUrlOut from app.schemas.auto_booking import AutoBookingCreate, AutoBookingOut from app.schemas.http_session import HttpSessionCreate, HttpSessionUpdate,HttpSessionOut from app.schemas.fake import FakeUser from app.schemas.auth import BindEmailRequest, LoginRequest, LoginData, AutoRegisterRequest, AutoRegisterData from app.schemas.product import VasProductCreate, VasProductUpdate, VasProductOut from app.schemas.order import VasOrderCreate, VasOrderOut from app.schemas.payment import VasPaymentCreate, VasPaymentOut from app.schemas.payment_qr import VasPaymentQrSimpleOut from app.schemas.payment_provider import VasPaymentProviderSimpleOut from app.schemas.webhook import SMSHelperWebhookPayload from app.schemas.vas_task import VasTaskCreate, VasTaskOut from app.schemas.ticket import VasTicketCreate from app.schemas.telegram import TelegramIn from app.schemas.wechat import WechatIn from app.services.configuration_service import ConfigurationService from app.services.troov_service import get_rate_by_date from app.services.sms_service import save_short_message, query_short_message from app.services.email_authorizations_service import EmailAuthorizationService from app.services.short_url_service import ShortUrlService from app.services.task_service import TaskService from app.services.card_service import CardService from app.services.seaweedfs_service import SeaweedFSService from app.services.auto_booking_service import AutoBookingService from app.services.http_session_service import HttpSessionService from app.services.fake_service import generate_fake_users from app.services.auth_service import AuthService from app.services.product_service import ProductService from app.services.order_service import OrderService from app.services.schema_service import SchemaService from app.services.payment_service import PaymentService from app.services.payment_provider_service import PaymentProviderSerivce from app.services.payment_qr_service import PaymentQrService from app.services.vas_task_service import VasTaskService from app.services.webhook_service import WebhookService from app.services.notification_service import NotificationService from app.services.ticket_service import TicketService from app.services.telegram_service import TelegramService from app.services.wechat_service import WechatService # 公共路由 public_router = APIRouter() # 受保护路由 protected_router = APIRouter() @protected_router.get("/ping", summary="心跳检测", tags=["测试接口"]) def ping(): return {"message": "pong"} @public_router.get("/sms/upload", summary="上报短信", tags=["短信接口"], response_model=ApiResponse[ShortMessageDetail]) def sms_upload( phone: str = Query(..., description="手机号"), message: str = Query(..., description="短信内容"), max_ttl: int = Query(300, description="短信保存时间(秒)"), redis_client: Redis = Depends(get_redis_client) ): """ 保存短信到 Redis """ received_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) msg = save_short_message(redis_client, phone, message, received_at, max_ttl) return success(data=msg) @protected_router.get("/sms/download", summary="读取短信", tags=["短信接口"], response_model=ApiResponse[List[ShortMessageDetail]]) def sms_download( phone: str = Query(..., description="手机号"), keyword: str = Query('', description="短信内容关键字"), sent_at: str = Query('', description="筛选时间(可选)"), redis_client: Redis = Depends(get_redis_client) ): """ 查询短信(支持关键字和时间过滤) """ obj = query_short_message(redis_client, phone, keyword or None, sent_at or None) return success(data=obj) @protected_router.get("/troov/rate", summary="TROOV 查询rate", tags=["通用接口"], response_model=ApiResponse[List[TroovRate]]) def troov_rate(date: str = Query(..., description="查询的日期, 格式: YYYY-MM-DD"), redis_client: Redis = Depends(get_redis_client)): # 调用 service 层获取数据 obj = get_rate_by_date(redis_client, date) return success(data=obj) @protected_router.post("/dynamic-configurations", summary="创建动态配置", tags=["动态配置"], response_model=ApiResponse[ConfigurationOut]) def dynamic_config_create(config_in: ConfigurationCreate, db: Session = Depends(get_db)): obj = ConfigurationService.create(db, config_in) return success(data=obj) @protected_router.get("/dynamic-configurations/all", summary="读取所有动态配置", tags=["动态配置"], response_model=ApiResponse[List[ConfigurationOut]]) def dynamic_config_get_all(db: Session = Depends(get_db)): obj = ConfigurationService.get_all(db) return success(data=obj) @protected_router.get("/dynamic-configurations/key/{config_key}", summary="根据Key读取动态配置", tags=["动态配置"], response_model=ApiResponse[ConfigurationOut]) def dynamic_config_get_by_key(config_key: str, db: Session = Depends(get_db)): config = ConfigurationService.get_by_key(db, config_key) return success(data=config) @protected_router.put("/dynamic-configurations/key/{config_key}", summary="根据Key更新动态配置", tags=["动态配置"], response_model=ApiResponse[ConfigurationOut]) def dynamic_config_update_by_key(config_key: str, config_in: ConfigurationUpdate, db: Session = Depends(get_db)): config = ConfigurationService.update_by_key(db, config_key, config_in) return success(data=config) @protected_router.delete("/dynamic-configurations/key/{config_key}", summary="根据Key删除动态配置", tags=["动态配置"], response_model=ApiResponse[ConfigurationOut]) def dynamic_config_delete_by_key(config_key: str, db: Session = Depends(get_db)): config = ConfigurationService.delete_by_key(db, config_key) return success(data=config) @protected_router.post("/http-session", summary="创建http session", tags=["会话管理"],response_model=ApiResponse[HttpSessionOut]) def http_session_create( data: HttpSessionCreate, db: Session = Depends(get_db) ): logger.info(f"[Create HttpSession] sid={data.session_id}") obj = HttpSessionService.create(db, data) return success(data=obj) @protected_router.delete("/http-session", summary="删除http session", tags=["会话管理"], response_model=ApiResponse) def http_session_delete_by_sid( session_id: str = Query(...), db: Session = Depends(get_db) ): logger.info(f"[Delete HttpSession] sid={session_id}") HttpSessionService.delete_by_sid(db, session_id) return success() @protected_router.put("/http-session", summary="更新http session", tags=["会话管理"], response_model=ApiResponse[HttpSessionOut]) def http_session_update_by_sid( session_id: str = Query(...), data: HttpSessionUpdate = Body(...), db: Session = Depends(get_db) ): logger.info(f"[Update HttpSession] sid={session_id}") obj = HttpSessionService.update_by_sid(db, session_id, data) return success(data=obj) @protected_router.get("/http-session", summary="读取http session", tags=["会话管理"],response_model=ApiResponse[HttpSessionOut]) def http_session_get_by_sid( session_id: str = Query(...), db: Session = Depends(get_db) ): logger.info(f"[Get HttpSession] sid={session_id}") obj = HttpSessionService.get_by_sid(db, session_id) return success(data=obj) @protected_router.get("/email-authorizations", summary="查询所有内部邮箱", tags=["邮箱接口"], response_model=ApiResponse[List[EmailAuthorizationOut]]) def email_authorizations_get(db: Session = Depends(get_db)): obj = EmailAuthorizationService.get_all(db) return success(data=obj) @protected_router.post("/email-authorizations", summary="创建内部邮箱", tags=["邮箱接口"], response_model=ApiResponse[EmailAuthorizationOut]) def email_authorizations_create(data: EmailAuthorizationCreate, db: Session = Depends(get_db)): obj = EmailAuthorizationService.create(db, data) return success(data=obj) @protected_router.get("/email-authorizations/{id}", summary="通过id查询内部邮箱", tags=["邮箱接口"], response_model=ApiResponse[EmailAuthorizationOut]) def email_authorizations_get_by_id(id: int, db: Session = Depends(get_db)): email_auth = EmailAuthorizationService.get_by_id(db, id) return success(data=email_auth) @protected_router.put("/email-authorizations/{id}", summary="通过id更新内部邮箱", tags=["邮箱接口"], response_model=ApiResponse[EmailAuthorizationOut]) def email_authorizations_update_by_id(id: int, data: EmailAuthorizationUpdate, db: Session = Depends(get_db)): updated = EmailAuthorizationService.update(db, id, data) return success(data=updated) @protected_router.delete("/email-authorizations/{id}", summary="通过id删除内部邮箱", tags=["邮箱接口"], response_model=ApiResponse[EmailAuthorizationOut]) def email_authorizations_delete_by_id(id: int, db: Session = Depends(get_db)): deleted = EmailAuthorizationService.delete(db, id) return deleted @protected_router.get("/email-authorizations/email/{email}", summary="通过邮箱地址查询内部邮箱", tags=["邮箱接口"], response_model=ApiResponse[EmailAuthorizationOut]) def email_authorizations_get_by_email(email: str, db: Session = Depends(get_db)): email_auth = EmailAuthorizationService.get_by_email(db, email) return email_auth @protected_router.post("/email-authorizations/fetch", summary="读取邮件, 仅限文本内容", tags=["邮箱接口"], response_model=ApiResponse[EmailContent]) def email_authorizations_fetch_email( email: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"), sender: str = Query(..., description="发件人邮箱账号或者名字"), recipient: str = Query(..., description="收件人账号或者名字"), subjectKeywords: str = Query("", description="邮件主题关键词, 支持多个关键词, 用逗号隔开"), bodyKeywords: str = Query("", description="邮件内容关键词, 支持多个关键词, 用逗号隔开"), sentDate: str = Query(..., description="发件日期, UTC时间, 格式: yyyy-mm-dd hh:mm:ss"), expiry: int = Query(300, description="邮件有效期, 单位秒, 从sentDate 开始算起"), db: Session = Depends(get_db) ): auth = EmailAuthorizationService.get_by_email(db, email) result = EmailAuthorizationService.fetch_email_authorizations( auth, sender=sender, recipient=recipient, subject_keywords=subjectKeywords, body_keywords=bodyKeywords, sent_date=sentDate, expiry=expiry, only_text=True ) return success(data={"body": result}) @protected_router.post("/email-authorizations/fetch-top", summary="从最近的几封邮件读取目标邮件,仅限文本内容, 性能会更好, 邮件多时有可能漏读", tags=["邮箱接口"], response_model=ApiResponse[EmailContent]) def email_authorizations_fetch_email_from_topn( email: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"), sender: str = Query(..., description="发件人邮箱账号或者名字"), recipient: str = Query(..., description="收件人账号或者名字"), subjectKeywords: str = Query("", description="邮件主题关键词, 支持多个关键词, 用逗号隔开"), bodyKeywords: str = Query("", description="邮件内容关键词, 支持多个关键词, 用逗号隔开"), top: int = Query(10, description="指定从最近几封邮件读取"), db: Session = Depends(get_db) ): auth = EmailAuthorizationService.get_by_email(db, email) result = EmailAuthorizationService.fetch_email_authorizations_from_top_n( auth, sender=sender, recipient=recipient, subject_keywords=subjectKeywords, body_keywords=bodyKeywords, top=top, only_text=True ) return success(data={"body": result}) @protected_router.post("/email-authorizations/forward", summary="转发邮件", tags=["邮箱接口"], response_model=ApiResponse[EmailContent]) def email_authorizations_forward_email( emailAccount: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"), forwardTo: str = Query(..., description="转发到哪个邮箱地址, 格式: xxx@xxx.xxx"), sender: str = Query(..., description="发件人邮箱账号或者名字"), recipient: str = Query(..., description="收件人账号或者名字"), subjectKeywords: str = Query("", description="邮件主题关键词, 支持多个关键词, 用逗号隔开"), bodyKeywords: str = Query("", description="邮件内容关键词, 支持多个关键词, 用逗号隔开"), db: Session = Depends(get_db) ): auth = EmailAuthorizationService.get_by_email(db, emailAccount) result = EmailAuthorizationService.forward_first_matching_email( auth, forward_to = forwardTo, sender = sender, recipient = recipient, subject_keywords = subjectKeywords, body_keywords = bodyKeywords ) return success(data={"body": result}) @protected_router.post("/email-authorizations/sendmail", summary="发送邮件", tags=["邮箱接口"], response_model=ApiResponse[EmailContent]) def email_authorizations_send_email( emailAccount: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"), sendTo: str = Query(..., description="收件人邮箱账号"), subject: str = Query(..., description="邮件主题"), contentType: str = Query("text", description="内容格式,支持 text 和 html"), content: EmailContent = Body("", description="邮件内容"), db: Session = Depends(get_db) ): auth = EmailAuthorizationService.get_by_email(db, emailAccount) result = EmailAuthorizationService.send_email( auth, send_to = sendTo, subject = subject, content_type = contentType, content = content.body ) return success(data={"body": result}) @protected_router.post("/email-authorizations/sendmail-bulk", summary="群发送邮件", tags=["邮箱接口"], response_model=ApiResponse[EmailContent]) def email_authorizations_send_email_bulk( emailAccount: str = Query(..., description="收件邮箱账号, 格式: xxx@xxx.xxx"), sendTo: str = Query(..., description="收件人邮箱账号,多个用逗号隔开"), subject: str = Query(..., description="邮件主题"), contentType: str = Query("text", description="内容格式,支持 text 和 html"), content: EmailContent = Body(..., description="邮件内容"), db: Session = Depends(get_db) ): auth = EmailAuthorizationService.get_by_email(db, emailAccount) result = EmailAuthorizationService.send_email_bulk( auth, send_to = sendTo, subject = subject, content_type = contentType, content = content.body ) return success(data={"body": result}) @protected_router.post("/resource/pdf", summary="上传pdf文件", tags=["文件管理"]) def resource_upload_pdf(pdf: UploadFile = File(...)): if not pdf.filename.lower().endswith(".pdf"): raise HTTPException(status_code=400, detail="仅支持上传 PDF 文件") result = SeaweedFSService.upload(pdf) if not result: raise HTTPException(status_code=500, detail="上传失败") return {"success": True, "fid": result["fid"], "url": result["url"]} @protected_router.get("/resource/pdf/{fid}", summary="读取pdf", tags=["文件管理"]) def resource_get_pdf(fid: str): data = SeaweedFSService.get(fid) if not data: raise HTTPException(status_code=404, detail="文件不存在") content, mime = data return Response(content=content, media_type=mime) @protected_router.delete("/resource/pdf/{fid}", summary="删除pdf文件", tags=["文件管理"]) def resource_delete_pdf(fid: str): ok = SeaweedFSService.delete(fid) if not ok: raise HTTPException(status_code=404, detail="文件不存在或删除失败") return {"success": True, "fid": fid} @protected_router.post("/resource/image", summary="上传图片", tags=["文件管理"]) def resource_upload_image(image: UploadFile = File(...)): if not image.content_type.startswith("image/"): raise HTTPException(status_code=400, detail="仅支持上传图片文件") print('upload') result = SeaweedFSService.upload(image) if not result: raise HTTPException(status_code=500, detail="上传失败") return {"success": True, "fid": result["fid"], "url": result["url"]} @protected_router.get("/resource/image/{fid}", summary="读取图片", tags=["文件管理"]) def resource_get_image(fid: str): data = SeaweedFSService.get(fid) if not data: raise HTTPException(status_code=404, detail="图片不存在") content, mime = data return Response(content=content, media_type=mime) @protected_router.delete("/resource/image/{fid}", summary="删除图片", tags=["文件管理"]) def resource_delete_image(fid: str): ok = SeaweedFSService.delete(fid) if not ok: raise HTTPException(status_code=404, detail="图片不存在或删除失败") return {"success": True, "fid": fid} @protected_router.post("/s/generate", summary="生成短链接地址<压缩地址长度>", tags=["Short URL"], response_model=ApiResponse[ShortUrlOut]) def short_url_generate( data: ShortUrlCreate, db: Session = Depends(get_db), ): """生成短链接""" record = ShortUrlService.create_short_url(db, data.long_url) return success(data=record) @public_router.get("/s/{short_key}", summary="访问短链接地址<自动重定向到真实链接地址>", tags=["Short URL"]) def short_url_request(short_key: str, db: Session = Depends(get_db)): """访问短链接自动重定向""" long_url = ShortUrlService.get_long_url(db, short_key) return RedirectResponse(url=long_url, status_code=302) @protected_router.post("/tasks", summary="创建任务", tags=["任务管理接口"], response_model=ApiResponse[TaskOut]) def task_create(data: TaskCreate, db: Session = Depends(get_db)): """创建任务""" return TaskService.create(db, data) @protected_router.get("/tasks/{task_id:int}", summary="根据taskId读取任务状态", tags=["任务管理接口"], response_model=ApiResponse[TaskOut]) def task_get_by_id(task_id: int, db: Session = Depends(get_db)): """获取任务""" task = TaskService.get_by_id(db, task_id) return success(data=task) @protected_router.get("/tasks/pending", summary="获取等待执行的任务", tags=["任务管理接口"], response_model=ApiResponse[List[TaskOut]]) def task_get_pending( page: int = Query(0, description="第几页"), size: int = Query(10, description="分页大小"), command: str = Query(..., description="任务类型"), db: Session = Depends(get_db), ): """分页获取等待执行的任务""" obj = TaskService.get_pending(db, command, page, size) return success(data=obj) @protected_router.put("/tasks/{task_id}", summary="根据taskId更新任务状态", tags=["任务管理接口"], response_model=ApiResponse[TaskOut]) def task_update_by_id(task_id: int, data: TaskUpdate, db: Session = Depends(get_db)): """更新任务状态或结果""" updated = TaskService.update(db, task_id, data) return success(data=updated) @protected_router.post("/tg/send_message", summary="推送电报消息", tags=["消息推送接口"], response_model=ApiResponse) def tg_send_message( payload: TelegramIn ): TelegramService.push_to_telegram(payload) return success() @protected_router.post("/wechat/send", summary="推送微信消息", tags=["消息推送接口"], response_model=ApiResponse) def wechat_send( payload: WechatIn ): WechatService.push_to_wechat(payload) return success() @protected_router.post("/cards/publish", summary="创建新的消息卡片", tags=["信息卡片接口"], response_model=ApiResponse[CardOut]) def cards_publish( data: CardCreate = Body(...), db: Session = Depends(get_db) ): obj = CardService.create(db, data) return success(data=obj) @public_router.get("/cards/view", summary="分页读取全部卡片, 可选择语言", tags=["信息卡片接口"], response_model=ApiResponse[List[CardOut]]) def cards_view_paginated( page: int = Query(0, description="第几页"), size: int = Query(10, description="分页大小"), culture: str = Query("english", description="语言, 可设置 chinese, english"), db: Session = Depends(get_db) ): obj = CardService.get_paginated(db, page, size, culture) return success(data=obj) @public_router.get("/cards/view2", summary="根据关键词分页查询卡片, 可选择语言", tags=["信息卡片接口"], response_model=ApiResponse[List[CardOut]]) def cards_view_paginated2( keywords: str = Query("", description="查询的关键词,多个关键词用逗号隔开"), page: int = Query(0, description="第几页"), size: int = Query(10, description="分页大小"), culture: str = Query("english", description="语言, 可设置 chinese, english"), db: Session = Depends(get_db) ): keyword_list = [k.strip() for k in keywords.split(",") if k.strip()] obj = CardService.get_by_keywords(db, keyword_list, page, size, culture) return success(data=obj) @protected_router.get("/fake/users", summary="生成虚假的预约人信息", tags=["数据生成"], response_model=ApiResponse[List[FakeUser]]) def fake_generate_fake_users( num: int = Query(1, description="生成几个数据"), living_country = Query("Ireland", description="居住在哪个国家, China, India, United Kingdom, Ireland"), ): obj = generate_fake_users(num, living_country=living_country) return success(data=obj) @protected_router.post("/autobooking", summary="创建自动预定订单", tags=["自动预定订单管理接口"], response_model=AutoBookingOut) def autobooking_create(data: AutoBookingCreate, db: Session = Depends(get_db)): return AutoBookingService.create(db, data) @protected_router.post("/autobooking/create-by-ai", summary="用自然语言创建自动预定订单(底层使用chatgpt)", tags=["自动预定订单管理接口"]) def autobooking_create_by_ai(): # TODO: 这里可以对接 GPT 解析自然语言生成结构化 AutoBooking 数据 return {"message": "AI 自动创建暂未实现"} @protected_router.post("/autobooking/batch", summary="批量查询多个自动预定订单信息", tags=["自动预定订单管理接口"]) def autobooking_get_by_ids(ids: List[int] = Body(...), db: Session = Depends(get_db)): return AutoBookingService.batch_get_by_ids(db, ids) @protected_router.get("/autobooking", summary="分页查询所有的自动预定订单信息", tags=["自动预定订单管理接口"], response_model=List[AutoBookingOut]) def autobooking_get_paginated( tech_provider: str = Query("", description="签证网站技术提供商"), keyword: str = Query("", description="关键词查询"), page: int = Query(0, description="第几页"), size: int = Query(10, description="分页大小"), db: Session = Depends(get_db) ): return AutoBookingService.get_paginated(db, tech_provider, keyword, page, size) @protected_router.get("/autobooking/{id:int}", summary="根据id查询自动预定订单详情", tags=["自动预定订单管理接口"], response_model=AutoBookingOut) def autobooking_get_by_id(id: int, db: Session = Depends(get_db)): result = AutoBookingService.get_by_id(db, id) if not result: raise HTTPException(status_code=404, detail="未找到订单") return result @protected_router.delete("/autobooking/{id:int}", summary="根据id删除自动预定订单", tags=["自动预定订单管理接口"]) def autobooking_delete_by_id(id: int, db: Session = Depends(get_db)): ok = AutoBookingService.delete_by_id(db, id) if not ok: raise HTTPException(status_code=404, detail="删除失败或记录不存在") return {"success": True, "id": id} @protected_router.put("/autobooking/{id:int}", summary="根据id更新自动预定订单信息", tags=["自动预定订单管理接口"], response_model=AutoBookingOut) def autobooking_update_by_id(id: int, updated_order_info: dict = Body(...), db: Session = Depends(get_db)): result = AutoBookingService.update_by_id(db, id, updated_order_info) if not result: raise HTTPException(status_code=404, detail="更新失败或记录不存在") return result @protected_router.get("/autobooking/statistics", summary="统计自动预定订单信息", tags=["自动预定订单管理接口"]) def autobooking_statistics( tech_provider: str = Query("", description="签证网站技术提供商"), db: Session = Depends(get_db) ): return AutoBookingService.statistics(db, tech_provider) @protected_router.get("/autobooking/pending", summary="获取未处理的自动预定订单信息列表", tags=["自动预定订单管理接口"], response_model=List[AutoBookingOut]) def autobooking_pending( tech_provider: str = Query("", description="签证网站技术提供商"), db: Session = Depends(get_db) ): return AutoBookingService.get_pending(db, tech_provider) @public_router.post("/webhook/smshelper", summary="双开助手Webhook", tags=["webhook"], response_model=ApiResponse) def webhook_smshelper( payload: SMSHelperWebhookPayload, db: Session = Depends(get_db), redis_client: Redis = Depends(get_redis_client) ): if "微信支付" in payload.title: res = WebhookService.smshelper_payment_webhook(db, payload) if res.get('status', 'ok') == 'ok': print(f"📧 send payment succeeded notification email") return success() @public_router.post("/webhook/stripe", include_in_schema=False, summary="Stripe Webhook", tags=["webhook"], response_model=ApiResponse) def webhook_stripe( request: Request, db: Session = Depends(get_db), redis_client: Redis = Depends(get_redis_client) ): payload = request.body() sig_header = request.headers.get("stripe-signature") event = stripe.Webhook.construct_event( payload=payload, sig_header=sig_header, secret=settings.STRIPE_WEBHOOK_SECRET, ) res = WebhookService.stripe_payment_webhook(db, event) if res.get('status', 'ok') == 'ok': print(f"📧 send payment succeeded notification email") return success() @public_router.post("/vas/auth/auto-register", summary="自动注册", tags=["Visafly签证系统"], response_model=ApiResponse[AutoRegisterData]) def vas_auto_register( payload: AutoRegisterRequest, db: Session = Depends(get_db) ): res = AuthService.auto_register(db, payload) return success(data=res) @public_router.post("/vas/auth/bind-email", summary="绑定邮箱", tags=["Visafly签证系统"], response_model=ApiResponse) def vas_bind_email( payload: BindEmailRequest, current_user: VasUser = Depends(get_current_user), db: Session = Depends(get_db), redis_client: Redis = Depends(get_redis_client) ): AuthService.bind_email(db, payload, current_user, redis_client) return success(message="verify email sent, please check your inbox") @public_router.get("/vas/auth/verify-email", summary="验证邮箱", tags=["Visafly签证系统"], response_model=ApiResponse) def vas_verify_email( token: str, db: Session = Depends(get_db), redis_client: Redis = Depends(get_redis_client) ): AuthService.verify_email(db, token, redis_client) return success(message="success") @public_router.post("/vas/auth/login", summary="邮箱登录", tags=["Visafly签证系统"], response_model=ApiResponse[LoginData]) def vas_login( payload: LoginRequest, db: Session = Depends(get_db) ): res = AuthService.login(db, payload) return success(data=res) @public_router.post("/vas/product/create", summary="创建商品", tags=["Visafly签证系统"], response_model=ApiResponse[VasProductOut]) def vas_product_create( payload: VasProductCreate, db: Session = Depends(get_db) ): created_product = ProductService.create(db, payload) return success(data=created_product) @public_router.post("/vas/order/create", summary="创建订单", tags=["Visafly签证系统"], response_model=ApiResponse[VasOrderOut]) def vas_order_create( payload: VasOrderCreate, current_user: VasUser = Depends(get_current_user), db: Session = Depends(get_db), redis_client: Redis = Depends(get_redis_client) ): product = ProductService.get(db, payload.product_id) # ① 获取产品绑定的 schema schema = SchemaService.get(db, product.schema_id) # ② 校验 user_inputs validate_user_inputs( schema_json=schema.schema_json, user_inputs=json.loads(payload.user_inputs), ) created_order = OrderService.create(db, payload, product, current_user) if current_user.role == "admin": OrderService.mark_as_admin_paid(db, created_order, current_user) OrderService.create_tasks_for_order(db, order) print(f"📧 send order created notification email") NotificationService.create( redis=redis_client, ntype="order create notify", user_id=current_user.id, channels=["email"], template_id="order_create_notify", payload={ "sendTo": current_user.email, "orderId": created_order.id } ) return success(data=created_order) @public_router.get("/vas/payment_provider", summary="获取支付方式", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasPaymentProviderSimpleOut]]) def vas_payment_provider_simple_get(db: Session = Depends(get_db)): providers = PaymentProviderSerivce.list_enabled(db) return success(data=providers) @public_router.post("/vas/payment/create", summary="创建支付", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentOut]) def vas_payment_create( payload: VasPaymentCreate, db: Session = Depends(get_db) ): rate_table = { "EUR->EUR": "1", "EUR->CNY": "8.3174", "EUR->USD": "1.0842", } res = PaymentService.create_payment(db, payload) return success(data=res) @public_router.get("/vas/payment_qr/qrcode", summary="获取支付的QRCode", tags=["Visafly签证系统"], response_model=ApiResponse[VasPaymentQrSimpleOut]) def vas_payment_qr_get_qrcode_by_id(id: int, db: Session = Depends(get_db)): qr = PaymentQrService.get_by_id(db, id) return success(data=qr) @public_router.get("/vas/task/pending", summary="获取待执行的任务", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasTaskOut]]) def vas_task_pending( routing_key: str = Query(..., description="task 自定义索引"), script_version: str = Query("", description="脚本版本, 用来向后兼容"), db: Session = Depends(get_db) ): tasks = VasTaskService.get_pending(db, routing_key, order_id, script_version) return success(data=tasks) @public_router.get("/vas/task/get_by_order", summary="根据订单查找任务", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasTaskOut]]) def vas_task_pending( order_id: str = Query(..., description="订单编号"), script_version: str = Query("", description="脚本版本, 用来向后兼容"), db: Session = Depends(get_db) ): tasks = VasTaskService.get_active_task_by_order_id(db, order_id) return success(data=tasks) @public_router.post("/vas/task/return_to_queue", summary="重新放回队列", tags=["Visafly签证系统"]) def vas_task_return_to_queue(task_id:int, db: Session = Depends(get_db)): VasTaskService.return_to_queue(db, task_id) return success() @public_router.post("/vas/task/manual_confirm", summary="设置任务完成", tags=["Visafly签证系统"]) def vas_task_manual_confirm(task_id:int, db: Session = Depends(get_db)): VasTaskService.manual_confirm(db, task_id) return success() @public_router.post("/vas/ticket/create", summary="创建工单", tags=["Visafly签证系统"]) def vas_ticket_create(data:VasTicketCreate, db: Session = Depends(get_db)): TicketService.create(db, data) @public_router.post("/vas/ticket/refund/approve", summary="批准退款", tags=["Visafly签证系统"]) def vas_ticket_refund_approve(id:int, admin_comment:str, db: Session = Depends(get_db)): TicketService.set_refund_approve(db, id, admin_comment) @public_router.post("/vas/ticket/refund/need-info", summary="管理员批准退款,但是需要补充资料", tags=["Visafly签证系统"]) def vas_ticket_refund_need_info(id:int, admin_comment:str, db: Session = Depends(get_db)): TicketService.set_refund_need_info(db, id, admin_comment) @public_router.post("/vas/ticket/refund/submit-info", summary="用户提交退款补充信息", tags=["Visafly签证系统"]) def vas_ticket_refund_submit_info(ticket_id:int, extra:dict, db: Session = Depends(get_db)): TicketService.set_refund_need_info(db, ticket_id, extra) @public_router.post("/vas/ticket/refund/reject", summary="管理员:拒绝退款", tags=["Visafly签证系统"]) def vas_ticket_refund_reject(id:int, admin_comment:str, db: Session = Depends(get_db)): TicketService.reject_refund(db, id, admin_comment)