| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- # app/services/order_service.py
- import uuid
- import json
- from redis.asyncio import Redis
- from datetime import datetime, timedelta
- from sqlalchemy.orm import Session
- from typing import List
- from app.utils.search import apply_keyword_search
- from app.utils.pagination import paginate
- from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
- from app.core.auth import get_current_user
- from app.models.user import VasUser
- from app.models.order import VasOrder
- from app.models.vas_task import VasTask
- from app.models.product import VasProduct
- from app.models.product_routing import VasProductRouting
- from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
- from app.services.notification_service import NotificationService
- class OrderService:
-
- @staticmethod
- def mark_as_admin_paid(db: Session, order: VasOrder, admin_user):
- if order.status == "paid":
- return order
- order.status = "paid"
- # ===== 核心修复点 =====
- raw_inputs = order.user_inputs
- if isinstance(raw_inputs, str):
- try:
- order.user_inputs = json.loads(raw_inputs)
- except Exception:
- order.user_inputs = {}
- elif raw_inputs is None:
- order.user_inputs = {}
- elif not isinstance(raw_inputs, dict):
- order.user_inputs = {}
- # 记录绕过支付的原因(非常重要)
- order.user_inputs["_admin_bypass"] = {
- "enabled": True,
- "by": admin_user.id,
- "at": datetime.utcnow().isoformat(),
- "reason": "admin manual order",
- }
- db.add(order)
- db.commit()
- db.refresh(order)
- return order
-
- @staticmethod
- def create_tasks_for_order(db: Session, order: VasOrder):
- """
- 为已支付订单创建任务(幂等)
- """
- if order.status != "paid":
- return []
- # ---------- 1. 查 routing ----------
- routings = (
- db.query(VasProductRouting)
- .filter(
- VasProductRouting.product_id == order.product_id,
- VasProductRouting.is_active == 1
- )
- .all()
- )
- if not routings:
- return []
- created_tasks = []
- for routing in routings:
- # ---------- 2. 幂等判断 ----------
- exists = (
- db.query(VasTask)
- .filter(
- VasTask.order_id == order.id,
- VasTask.routing_key == routing.routing_key,
- VasTask.script_version == routing.script_version,
- )
- .first()
- )
- if exists:
- continue
- # ---------- 3. 创建 task ----------
- task = VasTask(
- order_id=order.id,
- routing_key=routing.routing_key,
- script_version=routing.script_version,
- priority=10,
- status="pending",
- user_inputs=order.user_inputs,
- config=routing.config,
- attempt_count=0,
- notify_count=0,
- expire_at=datetime.utcnow() + timedelta(days=7),
- created_at=datetime.utcnow(),
- )
- db.add(task)
- created_tasks.append(task)
- db.commit()
- return created_tasks
-
- @staticmethod
- def cancel_order(db, order_id, reason, admin_id):
- if order.status in (OrderStatus.cancelled, OrderStatus.completed):
- return order
- if order.status == OrderStatus.paid:
- raise HTTPException(
- 400,
- "Paid order must be refunded",
- )
- # 2️⃣ user_inputs 写入取消信息
- user_inputs = order.user_inputs or {}
- user_inputs["cancel"] = {
- "reason": reason,
- "by": "admin",
- "admin_id": admin.user_id,
- "at": datetime.utcnow().isoformat(),
- }
- order.user_inputs = user_inputs
- # payment
- for payment in order.payments:
- if payment.status in (PaymentStatus.pending,):
- payment.status = PaymentStatus.expired
- # task
- for task in order.tasks:
- task.status = TaskStatus.cancelled
- return order
-
- @staticmethod
- def create(db: Session, data: VasOrderCreate, product: VasProduct, auth_user: VasUser, redis_client:Redis):
- if not auth_user.email:
- raise BizLogicError('Your account must be linked to an email address before you can place an order.')
- order_id = f"ORD-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}"
- rec = VasOrder(id=order_id, **data.dict())
- rec.product_name = product.title
- rec.base_amount = product.price_amount
- rec.base_currency = product.price_currency
- rec.user_id = auth_user.id
- db.add(rec)
- db.commit()
- db.refresh(rec)
-
- print(f"📧 send order created notification email")
- NotificationService.create(
- redis_client=redis_client,
- ntype="order create notify",
- user_id=auth_user.id,
- channels=["email"],
- template_id="order_create_notify",
- payload={
- "order_id": rec.id
- }
- )
- return rec
- @staticmethod
- def get(db: Session, id: str):
- return db.query(VasOrder).filter_by(id=id).first()
- @staticmethod
- def list_by_user(db: Session, user_id: str, page: int=0, size: int=10, keyword: str=None):
- query = db.query(VasOrder).filter_by(user_id=user_id)
- query = apply_keyword_search(
- query=query,
- model=VasOrder,
- keyword=keyword,
- fields=["id", "user_id", "product_name"]
- )
- query = query.order_by(
- VasOrder.created_at.desc()
- )
- return paginate(query, page, size)
-
- @staticmethod
- def list_all(db: Session, page: int=0, size: int=10, keyword: str=None):
- query = db.query(VasOrder)
- query = apply_keyword_search(
- query=query,
- model=VasOrder,
- keyword=keyword,
- fields=["id", "user_id", "user_name", "product_name", "user_inputs"]
- )
- query = query.order_by(
- VasOrder.created_at.desc()
- )
- return paginate(query, page, size)
-
- @staticmethod
- def patch_user_inputs(db: Session, order_id: str, payload: VasOrderPatchUserInputs):
- order = db.query(VasOrder).filter_by(id=order_id).first()
- if not order:
- raise NotFoundError("Order not exist")
- order.user_inputs = payload.user_inputs
- db.commit()
- db.refresh(order)
- return order
-
-
|