order_service.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. # app/services/order_service.py
  2. import uuid
  3. import json
  4. from datetime import datetime, timedelta
  5. from typing import List, Optional
  6. from redis.asyncio import Redis
  7. from sqlalchemy.ext.asyncio import AsyncSession
  8. from sqlalchemy import select
  9. from app.utils.search import apply_keyword_search_stmt
  10. from app.utils.pagination import paginate
  11. from app.core.biz_exception import NotFoundError, BizLogicError
  12. from app.models.user import VasUser
  13. from app.models.order import VasOrder
  14. from app.models.vas_task import VasTask
  15. from app.models.product import VasProduct
  16. from app.models.payment import VasPayment
  17. from app.models.product_routing import VasProductRouting
  18. from app.schemas.order import VasOrderCreate, VasOrderAdjustPrice, VasOrderPatchUserInputs
  19. from app.services.webhook_service import WebhookService
  20. class OrderService:
  21. @staticmethod
  22. async def create(
  23. db: AsyncSession,
  24. data: VasOrderCreate,
  25. product: VasProduct,
  26. auth_user: VasUser,
  27. redis_client: Redis,
  28. ) -> VasOrder:
  29. if not auth_user.email:
  30. raise BizLogicError(
  31. "Your account must be linked to an email address before you can place an order."
  32. )
  33. order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
  34. order = VasOrder(
  35. id=order_id,
  36. **data.dict(),
  37. product_name=product.title,
  38. base_amount=product.price_amount,
  39. base_currency=product.price_currency,
  40. adjustment_delta=0,
  41. final_amount=product.price_amount,
  42. user_id=auth_user.id,
  43. )
  44. db.add(order)
  45. await db.commit()
  46. await db.refresh(order)
  47. return order
  48. @staticmethod
  49. async def cancel(
  50. db: AsyncSession,
  51. order_id,
  52. ) -> VasOrder:
  53. stmt = select(VasOrder).where(VasOrder.id == order_id)
  54. order = (await db.execute(stmt)).scalar_one_or_none()
  55. if not order:
  56. raise NotFoundError("Order not exist")
  57. order.status = "closed"
  58. task_res = await db.execute(
  59. select(VasTask).where(
  60. VasTask.order_id == order.id,
  61. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  62. )
  63. )
  64. for task in task_res.scalars().all():
  65. task.status = "cancelled"
  66. await db.commit()
  67. await db.refresh(order)
  68. return order
  69. @staticmethod
  70. async def create_by_admin(
  71. db: AsyncSession,
  72. data: VasOrderCreate,
  73. product: VasProduct,
  74. auth_user: VasUser,
  75. redis_client: Redis,
  76. ) -> VasOrder:
  77. order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
  78. order = VasOrder(
  79. id=order_id,
  80. **data.dict(),
  81. status="paid",
  82. product_name=product.title,
  83. base_amount=product.price_amount,
  84. base_currency=product.price_currency,
  85. adjustment_delta=0,
  86. final_amount=product.price_amount,
  87. user_id=auth_user.id,
  88. )
  89. # ===== user_inputs 安全修复 =====
  90. raw_inputs = order.user_inputs
  91. if isinstance(raw_inputs, str):
  92. try:
  93. order.user_inputs = json.loads(raw_inputs)
  94. except Exception:
  95. order.user_inputs = {}
  96. elif raw_inputs is None or not isinstance(raw_inputs, dict):
  97. order.user_inputs = {}
  98. order.user_inputs["_admin_bypass"] = {
  99. "enabled": True,
  100. "by": auth_user.id,
  101. "at": datetime.utcnow().isoformat(),
  102. "reason": "admin manual order",
  103. }
  104. db.add(order)
  105. await WebhookService._create_task_if_not_exists(db, order)
  106. await db.commit()
  107. await db.refresh(order)
  108. return order
  109. @staticmethod
  110. async def adjust_order_price(db: AsyncSession, order_id: str, payload: VasOrderAdjustPrice) -> VasOrder:
  111. stmt = select(VasOrder).where(VasOrder.id == order_id)
  112. order = (await db.execute(stmt)).scalar_one_or_none()
  113. if not order:
  114. raise NotFoundError("Order not exist")
  115. if order.status != "pending":
  116. raise BizLogicError(message="Order not adjustable")
  117. # 2. 更新订单价格
  118. order.adjustment_delta = payload.adjustment_delta
  119. order.final_amount = order.base_amount + payload.adjustment_delta
  120. if order.final_amount <= 0:
  121. raise BizLogicError(message="final_amount must be > 0")
  122. # ② 是否已有 pending payment(幂等)
  123. stmt = select(VasPayment).where(
  124. VasPayment.order_id == order.id,
  125. VasPayment.status == "pending",
  126. )
  127. active_payment = (await db.execute(stmt)).scalar_one_or_none()
  128. if active_payment:
  129. active_payment.status = "expired"
  130. await db.commit()
  131. await db.refresh(order)
  132. return order
  133. @staticmethod
  134. async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]:
  135. stmt = select(VasOrder).where(VasOrder.id == order_id)
  136. return (await db.execute(stmt)).scalar_one_or_none()
  137. @staticmethod
  138. async def list_by_user(
  139. db: AsyncSession,
  140. user_id: str,
  141. page: int = 0,
  142. size: int = 10,
  143. keyword: Optional[str] = None,
  144. ):
  145. stmt = select(VasOrder).where(VasOrder.user_id == user_id)
  146. stmt = apply_keyword_search_stmt(
  147. stmt=stmt,
  148. model=VasOrder,
  149. keyword=keyword,
  150. fields=["id", "user_id", "product_name", "user_inputs"],
  151. ).order_by(VasOrder.created_at.desc())
  152. return await paginate(db, stmt, page, size)
  153. @staticmethod
  154. async def list_all(
  155. db: AsyncSession,
  156. page: int = 0,
  157. size: int = 10,
  158. keyword: Optional[str] = None,
  159. ):
  160. stmt = select(VasOrder)
  161. query = apply_keyword_search_stmt(
  162. stmt=stmt,
  163. model=VasOrder,
  164. keyword=keyword,
  165. fields=["id", "user_id", "user_name", "product_name", "user_inputs"],
  166. ).order_by(VasOrder.created_at.desc())
  167. return await paginate(db, query, page, size)
  168. @staticmethod
  169. async def patch_user_inputs(
  170. db: AsyncSession,
  171. order_id: str,
  172. payload: VasOrderPatchUserInputs,
  173. ) -> VasOrder:
  174. stmt = select(VasOrder).where(VasOrder.id == order_id)
  175. order = (await db.execute(stmt)).scalar_one_or_none()
  176. if not order:
  177. raise NotFoundError("Order not exist")
  178. order.user_inputs = payload.user_inputs
  179. # 1️⃣ 取消旧任务
  180. task_res = await db.execute(
  181. select(VasTask).where(
  182. VasTask.order_id == order.id,
  183. VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
  184. )
  185. )
  186. for task in task_res.scalars().all():
  187. task.status = "cancelled"
  188. await db.flush()
  189. await WebhookService._create_task_if_not_exists(db, order)
  190. await db.commit()
  191. await db.refresh(order)
  192. return order