order_service.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. # app/services/order_service.py
  2. import uuid
  3. import json
  4. from datetime import datetime, timedelta
  5. from typing import List, Optional
  6. from redis.asyncio import Redis
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from sqlalchemy import select
  9. from app.utils.search import apply_keyword_search_stmt
  10. from app.utils.pagination import paginate
  11. from app.core.biz_exception import NotFoundError, BizLogicError
  12. from app.models.user import VasUser
  13. from app.models.order import VasOrder
  14. from app.models.vas_task import VasTask
  15. from app.models.product import VasProduct
  16. from app.models.product_routing import VasProductRouting
  17. from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
  18. from app.services.webhook_service import WebhookService
  19. class OrderService:
  20. # --------------------------------------------------
  21. # 创建订单
  22. # --------------------------------------------------
  23. @staticmethod
  24. async def create(
  25. db: AsyncSession,
  26. data: VasOrderCreate,
  27. product: VasProduct,
  28. auth_user: VasUser,
  29. redis_client: Redis,
  30. ) -> VasOrder:
  31. if not auth_user.email:
  32. raise BizLogicError(
  33. "Your account must be linked to an email address before you can place an order."
  34. )
  35. order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
  36. order = VasOrder(
  37. id=order_id,
  38. **data.dict(),
  39. product_name=product.title,
  40. base_amount=product.price_amount,
  41. base_currency=product.price_currency,
  42. user_id=auth_user.id,
  43. )
  44. db.add(order)
  45. await db.commit()
  46. await db.refresh(order)
  47. return order
  48. # --------------------------------------------------
  49. # 取消订单
  50. # --------------------------------------------------
  51. @staticmethod
  52. async def cancel(
  53. db: AsyncSession,
  54. order_id,
  55. ) -> VasOrder:
  56. stmt = select(VasOrder).where(VasOrder.id == order_id)
  57. order = (await db.execute(stmt)).scalar_one_or_none()
  58. if not order:
  59. raise NotFoundError("Order not exist")
  60. order.status = "closed"
  61. task_res = await db.execute(
  62. select(VasTask).where(
  63. VasTask.order_id == order.id,
  64. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  65. )
  66. )
  67. for task in task_res.scalars().all():
  68. task.status = "cancelled"
  69. await db.commit()
  70. await db.refresh(order)
  71. return order
  72. @staticmethod
  73. async def create_by_admin(
  74. db: AsyncSession,
  75. data: VasOrderCreate,
  76. product: VasProduct,
  77. auth_user: VasUser,
  78. redis_client: Redis,
  79. ) -> VasOrder:
  80. order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
  81. order = VasOrder(
  82. id=order_id,
  83. **data.dict(),
  84. status="paid",
  85. product_name=product.title,
  86. base_amount=product.price_amount,
  87. base_currency=product.price_currency,
  88. user_id=auth_user.id,
  89. )
  90. # ===== user_inputs 安全修复 =====
  91. raw_inputs = order.user_inputs
  92. if isinstance(raw_inputs, str):
  93. try:
  94. order.user_inputs = json.loads(raw_inputs)
  95. except Exception:
  96. order.user_inputs = {}
  97. elif raw_inputs is None or not isinstance(raw_inputs, dict):
  98. order.user_inputs = {}
  99. order.user_inputs["_admin_bypass"] = {
  100. "enabled": True,
  101. "by": auth_user.id,
  102. "at": datetime.utcnow().isoformat(),
  103. "reason": "admin manual order",
  104. }
  105. db.add(order)
  106. await WebhookService._create_task_if_not_exists(db, order)
  107. await db.commit()
  108. await db.refresh(order)
  109. return order
  110. # --------------------------------------------------
  111. # 获取订单
  112. # --------------------------------------------------
  113. @staticmethod
  114. async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]:
  115. stmt = select(VasOrder).where(VasOrder.id == order_id)
  116. return (await db.execute(stmt)).scalar_one_or_none()
  117. # --------------------------------------------------
  118. # 用户订单列表
  119. # --------------------------------------------------
  120. @staticmethod
  121. async def list_by_user(
  122. db: AsyncSession,
  123. user_id: str,
  124. page: int = 0,
  125. size: int = 10,
  126. keyword: Optional[str] = None,
  127. ):
  128. stmt = select(VasOrder).where(VasOrder.user_id == user_id)
  129. stmt = apply_keyword_search_stmt(
  130. stmt=stmt,
  131. model=VasOrder,
  132. keyword=keyword,
  133. fields=["id", "user_id", "product_name", "user_inputs"],
  134. ).order_by(VasOrder.created_at.desc())
  135. return await paginate(db, stmt, page, size)
  136. # --------------------------------------------------
  137. # 管理员订单列表
  138. # --------------------------------------------------
  139. @staticmethod
  140. async def list_all(
  141. db: AsyncSession,
  142. page: int = 0,
  143. size: int = 10,
  144. keyword: Optional[str] = None,
  145. ):
  146. stmt = select(VasOrder)
  147. query = apply_keyword_search_stmt(
  148. stmt=stmt,
  149. model=VasOrder,
  150. keyword=keyword,
  151. fields=["id", "user_id", "user_name", "product_name", "user_inputs"],
  152. ).order_by(VasOrder.created_at.desc())
  153. return await paginate(db, query, page, size)
  154. # --------------------------------------------------
  155. # 更新 user_inputs
  156. # --------------------------------------------------
  157. @staticmethod
  158. async def patch_user_inputs(
  159. db: AsyncSession,
  160. order_id: str,
  161. payload: VasOrderPatchUserInputs,
  162. ) -> VasOrder:
  163. stmt = select(VasOrder).where(VasOrder.id == order_id)
  164. order = (await db.execute(stmt)).scalar_one_or_none()
  165. if not order:
  166. raise NotFoundError("Order not exist")
  167. order.user_inputs = payload.user_inputs
  168. # 1️⃣ 取消旧任务
  169. task_res = await db.execute(
  170. select(VasTask).where(
  171. VasTask.order_id == order.id,
  172. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  173. )
  174. )
  175. for task in task_res.scalars().all():
  176. task.status = "cancelled"
  177. await db.flush()
  178. await WebhookService._create_task_if_not_exists(db, order)
  179. await db.commit()
  180. await db.refresh(order)
  181. return order