# app/services/order_service.py
import uuid
from datetime import datetime, timedelta
from typing import List

from sqlalchemy.orm import Session

from app.core.auth import get_current_user
from app.models.order import VasOrder
from app.models.product import VasProduct
from app.models.product_routing import VasProductRouting
from app.models.user import VasUser
from app.models.vas_task import VasTask
from app.schemas.order import VasOrderCreate
from app.utils.redis_utils import redis_qpush


  14. class OrderService:
  15. @staticmethod
  16. def mark_as_admin_paid(db: Session, order: VasOrder, admin_user):
  17. if order.status == "paid":
  18. return order
  19. order.status = "paid"
  20. # 记录绕过支付的原因(非常重要)
  21. order.meta = order.meta or {}
  22. order.meta["_admin_bypass"] = {
  23. "enabled": True,
  24. "by": admin_user.id,
  25. "at": datetime.utcnow().isoformat(),
  26. "reason": "admin manual order",
  27. }
  28. db.add(order)
  29. db.commit()
  30. db.refresh(order)
  31. return order
  32. @staticmethod
  33. def create_tasks_for_order(db: Session, order: VasOrder):
  34. """
  35. 为已支付订单创建任务(幂等)
  36. """
  37. if order.status != "paid":
  38. return []
  39. # ---------- 1. 查 routing ----------
  40. routings = (
  41. db.query(VasProductRouting)
  42. .filter(
  43. VasProductRouting.product_id == order.product_id,
  44. VasProductRouting.is_active == 1
  45. )
  46. .all()
  47. )
  48. if not routings:
  49. return []
  50. created_tasks = []
  51. for routing in routings:
  52. # ---------- 2. 幂等判断 ----------
  53. exists = (
  54. db.query(VasTask)
  55. .filter(
  56. VasTask.order_id == order.id,
  57. VasTask.routing_key == routing.routing_key,
  58. VasTask.script_version == routing.script_version,
  59. )
  60. .first()
  61. )
  62. if exists:
  63. continue
  64. # ---------- 3. 创建 task ----------
  65. task = VasTask(
  66. order_id=order.id,
  67. routing_key=routing.routing_key,
  68. script_version=routing.script_version,
  69. priority=10,
  70. status="pending",
  71. user_inputs=order.user_inputs,
  72. config=routing.config,
  73. attempt_count=0,
  74. notify_count=0,
  75. expire_at=datetime.utcnow() + timedelta(days=7),
  76. created_at=datetime.utcnow(),
  77. )
  78. db.add(task)
  79. created_tasks.append(task)
  80. db.commit()
  81. return created_tasks
  82. @staticmethod
  83. def create(db: Session, data: VasOrderCreate, product: VasProduct, auth_user: VasUser):
  84. order_id = f"ORD-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}"
  85. rec = VasOrder(id=order_id, **data.dict())
  86. rec.base_amount = product.price_amount
  87. rec.currency = product.price_currency
  88. rec.user_id = auth_user.id
  89. db.add(rec)
  90. db.commit()
  91. db.refresh(rec)
  92. return rec
  93. @staticmethod
  94. def get(db: Session, id: str):
  95. return db.query(VasOrder).filter_by(id=id).first()
  96. @staticmethod
  97. def list_by_user(db: Session, user_id: str, skip=0, limit=20) -> List[VasOrder]:
  98. return db.query(VasOrder).filter_by(user_id=user_id).offset(skip).limit(limit).all()