order_service.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. # app/services/order_service.py
  2. import uuid
  3. import json
  4. from datetime import datetime, timedelta
  5. from typing import List, Optional
  6. from redis.asyncio import Redis
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from sqlalchemy import select
  9. from app.utils.search import apply_keyword_search_stmt
  10. from app.utils.pagination import paginate
  11. from app.core.biz_exception import NotFoundError, BizLogicError
  12. from app.models.user import VasUser
  13. from app.models.order import VasOrder
  14. from app.models.vas_task import VasTask
  15. from app.models.product import VasProduct
  16. from app.models.product_routing import VasProductRouting
  17. from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
  18. class OrderService:
  # --------------------------------------------------
  # Admin: force-mark an order as paid
  # --------------------------------------------------
  @staticmethod
  async def mark_as_admin_paid(
      db: AsyncSession,
      order: VasOrder,
      admin_user: VasUser,
  ) -> VasOrder:
      """Force an order into the ``"paid"`` state on behalf of an administrator.

      If the order is already ``"paid"`` it is returned untouched. Otherwise
      the status is set to ``"paid"``, an ``"_admin_bypass"`` audit entry
      (``enabled``, ``by``, ``at``, ``reason``) is stored under
      ``order.user_inputs``, and the order is committed and refreshed.

      Args:
          db: Session used to persist the change.
          order: Order to mark as paid.
          admin_user: Administrator performing the action; its ``id`` is
              recorded in the audit entry.

      Returns:
          The same ``order`` instance, refreshed after commit.
      """
      if order.status == "paid":
          return order

      order.status = "paid"

      # ===== user_inputs safety fix =====
      # user_inputs may be a JSON string, None, or some other non-dict value;
      # normalise it to a dict before adding the audit entry.
      inputs = order.user_inputs
      if isinstance(inputs, str):
          try:
              inputs = json.loads(inputs)
          except (ValueError, RecursionError):
              # Malformed (or pathologically nested) JSON: start from empty.
              inputs = {}
      # Also covers JSON text that decodes to a non-dict ("null", "[]", "3"),
      # which would otherwise make the key assignment below fail.
      if not isinstance(inputs, dict):
          inputs = {}

      # Assign a NEW dict instead of mutating in place: a plain SQLAlchemy JSON
      # column does not detect in-place mutations, so the audit entry could be
      # skipped on commit and then lost on refresh.
      # NOTE(review): the column type is not visible here - confirm whether
      # MutableDict is used; reassignment is correct either way.
      order.user_inputs = {
          **inputs,
          "_admin_bypass": {
              "enabled": True,
              "by": admin_user.id,
              "at": datetime.utcnow().isoformat(),
              "reason": "admin manual order",
          },
      }

      db.add(order)
      await db.commit()
      await db.refresh(order)
      return order
  # --------------------------------------------------
  # Create tasks for an order (idempotent)
  # --------------------------------------------------
  @staticmethod
  async def create_tasks_for_order(
      db: AsyncSession,
      order: VasOrder
  ) -> List[VasTask]:
      """Create one pending task per active routing of the order's product.

      Nothing is created unless ``order.status`` is ``"paid"``. A routing is
      skipped when the order already has a task with the same
      ``(routing_key, script_version)`` pair, so calling this repeatedly does
      not create duplicates.

      Args:
          db: Session used for the lookups and to persist new tasks.
          order: Order to create tasks for.

      Returns:
          The tasks newly created by this call; an empty list when the order
          is not paid, the product has no active routing, or every routing
          already has a task.
      """
      if order.status != "paid":
          return []

      routing_stmt = select(VasProductRouting).where(
          VasProductRouting.product_id == order.product_id,
          VasProductRouting.is_active == 1,
      )
      routings = (await db.execute(routing_stmt)).scalars().all()
      if not routings:
          return []

      # Load the order's existing (routing_key, script_version) pairs once
      # instead of issuing one existence query per routing. Comparing in
      # Python also tolerates pre-existing duplicate rows and de-duplicates a
      # None script_version (an SQL "= NULL" comparison never matches).
      existing_stmt = select(VasTask.routing_key, VasTask.script_version).where(
          VasTask.order_id == order.id
      )
      existing_rows = (await db.execute(existing_stmt)).all()
      seen = {
          (routing_key, script_version)
          for routing_key, script_version in existing_rows
      }

      # One timestamp per call so created_at and expire_at are consistent.
      now = datetime.utcnow()
      created_tasks: List[VasTask] = []
      for routing in routings:
          pair = (routing.routing_key, routing.script_version)
          if pair in seen:
              continue
          # Also guards against the same pair appearing twice in `routings`.
          seen.add(pair)

          task = VasTask(
              order_id=order.id,
              routing_key=routing.routing_key,
              script_version=routing.script_version,
              priority=routing.priority,
              status="pending",
              user_inputs=order.user_inputs,
              config=routing.config,
              attempt_count=0,
              notify_count=0,
              expire_at=now + timedelta(days=7),
              created_at=now,
          )
          db.add(task)
          created_tasks.append(task)

      await db.commit()
      return created_tasks
  # --------------------------------------------------
  # Create order
  # --------------------------------------------------
  @staticmethod
  async def create(
      db: AsyncSession,
      data: VasOrderCreate,
      product: VasProduct,
      auth_user: VasUser,
      redis_client: Redis,
  ) -> VasOrder:
      """Persist a new order for ``auth_user`` and return it.

      The order id has the form ``ORD-<UTC yyyymmddHHMMSS>-<8 hex chars>``.
      Title, price amount and price currency are copied from ``product``.

      Args:
          db: Session used to persist the order.
          data: Order payload; its ``dict()`` is expanded into the model.
          product: Product being ordered.
          auth_user: Authenticated user placing the order.
          redis_client: Accepted for interface compatibility; not used here.

      Returns:
          The committed and refreshed order.

      Raises:
          BizLogicError: If ``auth_user`` has no email address.
      """
      if not auth_user.email:
          raise BizLogicError(
              "Your account must be linked to an email address before you can place an order."
          )

      timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
      suffix = uuid.uuid4().hex[:8]

      new_order = VasOrder(
          id=f"ORD-{timestamp}-{suffix}",
          **data.dict(),
          product_name=product.title,
          base_amount=product.price_amount,
          base_currency=product.price_currency,
          user_id=auth_user.id,
      )

      db.add(new_order)
      await db.commit()
      await db.refresh(new_order)
      return new_order
  # --------------------------------------------------
  # Get order
  # --------------------------------------------------
  @staticmethod
  async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]:
      """Return the order whose id equals ``order_id``, or ``None`` if absent."""
      result = await db.execute(
          select(VasOrder).where(VasOrder.id == order_id)
      )
      return result.scalar_one_or_none()
  # --------------------------------------------------
  # List orders for a user
  # --------------------------------------------------
  @staticmethod
  async def list_by_user(
      db: AsyncSession,
      user_id: str,
      page: int = 0,
      size: int = 10,
      keyword: Optional[str] = None,
  ):
      """Return one page of a user's orders, newest first.

      Args:
          db: Session used to run the query.
          user_id: Only orders with this ``user_id`` are selected.
          page: Page value forwarded to ``paginate``.
          size: Page size forwarded to ``paginate``.
          keyword: Optional search keyword applied to ``id``, ``user_id`` and
              ``product_name`` via ``apply_keyword_search_stmt``.

      Returns:
          Whatever ``paginate`` returns for the built statement.
      """
      owned = select(VasOrder).where(VasOrder.user_id == user_id)
      searched = apply_keyword_search_stmt(
          stmt=owned,
          model=VasOrder,
          keyword=keyword,
          fields=["id", "user_id", "product_name"],
      )
      newest_first = searched.order_by(VasOrder.created_at.desc())
      return await paginate(db, newest_first, page, size)
  # --------------------------------------------------
  # List all orders (admin)
  # --------------------------------------------------
  @staticmethod
  async def list_all(
      db: AsyncSession,
      page: int = 0,
      size: int = 10,
      keyword: Optional[str] = None,
  ):
      """Return one page of all orders, newest first.

      Args:
          db: Session used to run the query.
          page: Page value forwarded to ``paginate``.
          size: Page size forwarded to ``paginate``.
          keyword: Optional search keyword applied to ``id``, ``user_id``,
              ``user_name`` and ``product_name`` via
              ``apply_keyword_search_stmt``.

      Returns:
          Whatever ``paginate`` returns for the built statement.
      """
      searched = apply_keyword_search_stmt(
          stmt=select(VasOrder),
          model=VasOrder,
          keyword=keyword,
          fields=["id", "user_id", "user_name", "product_name"],
      )
      newest_first = searched.order_by(VasOrder.created_at.desc())
      return await paginate(db, newest_first, page, size)
  # --------------------------------------------------
  # Update user_inputs
  # --------------------------------------------------
  @staticmethod
  async def patch_user_inputs(
      db: AsyncSession,
      order_id: str,
      payload: VasOrderPatchUserInputs,
  ) -> VasOrder:
      """Replace an order's ``user_inputs`` with ``payload.user_inputs``.

      Args:
          db: Session used to load and persist the order.
          order_id: Id of the order to update.
          payload: Carries the new ``user_inputs`` value.

      Returns:
          The committed and refreshed order.

      Raises:
          NotFoundError: If no order matches ``order_id``.
      """
      lookup = await db.execute(
          select(VasOrder).where(VasOrder.id == order_id)
      )
      target = lookup.scalar_one_or_none()
      if not target:
          raise NotFoundError("Order not exist")

      target.user_inputs = payload.user_inputs
      await db.commit()
      await db.refresh(target)
      return target