order_service.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. # app/services/order_service.py
  2. import uuid
  3. import json
  4. from redis.asyncio import Redis
  5. from datetime import datetime, timedelta
  6. from sqlalchemy.orm import Session
  7. from typing import List
  8. from app.utils.search import apply_keyword_search
  9. from app.utils.pagination import paginate
  10. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  11. from app.core.auth import get_current_user
  12. from app.models.user import VasUser
  13. from app.models.order import VasOrder
  14. from app.models.vas_task import VasTask
  15. from app.models.product import VasProduct
  16. from app.models.product_routing import VasProductRouting
  17. from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
  18. from app.services.notification_service import NotificationService
  class OrderService:
      """Business operations on ``VasOrder`` rows and the tasks derived from them.

      Every method is static and works on the SQLAlchemy ``Session`` supplied by
      the caller; the class itself holds no state.
      """

      # Defaults stamped on every task built by ``create_tasks_for_order``.
      _TASK_PRIORITY = 10
      _TASK_TTL = timedelta(days=7)

      @staticmethod
      def _coerce_user_inputs(raw):
          """Return a *new* dict holding the content of ``raw``.

          ``raw`` may be a dict, a JSON string, ``None`` or anything else.
          Undecodable strings, JSON that is not an object, and any other type
          all yield an empty dict.

          A fresh dict is returned on purpose: the ORM only notices a change to
          a JSON attribute when a different value is assigned, not when the
          loaded dict is mutated in place (unless mutation tracking is set up on
          the column, which this file does not show).
          """
          if isinstance(raw, str):
              try:
                  raw = json.loads(raw)
              except ValueError:  # json.JSONDecodeError is a ValueError
                  return {}
          return dict(raw) if isinstance(raw, dict) else {}

      @staticmethod
      def mark_as_admin_paid(db: Session, order: VasOrder, admin_user):
          """Flag ``order`` as paid on behalf of an admin, without a real payment.

          Args:
              db: Session used to persist the change.
              order: The order to update.
              admin_user: Acting admin; only ``admin_user.id`` is read.

          Returns:
              The same ``order``. If it was already ``"paid"`` it is returned
              untouched and nothing is written.
          """
          if order.status == "paid":
              return order

          order.status = "paid"

          # Record why payment was bypassed (important for auditing). The entry
          # is written into a fresh dict so the assignment below is seen as a
          # change even when ``user_inputs`` was already a dict.
          inputs = OrderService._coerce_user_inputs(order.user_inputs)
          inputs["_admin_bypass"] = {
              "enabled": True,
              "by": admin_user.id,
              "at": datetime.utcnow().isoformat(),
              "reason": "admin manual order",
          }
          order.user_inputs = inputs

          db.add(order)
          db.commit()
          db.refresh(order)
          return order

      @staticmethod
      def create_tasks_for_order(db: Session, order: VasOrder):
          """Create one ``VasTask`` per active routing of a paid order.

          A routing is skipped when a task with the same order id, routing key
          and script version already exists, so calling this again does not
          duplicate tasks. The check is a plain query-then-insert with no
          locking.

          Returns:
              The list of newly created tasks; empty when the order is not
              ``"paid"``, the product has no active routing, or every task
              already exists.
          """
          if order.status != "paid":
              return []

          # ---------- 1. look up the active routings of the product ----------
          routings = (
              db.query(VasProductRouting)
              .filter(
                  VasProductRouting.product_id == order.product_id,
                  VasProductRouting.is_active == 1,
              )
              .all()
          )
          if not routings:
              return []

          # One timestamp for the whole batch keeps created_at / expire_at
          # exactly ``_TASK_TTL`` apart and identical across the tasks.
          now = datetime.utcnow()
          created_tasks = []
          for routing in routings:
              # ---------- 2. idempotency check ----------
              already_there = (
                  db.query(VasTask)
                  .filter(
                      VasTask.order_id == order.id,
                      VasTask.routing_key == routing.routing_key,
                      VasTask.script_version == routing.script_version,
                  )
                  .first()
              )
              if already_there:
                  continue

              # ---------- 3. create the task ----------
              task = VasTask(
                  order_id=order.id,
                  routing_key=routing.routing_key,
                  script_version=routing.script_version,
                  priority=OrderService._TASK_PRIORITY,
                  status="pending",
                  user_inputs=order.user_inputs,
                  config=routing.config,
                  attempt_count=0,
                  notify_count=0,
                  expire_at=now + OrderService._TASK_TTL,
                  created_at=now,
              )
              db.add(task)
              created_tasks.append(task)

          db.commit()
          return created_tasks

      @staticmethod
      def cancel_order(db, order_id, reason, admin_id):
          """Cancel an unpaid order on behalf of an admin.

          Args:
              db: Session used to load and persist the order.
              order_id: Primary key of the order to cancel.
              reason: Free-text reason stored with the cancellation record.
              admin_id: Identifier of the admin performing the cancellation.

          Returns:
              The order. Orders that are already cancelled or completed are
              returned unchanged.

          Raises:
              NotFoundError: No order with ``order_id`` exists.
              BizLogicError: The order is paid and must be refunded instead.
          """
          order = db.query(VasOrder).filter_by(id=order_id).first()
          if not order:
              raise NotFoundError("Order not exist")

          # NOTE(review): the status values below follow the plain-string
          # convention used elsewhere in this class ("paid", "pending");
          # "cancelled", "completed" and "expired" are assumed to be the stored
          # values -- confirm against the order/payment/task models.
          if order.status in ("cancelled", "completed"):
              return order
          if order.status == "paid":
              raise BizLogicError("Paid order must be refunded")

          # Store the cancellation details in user_inputs (fresh dict so the
          # assignment is picked up as a change).
          inputs = OrderService._coerce_user_inputs(order.user_inputs)
          inputs["cancel"] = {
              "reason": reason,
              "by": "admin",
              "admin_id": admin_id,
              "at": datetime.utcnow().isoformat(),
          }
          order.user_inputs = inputs

          # Payments still waiting for the customer can no longer be completed.
          for payment in order.payments:
              if payment.status == "pending":
                  payment.status = "expired"

          # Every task attached to the order is cancelled with it.
          for task in order.tasks:
              task.status = "cancelled"

          # NOTE(review): persisting here mirrors the other mutating methods of
          # this class; drop the commit if the caller owns the transaction.
          order.status = "cancelled"
          db.commit()
          db.refresh(order)
          return order

      @staticmethod
      def create(db: Session, data: VasOrderCreate, product: VasProduct, auth_user: VasUser, redis_client:Redis):
          """Create an order for ``auth_user`` and queue a confirmation email.

          The product's title and price are copied onto the order at creation
          time. The order is committed before the notification is requested.

          Raises:
              BizLogicError: The user has no email address on file.
          """
          if not auth_user.email:
              raise BizLogicError('Your account must be linked to an email address before you can place an order.')

          # Timestamp plus 8 random hex chars: readable and unlikely to collide.
          stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
          order_id = f"ORD-{stamp}-{uuid.uuid4().hex[:8]}"

          rec = VasOrder(id=order_id, **data.dict())
          rec.product_name = product.title
          rec.base_amount = product.price_amount
          rec.base_currency = product.price_currency
          rec.user_id = auth_user.id
          db.add(rec)
          db.commit()
          db.refresh(rec)

          print("📧 send order created notification email")
          # NOTE(review): ``redis_client`` is an asyncio Redis client while this
          # method is synchronous -- confirm NotificationService.create does not
          # need to be awaited.
          NotificationService.create(
              redis_client=redis_client,
              ntype="order create notify",
              user_id=auth_user.id,
              channels=["email"],
              template_id="order_create_notify",
              payload={
                  "order_id": rec.id,
              },
          )
          return rec

      @staticmethod
      def get(db: Session, id: str):
          """Return the order whose primary key is ``id``, or ``None``."""
          return db.query(VasOrder).filter_by(id=id).first()

      @staticmethod
      def list_by_user(db: Session, user_id: str, page: int=0, size: int=10, keyword: str=None):
          """Return one page of ``user_id``'s orders, newest first.

          ``keyword`` is matched against the order id, user id and product name
          by ``apply_keyword_search``.
          """
          query = apply_keyword_search(
              query=db.query(VasOrder).filter_by(user_id=user_id),
              model=VasOrder,
              keyword=keyword,
              fields=["id", "user_id", "product_name"],
          )
          return paginate(query.order_by(VasOrder.created_at.desc()), page, size)

      @staticmethod
      def list_all(db: Session, page: int=0, size: int=10, keyword: str=None):
          """Return one page of all orders, newest first.

          ``keyword`` is matched against the order id, user id, user name,
          product name and user inputs by ``apply_keyword_search``.
          """
          query = apply_keyword_search(
              query=db.query(VasOrder),
              model=VasOrder,
              keyword=keyword,
              fields=["id", "user_id", "user_name", "product_name", "user_inputs"],
          )
          return paginate(query.order_by(VasOrder.created_at.desc()), page, size)

      @staticmethod
      def patch_user_inputs(db: Session, order_id: str, payload: VasOrderPatchUserInputs):
          """Replace the ``user_inputs`` of an order with ``payload.user_inputs``.

          Raises:
              NotFoundError: No order with ``order_id`` exists.
          """
          order = db.query(VasOrder).filter_by(id=order_id).first()
          if not order:
              raise NotFoundError("Order not exist")

          order.user_inputs = payload.user_inputs
          db.commit()
          db.refresh(order)
          return order