Ver Fonte

feat: update

jerry há 2 meses atrás
pai
commit
d533185d81

+ 18 - 0
app/api/router.py

@@ -53,6 +53,7 @@ from app.schemas.statistics import VasStatisticsOverviewOut
 from app.schemas.llm import ParseUserInputsPayload, ParseUserInputsOut
 from app.schemas.account import AccountResponse, AccountCreate, LockRequest
 from app.schemas.docker_remote import RemoteServerConfig, DockerStatusOut, DockerLogsRequest, DockerLogsOut, ConfigReadOut, ConfigReadRequest, ConfigUpdateRequest, LogReadRequest, LogReadOut, LogListOut, DockerContainerStatus, DockerActionRequest, ServerConfigItem, ServerListOut, RemoteActionRequest
+from app.schemas.order_event import VasOrderEventCreate, VasOrderEventOut
 from app.services.docker_remote_service import DockerRemoteService
 from app.services.configuration_service import ConfigurationService
 from app.services.troov_service import TroovService
@@ -85,6 +86,7 @@ from app.services.statistics_service import StatisticsService
 from app.services.llm_service import LlmService
 from app.services.slot_refresh_status_service import SlotRefreshStatusService
 from app.services.account_service import AccountService
+from app.services.order_event_service import OrderEventService
 
 # 公共路由
 public_router = APIRouter()
@@ -1149,6 +1151,22 @@ async def vas_order_cancel(
     cancelled_order = await OrderService.cancel(db, order_id)
     return success(data=cancelled_order)
 
+@admin_required_router.post("/vas/order-event/create", summary="创建订单事件", tags=["Visafly签证系统"], response_model=ApiResponse[VasOrderEventOut])
+async def vas_order_event_create(
+    event_data: VasOrderEventCreate,
+    db: AsyncSession = Depends(get_db)
+):
+    obj = await OrderEventService.create(db, event_data)
+    return success(data=obj)
+
+@protected_router.get("/vas/order-event/list", summary="获取订单的所有事件", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasOrderEventOut]])
+async def vas_list_order_events_by_order(
+    order_id: str,
+    db: AsyncSession = Depends(get_db)
+):
+    events = await OrderEventService.get_by_order_id(db, order_id)
+    return success(data=events)
+
 @protected_router.get("/vas/payment_provider/list_enabled", summary="获取支付方式", tags=["Visafly签证系统"], response_model=ApiResponse[List[VasPaymentProviderOut]])
 async def vas_payment_provider_simple_get(
     db: AsyncSession = Depends(get_db)

+ 4 - 0
app/core/database.py

@@ -37,5 +37,9 @@ async def get_db() -> AsyncSession:
     async with AsyncSessionLocal() as session:
         try:
             yield session
+        except Exception:
+            # --- 核心改进:全局兜底回滚 ---
+            await session.rollback()
+            raise
         finally:
             await session.close()

+ 13 - 0
app/models/order_event.py

@@ -0,0 +1,13 @@
+from sqlalchemy import Column, BigInteger, String, Text, DateTime, func
+from app.core.database import Base
+
+class VasOrderEvent(Base):
+    __tablename__ = "vas_order_events"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True)
+    order_no = Column(String(100), nullable=False)
+    event_title = Column(String(255), nullable=False)
+    event_message = Column(Text)
+    email_uid = Column(BigInteger, nullable=False, unique=True)
+    event_time = Column(DateTime)
+    created_at = Column(DateTime, nullable=False, server_default=func.now())

+ 27 - 0
app/schemas/order_event.py

@@ -0,0 +1,27 @@
+from pydantic import BaseModel, Field
+from datetime import datetime
+from typing import Optional
+
+# 基础字段
+class VasOrderEventBase(BaseModel):
+    order_no: str = Field(..., max_length=100, description="订单编号")
+    event_title: str = Field(..., max_length=255, description="事件标题")
+    event_message: Optional[str] = Field(None, description="事件详情内容")
+    email_uid: int = Field(..., description="邮件唯一UID")
+    event_time: Optional[datetime] = Field(None, description="事件发生时间")
+
+# 创建时使用的 Schema
+class VasOrderEventCreate(VasOrderEventBase):
+    order_no: Optional[str] = None
+    alias_email: Optional[str] = None
+
+# API 返回时使用的 Schema
+class VasOrderEventOut(VasOrderEventBase):
+    id: int
+    order_no: str
+    created_at: datetime
+
+    class Config:
+        # Pydantic V2 使用 from_attributes = True
+        # Pydantic V1 使用 orm_mode = True
+        from_attributes = True

+ 70 - 0
app/services/order_event_service.py

@@ -0,0 +1,70 @@
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from sqlalchemy.exc import IntegrityError
+from datetime import datetime
+from typing import Optional, List
+
+from app.models.vas_task import VasTask
+from app.models.order_event import VasOrderEvent  # 假设模型名
+from app.schemas.order_event import VasOrderEventCreate
+from app.core.biz_exception import NotFoundError, BizLogicError
+
+class OrderEventService:
+    
+    @staticmethod
+    async def create(
+        db: AsyncSession,
+        event_data: VasOrderEventCreate
+    ) -> Optional[VasOrderEvent]:
+        
+        target_order_no = event_data.order_no
+
+        # 1. 逻辑分发:如果没有传 order_no 但传了 alias_email
+        if not target_order_no and event_data.alias_email:
+            # 从 VasTask 的 JSON 配置中查找 alias_email 对应的 order_id
+            stmt = (
+                select(VasTask.order_id)
+                .where(VasTask.config['alias_email'].as_string() == event_data.alias_email)
+                .limit(1)
+            )
+            result = await db.execute(stmt)
+            target_order_no = result.scalar_one_or_none()
+            
+            # 如果根据别名也没找到订单,抛出异常或根据业务需求处理
+            if not target_order_no:
+                raise BizLogicError(message=f"Unable to find the corresponding order based on the alias email address {event_data.alias_email}")
+
+        # 2. 二次校验:确保 order_no 最终必须存在(对应 DB 的 NOT NULL)
+        if not target_order_no:
+            raise BizLogicError("The order id cannot be empty and cannot be resolved via an alias email address.")
+
+        # 3. 构造数据库模型实例
+        new_event = VasOrderEvent(
+            order_no=target_order_no,
+            event_title=event_data.event_title,
+            event_message=event_data.event_message,
+            email_uid=event_data.email_uid,
+            event_time=event_data.event_time or datetime.utcnow(),
+            created_at=datetime.utcnow()
+        )
+
+        db.add(new_event)
+        await db.commit()
+        await db.refresh(new_event)
+        return new_event
+    
+    @staticmethod
+    async def get_by_order_id(
+        db: AsyncSession, 
+        order_id: str
+    ) -> List[VasOrderEvent]:
+        """
+        根据 order_id (数据库中对应 order_no) 查询所有事件
+        """
+        stmt = (
+            select(VasOrderEvent)
+            .where(VasOrderEvent.order_no == order_id)
+            .order_by(VasOrderEvent.created_at.desc()) # 按创建时间倒序
+        )
+        result = await db.execute(stmt)
+        return result.scalars().all()

+ 0 - 5
app/services/webhook_service.py

@@ -18,11 +18,6 @@ from app.schemas.webhook import SMSHelperWebhookPayload, PaymentWebhookOut
 
 
 class WebhookService:
-
-    # =========================================================
-    # 内部方法:创建 Task(幂等)
-    # =========================================================
-    @staticmethod
     # =========================================================
     # 内部方法:创建 Task(幂等)
     # =========================================================

+ 0 - 0
scripts/__init__.py


+ 77 - 0
scripts/sync_emails_to_events.py

@@ -0,0 +1,77 @@
+import asyncio
+from datetime import datetime, timedelta
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.database import AsyncSessionLocal
+from app.models.emails import VasEmail
+from app.services.order_event_service import OrderEventService
+from app.schemas.order_event import VasOrderEventCreate
+from app.core.biz_exception import BizLogicError
+
+async def sync_recent_emails():
+    # 1. 计算 3 周前的时间
+    three_weeks_ago = datetime.utcnow() - timedelta(weeks=4)
+    
+    batch_size = 100
+    offset = 0
+    total_processed = 0
+    total_synced = 0
+    total_skipped = 0
+
+    print(f"[*] 开始扫描邮件表,起始时间: {three_weeks_ago}")
+
+    async with AsyncSessionLocal() as db:
+        while True:
+            # 2. 分批查询 emails 表 (按 created_at 过滤并排序)
+            stmt = (
+                select(VasEmail)
+                .where(VasEmail.created_at >= three_weeks_ago)
+                .order_by(VasEmail.created_at.asc())
+                .offset(offset)
+                .limit(batch_size)
+            )
+            
+            result = await db.execute(stmt)
+            batch_emails = result.scalars().all()
+            
+            if not batch_emails:
+                break  # 没有更多数据了
+
+            for email in batch_emails:
+                total_processed += 1
+                
+                # 3. 构造创建参数
+                # recipient 映射为 alias_email
+                event_in = VasOrderEventCreate(
+                    event_title=email.subject or "No Subject",
+                    event_message=email.body_text[:2000] if email.body_text else "", # 适当截断防溢出
+                    email_uid=email.uid,
+                    event_time=email.created_at, # receive_time 是字符串,使用 created_at 更稳妥
+                    alias_email=email.recipient,
+                    order_no=None # 让 Service 根据 alias_email 自动解析
+                )
+
+                try:
+                    # 4. 调用之前写好的 Service (它会自动查询 order_id 并写入)
+                    await OrderEventService.create(db, event_in)
+                    total_synced += 1
+                    if total_synced % 10 == 0:
+                        print(f"[-] 已处理: {total_processed}, 成功写入: {total_synced}")
+                except BizLogicError as e:
+                    # 如果 alias_email 找不到对应的 order_id,会抛出这个错,我们记录并跳过
+                    total_skipped += 1
+                    # print(f"[!] 跳过 UID {email.uid}: {e.message}")
+                except Exception as e:
+                    print(f"[ERR] 处理 UID {email.uid} 时发生未知错误: {e}")
+            
+            # 移动偏移量进行下一批次
+            offset += batch_size
+
+    print(f"\n[*] 任务完成!")
+    print(f"[*] 总扫描数: {total_processed}")
+    print(f"[*] 成功写入 OrderEvent: {total_synced}")
+    print(f"[*] 被忽略(无对应订单): {total_skipped}")
+
+if __name__ == "__main__":
+    asyncio.run(sync_recent_emails())