order_event_service.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. from sqlalchemy.ext.asyncio import AsyncSession
  2. from sqlalchemy import select
  3. from sqlalchemy.exc import IntegrityError
  4. from datetime import datetime
  5. from typing import Optional, List
  6. from app.models.vas_task import VasTask
  7. from app.models.order_event import VasOrderEvent # 假设模型名
  8. from app.schemas.order_event import VasOrderEventCreate
  9. from app.core.biz_exception import NotFoundError, BizLogicError
  10. class OrderEventService:
  11. @staticmethod
  12. async def create(
  13. db: AsyncSession,
  14. event_data: VasOrderEventCreate
  15. ) -> Optional[VasOrderEvent]:
  16. target_order_no = event_data.order_no
  17. # 1. 逻辑分发:如果没有传 order_no 但传了 alias_email
  18. if not target_order_no and event_data.alias_email:
  19. # 从 VasTask 的 JSON 配置中查找 alias_email 对应的 order_id
  20. stmt = (
  21. select(VasTask.order_id)
  22. .where(VasTask.config['alias_email'].as_string() == event_data.alias_email)
  23. .limit(1)
  24. )
  25. result = await db.execute(stmt)
  26. target_order_no = result.scalar_one_or_none()
  27. # 如果根据别名也没找到订单,抛出异常或根据业务需求处理
  28. if not target_order_no:
  29. raise BizLogicError(message=f"Unable to find the corresponding order based on the alias email address {event_data.alias_email}")
  30. # 2. 二次校验:确保 order_no 最终必须存在(对应 DB 的 NOT NULL)
  31. if not target_order_no:
  32. raise BizLogicError("The order id cannot be empty and cannot be resolved via an alias email address.")
  33. # 3. 构造数据库模型实例
  34. new_event = VasOrderEvent(
  35. order_no=target_order_no,
  36. event_title=event_data.event_title,
  37. event_message=event_data.event_message,
  38. email_uid=event_data.email_uid,
  39. event_time=event_data.event_time or datetime.utcnow(),
  40. created_at=datetime.utcnow()
  41. )
  42. db.add(new_event)
  43. await db.commit()
  44. await db.refresh(new_event)
  45. return new_event
  46. @staticmethod
  47. async def get_by_order_id(
  48. db: AsyncSession,
  49. order_id: str
  50. ) -> List[VasOrderEvent]:
  51. """
  52. 根据 order_id (数据库中对应 order_no) 查询所有事件
  53. """
  54. stmt = (
  55. select(VasOrderEvent)
  56. .where(VasOrderEvent.order_no == order_id)
  57. .order_by(VasOrderEvent.created_at.desc()) # 按创建时间倒序
  58. )
  59. result = await db.execute(stmt)
  60. return result.scalars().all()