order_service.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. # app/services/order_service.py
  2. import uuid
  3. import json
  4. from datetime import datetime, timedelta
  5. from typing import List, Optional
  6. from redis.asyncio import Redis
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from sqlalchemy import select
  9. from app.utils.search import apply_keyword_search_stmt
  10. from app.utils.pagination import paginate
  11. from app.core.biz_exception import NotFoundError, BizLogicError
  12. from app.models.user import VasUser
  13. from app.models.order import VasOrder
  14. from app.models.vas_task import VasTask
  15. from app.models.product import VasProduct
  16. from app.models.payment import VasPayment
  17. from app.models.product_routing import VasProductRouting
  18. from app.schemas.order import VasOrderCreate, VasOrderAdjustPrice, VasOrderPatchUserInputs
  19. from app.services.webhook_service import WebhookService
  20. class OrderService:
  21. @staticmethod
  22. async def create(
  23. db: AsyncSession,
  24. data: VasOrderCreate,
  25. product: VasProduct,
  26. auth_user: VasUser,
  27. redis_client: Redis,
  28. ) -> VasOrder:
  29. if not auth_user.email:
  30. raise BizLogicError(
  31. "Your account must be linked to an email address before you can place an order."
  32. )
  33. order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
  34. safe_product_name = (product.title[:100]) if product.title else ""
  35. order = VasOrder(
  36. id=order_id,
  37. **data.dict(),
  38. product_name=safe_product_name,
  39. base_amount=product.price_amount,
  40. base_currency=product.price_currency,
  41. adjustment_delta=0,
  42. final_amount=product.price_amount,
  43. user_id=auth_user.id,
  44. )
  45. db.add(order)
  46. await db.commit()
  47. await db.refresh(order)
  48. return order
  49. @staticmethod
  50. async def cancel(
  51. db: AsyncSession,
  52. order_id,
  53. ) -> VasOrder:
  54. stmt = select(VasOrder).where(VasOrder.id == order_id)
  55. order = (await db.execute(stmt)).scalar_one_or_none()
  56. if not order:
  57. raise NotFoundError("Order not exist")
  58. order.status = "closed"
  59. task_res = await db.execute(
  60. select(VasTask).where(
  61. VasTask.order_id == order.id,
  62. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  63. )
  64. )
  65. for task in task_res.scalars().all():
  66. task.status = "cancelled"
  67. await db.commit()
  68. await db.refresh(order)
  69. return order
  70. @staticmethod
  71. async def create_by_admin(
  72. db: AsyncSession,
  73. data: VasOrderCreate,
  74. product: VasProduct,
  75. auth_user: VasUser
  76. ) -> VasOrder:
  77. order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
  78. safe_product_name = (product.title[:100]) if product.title else ""
  79. order = VasOrder(
  80. id=order_id,
  81. **data.dict(),
  82. status="paid",
  83. product_name=safe_product_name,
  84. base_amount=product.price_amount,
  85. base_currency=product.price_currency,
  86. adjustment_delta=0,
  87. final_amount=product.price_amount,
  88. user_id=auth_user.id,
  89. )
  90. # ===== user_inputs 安全修复 =====
  91. raw_inputs = order.user_inputs
  92. if isinstance(raw_inputs, str):
  93. try:
  94. order.user_inputs = json.loads(raw_inputs)
  95. except Exception:
  96. order.user_inputs = {}
  97. elif raw_inputs is None or not isinstance(raw_inputs, dict):
  98. order.user_inputs = {}
  99. order.user_inputs["_admin_bypass"] = {
  100. "enabled": True,
  101. "by": auth_user.id,
  102. "at": datetime.utcnow().isoformat(),
  103. "reason": "admin manual order",
  104. }
  105. db.add(order)
  106. await WebhookService._create_task_if_not_exists(db, order)
  107. await db.commit()
  108. await db.refresh(order)
  109. return order
  110. @staticmethod
  111. async def adjust_order_price(db: AsyncSession, order_id: str, payload: VasOrderAdjustPrice) -> VasOrder:
  112. stmt = select(VasOrder).where(VasOrder.id == order_id)
  113. order = (await db.execute(stmt)).scalar_one_or_none()
  114. if not order:
  115. raise NotFoundError("Order not exist")
  116. if order.status != "pending":
  117. raise BizLogicError(message="Order not adjustable")
  118. # 2. 更新订单价格
  119. order.adjustment_delta = payload.adjustment_delta
  120. order.final_amount = order.base_amount + payload.adjustment_delta
  121. if order.final_amount <= 0:
  122. raise BizLogicError(message="final_amount must be > 0")
  123. # ② 是否已有 pending payment(幂等)
  124. stmt = select(VasPayment).where(
  125. VasPayment.order_id == order.id,
  126. VasPayment.status == "pending",
  127. )
  128. active_payment = (await db.execute(stmt)).scalar_one_or_none()
  129. if active_payment:
  130. active_payment.status = "expired"
  131. await db.commit()
  132. await db.refresh(order)
  133. return order
  134. @staticmethod
  135. async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]:
  136. stmt = select(VasOrder).where(VasOrder.id == order_id)
  137. return (await db.execute(stmt)).scalar_one_or_none()
  138. @staticmethod
  139. async def list_by_user(
  140. db: AsyncSession,
  141. user_id: str,
  142. page: int = 0,
  143. size: int = 10,
  144. keyword: Optional[str] = None,
  145. ):
  146. stmt = select(VasOrder).where(VasOrder.user_id == user_id)
  147. stmt = apply_keyword_search_stmt(
  148. stmt=stmt,
  149. model=VasOrder,
  150. keyword=keyword,
  151. fields=["id", "user_id", "product_name", "user_inputs"],
  152. ).order_by(VasOrder.created_at.desc())
  153. return await paginate(db, stmt, page, size)
  154. @staticmethod
  155. async def list_all(
  156. db: AsyncSession,
  157. page: int = 0,
  158. size: int = 10,
  159. keyword: Optional[str] = None,
  160. ):
  161. stmt = select(VasOrder)
  162. query = apply_keyword_search_stmt(
  163. stmt=stmt,
  164. model=VasOrder,
  165. keyword=keyword,
  166. fields=["id", "user_id", "user_name", "product_name", "user_inputs"],
  167. ).order_by(VasOrder.created_at.desc())
  168. return await paginate(db, query, page, size)
  169. @staticmethod
  170. async def patch_user_inputs(
  171. db: AsyncSession,
  172. order_id: str,
  173. payload: VasOrderPatchUserInputs,
  174. ) -> VasOrder:
  175. stmt = select(VasOrder).where(VasOrder.id == order_id)
  176. order = (await db.execute(stmt)).scalar_one_or_none()
  177. if not order:
  178. raise NotFoundError("Order not exist")
  179. order.user_inputs = payload.user_inputs
  180. # 1️⃣ 取消旧任务
  181. task_res = await db.execute(
  182. select(VasTask).where(
  183. VasTask.order_id == order.id,
  184. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  185. )
  186. )
  187. for task in task_res.scalars().all():
  188. task.status = "cancelled"
  189. await db.flush()
  190. await WebhookService._create_task_if_not_exists(db, order)
  191. await db.commit()
  192. await db.refresh(order)
  193. return order