from datetime import datetime
from typing import Optional, List

from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.biz_exception import NotFoundError, BizLogicError
from app.models.order_event import VasOrderEvent  # assumed model name
from app.models.vas_task import VasTask
from app.schemas.order_event import VasOrderEventCreate


class OrderEventService:
    """Create and query ``VasOrderEvent`` rows through an async session."""

    @staticmethod
    async def create(
        db: AsyncSession,
        event_data: VasOrderEventCreate,
    ) -> Optional[VasOrderEvent]:
        """Insert a new order event and return the refreshed instance.

        When ``event_data.order_no`` is empty but ``event_data.alias_email``
        is set, the order number is looked up from ``VasTask.order_id`` by
        matching the ``alias_email`` key of the task's ``config`` column.

        Args:
            db: Async session used for the lookup and the insert.
            event_data: Payload carrying the event fields.

        Returns:
            The newly committed ``VasOrderEvent``.

        Raises:
            BizLogicError: If no order number was supplied and none could be
                resolved from the alias email.
            IntegrityError: Re-raised after the session is rolled back when
                the commit violates a database constraint.
        """
        target_order_no = event_data.order_no

        # 1. Dispatch: no order_no was given, but an alias_email was.
        if not target_order_no and event_data.alias_email:
            # Look up the order id via the alias_email stored in the
            # VasTask config (JSON) column.
            stmt = (
                select(VasTask.order_id)
                .where(
                    VasTask.config['alias_email'].as_string()
                    == event_data.alias_email
                )
                .limit(1)
            )
            result = await db.execute(stmt)
            target_order_no = result.scalar_one_or_none()

            # The alias did not map to any order either.
            if not target_order_no:
                raise BizLogicError(message=f"Unable to find the corresponding order based on the alias email address {event_data.alias_email}")

        # 2. Final guard: order_no must be known by now (the original author
        #    notes that the DB column is NOT NULL).
        if not target_order_no:
            raise BizLogicError("The order id cannot be empty and cannot be resolved via an alias email address.")

        # 3. Build the model instance. The clock is sampled once so that a
        #    defaulted event_time and created_at carry the same value.
        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12; kept as-is because switching to an aware value
        # depends on the column types — confirm before changing.
        now = datetime.utcnow()
        new_event = VasOrderEvent(
            order_no=target_order_no,
            event_title=event_data.event_title,
            event_message=event_data.event_message,
            email_uid=event_data.email_uid,
            event_time=event_data.event_time or now,
            created_at=now,
        )

        db.add(new_event)
        try:
            await db.commit()
        except IntegrityError:
            # Leave the session usable for the caller, then surface the
            # original error unchanged.
            await db.rollback()
            raise
        await db.refresh(new_event)
        return new_event

    @staticmethod
    async def get_by_order_id(
        db: AsyncSession,
        order_id: str,
    ) -> List[VasOrderEvent]:
        """Return every event whose ``order_no`` equals ``order_id``.

        Results are ordered by ``created_at`` descending (newest first).

        Args:
            db: Async session used to run the query.
            order_id: Order identifier; compared against the ``order_no``
                column.

        Returns:
            A list of matching ``VasOrderEvent`` rows, possibly empty.
        """
        stmt = (
            select(VasOrderEvent)
            .where(VasOrderEvent.order_no == order_id)
            .order_by(VasOrderEvent.created_at.desc())  # newest first
        )
        result = await db.execute(stmt)
        # .all() returns a Sequence; materialize to match the annotation.
        return list(result.scalars().all())