# app/services/order_service.py
"""Service-layer operations on VAS orders.

Covers order creation, admin payment bypass, cancellation, task fan-out
for paid orders, and user/admin listing.
"""
import json
import uuid
from datetime import datetime, timedelta
from typing import List, Optional

from redis.asyncio import Redis
from sqlalchemy.orm import Session

from app.utils.search import apply_keyword_search
from app.utils.pagination import paginate
from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.core.auth import get_current_user
from app.models.user import VasUser
from app.models.order import VasOrder
from app.models.vas_task import VasTask
from app.models.product import VasProduct
from app.models.product_routing import VasProductRouting
from app.schemas.order import VasOrderCreate, VasOrderPatchUserInputs
from app.services.notification_service import NotificationService

# NOTE(review): the original cancel_order referenced OrderStatus /
# PaymentStatus / TaskStatus enums that this module never imports. The
# literals below follow the plain-string statuses already used in this file
# ("paid" for orders, "pending" for tasks); confirm the remaining values
# against the model definitions.
_ORDER_PAID = "paid"
_ORDER_CANCELLED = "cancelled"
_ORDER_COMPLETED = "completed"
_PAYMENT_PENDING = "pending"
_PAYMENT_EXPIRED = "expired"
_TASK_PENDING = "pending"
_TASK_CANCELLED = "cancelled"

# Defaults applied to every task created for a paid order.
_TASK_DEFAULT_PRIORITY = 10
_TASK_TTL = timedelta(days=7)


def _coerce_user_inputs(raw) -> dict:
    """Return a fresh dict built from an order's stored ``user_inputs``.

    Accepts a dict, a JSON-encoded string, or anything else. Values that
    are not a dict (or do not decode to one) yield an empty dict. The
    result is always a new object, so assigning it back to the model
    attribute registers as a change even on a JSON column without
    mutation tracking.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return {}
    return dict(raw) if isinstance(raw, dict) else {}


class OrderService:
    """Stateless collection of order operations; every method is static."""

    @staticmethod
    def mark_as_admin_paid(db: Session, order: VasOrder, admin_user) -> VasOrder:
        """Mark ``order`` as paid on behalf of an admin, bypassing payment.

        Does nothing if the order is already paid. Otherwise sets the
        status to paid, records an ``_admin_bypass`` entry in
        ``user_inputs`` (who, when, why), commits and refreshes the order.

        Args:
            db: Active SQLAlchemy session.
            order: The order to mark as paid.
            admin_user: The acting admin; only ``admin_user.id`` is read.

        Returns:
            The same ``order`` instance.
        """
        if order.status == _ORDER_PAID:
            return order

        order.status = _ORDER_PAID

        # Normalise to a *new* dict and reassign it below rather than
        # mutating in place, so the ORM sees the attribute change.
        user_inputs = _coerce_user_inputs(order.user_inputs)

        # Record why payment was bypassed (important for auditing).
        user_inputs["_admin_bypass"] = {
            "enabled": True,
            "by": admin_user.id,
            "at": datetime.utcnow().isoformat(),
            "reason": "admin manual order",
        }
        order.user_inputs = user_inputs

        db.add(order)
        db.commit()
        db.refresh(order)
        return order

    @staticmethod
    def create_tasks_for_order(db: Session, order: VasOrder):
        """Create one task per active product routing of a paid order.

        Idempotent: a routing whose ``(routing_key, script_version)``
        already has a task for this order is skipped.

        Args:
            db: Active SQLAlchemy session.
            order: The order to create tasks for.

        Returns:
            The list of newly created ``VasTask`` objects; empty when the
            order is not paid, has no active routing, or all tasks exist.
        """
        if order.status != _ORDER_PAID:
            return []

        # ---------- 1. Look up active routings ----------
        routings = (
            db.query(VasProductRouting)
            .filter(
                VasProductRouting.product_id == order.product_id,
                VasProductRouting.is_active == 1,
            )
            .all()
        )
        if not routings:
            return []

        # ---------- 2. Idempotency check ----------
        # One query for all existing task keys instead of one per routing.
        existing_keys = {
            (routing_key, script_version)
            for routing_key, script_version in (
                db.query(VasTask.routing_key, VasTask.script_version)
                .filter(VasTask.order_id == order.id)
                .all()
            )
        }

        # Single timestamp so created_at and expire_at are consistent.
        now = datetime.utcnow()
        created_tasks = []

        for routing in routings:
            key = (routing.routing_key, routing.script_version)
            if key in existing_keys:
                continue
            # Guard against duplicate routings within this same call.
            existing_keys.add(key)

            # ---------- 3. Create the task ----------
            task = VasTask(
                order_id=order.id,
                routing_key=routing.routing_key,
                script_version=routing.script_version,
                priority=_TASK_DEFAULT_PRIORITY,
                status=_TASK_PENDING,
                user_inputs=order.user_inputs,
                config=routing.config,
                attempt_count=0,
                notify_count=0,
                expire_at=now + _TASK_TTL,
                created_at=now,
            )
            db.add(task)
            created_tasks.append(task)

        db.commit()
        return created_tasks

    @staticmethod
    def cancel_order(db, order_id, reason, admin_id):
        """Cancel an unpaid order on behalf of an admin.

        Already cancelled or completed orders are returned unchanged.
        Otherwise the cancellation details are written to
        ``user_inputs["cancel"]``, pending payments are expired, all tasks
        are cancelled, the order status is set to cancelled and the
        change is committed.

        Args:
            db: Active SQLAlchemy session.
            order_id: Identifier of the order to cancel.
            reason: Cancellation reason stored with the order.
            admin_id: Identifier of the admin performing the cancellation.

        Returns:
            The order.

        Raises:
            NotFoundError: No order with ``order_id`` exists.
            BizLogicError: The order is paid and must be refunded instead.
        """
        order = db.query(VasOrder).filter_by(id=order_id).first()
        if not order:
            raise NotFoundError("Order not exist")

        if order.status in (_ORDER_CANCELLED, _ORDER_COMPLETED):
            return order

        if order.status == _ORDER_PAID:
            raise BizLogicError("Paid order must be refunded")

        # Write the cancellation info into user_inputs (fresh dict, then
        # reassigned, so the ORM registers the change).
        user_inputs = _coerce_user_inputs(order.user_inputs)
        user_inputs["cancel"] = {
            "reason": reason,
            "by": "admin",
            "admin_id": admin_id,
            "at": datetime.utcnow().isoformat(),
        }
        order.user_inputs = user_inputs

        # Expire payments that are still pending.
        for payment in order.payments:
            if payment.status == _PAYMENT_PENDING:
                payment.status = _PAYMENT_EXPIRED

        # Cancel every task attached to the order.
        for task in order.tasks:
            task.status = _TASK_CANCELLED

        # The original never updated the order's own status, so the
        # "already cancelled" early return above could never trigger.
        order.status = _ORDER_CANCELLED

        db.commit()
        db.refresh(order)
        return order

    @staticmethod
    def create(
        db: Session,
        data: VasOrderCreate,
        product: VasProduct,
        auth_user: VasUser,
        redis_client: Redis,
    ):
        """Create an order for ``auth_user`` and queue a notification.

        The order id has the form ``ORD-<UTC timestamp>-<8 hex chars>``.
        Product title, price amount and price currency are copied onto
        the order at creation time.

        Args:
            db: Active SQLAlchemy session.
            data: Validated order payload; its fields are passed to
                the ``VasOrder`` constructor.
            product: The product being ordered.
            auth_user: The authenticated user placing the order.
            redis_client: Redis client handed to ``NotificationService``.

        Returns:
            The persisted, refreshed order.

        Raises:
            BizLogicError: ``auth_user`` has no email address.
        """
        if not auth_user.email:
            raise BizLogicError('Your account must be linked to an email address before you can place an order.')

        timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        order_id = f"ORD-{timestamp}-{uuid.uuid4().hex[:8]}"

        rec = VasOrder(id=order_id, **data.dict())
        rec.product_name = product.title
        rec.base_amount = product.price_amount
        rec.base_currency = product.price_currency
        rec.user_id = auth_user.id

        db.add(rec)
        db.commit()
        db.refresh(rec)

        print("📧 send order created notification email")
        # NOTE(review): redis_client is annotated as redis.asyncio.Redis but
        # this call is not awaited — confirm NotificationService.create is
        # synchronous.
        NotificationService.create(
            redis_client=redis_client,
            ntype="order create notify",
            user_id=auth_user.id,
            channels=["email"],
            template_id="order_create_notify",
            payload={
                "order_id": rec.id
            },
        )

        return rec

    @staticmethod
    def get(db: Session, id: str):
        """Return the order with the given id, or ``None`` if absent."""
        return db.query(VasOrder).filter_by(id=id).first()

    @staticmethod
    def list_by_user(
        db: Session,
        user_id: str,
        page: int = 0,
        size: int = 10,
        keyword: Optional[str] = None,
    ):
        """Return one page of a user's orders, newest first.

        ``keyword`` is forwarded to ``apply_keyword_search`` over the
        ``id``, ``user_id`` and ``product_name`` fields; the result is
        whatever ``paginate`` returns.
        """
        query = db.query(VasOrder).filter_by(user_id=user_id)
        query = apply_keyword_search(
            query=query,
            model=VasOrder,
            keyword=keyword,
            fields=["id", "user_id", "product_name"],
        )
        query = query.order_by(VasOrder.created_at.desc())
        return paginate(query, page, size)

    @staticmethod
    def list_all(
        db: Session,
        page: int = 0,
        size: int = 10,
        keyword: Optional[str] = None,
    ):
        """Return one page of all orders, newest first.

        ``keyword`` is forwarded to ``apply_keyword_search`` over the
        ``id``, ``user_id``, ``user_name``, ``product_name`` and
        ``user_inputs`` fields; the result is whatever ``paginate``
        returns.
        """
        query = db.query(VasOrder)
        query = apply_keyword_search(
            query=query,
            model=VasOrder,
            keyword=keyword,
            fields=["id", "user_id", "user_name", "product_name", "user_inputs"],
        )
        query = query.order_by(VasOrder.created_at.desc())
        return paginate(query, page, size)

    @staticmethod
    def patch_user_inputs(db: Session, order_id: str, payload: VasOrderPatchUserInputs):
        """Replace an order's ``user_inputs`` with ``payload.user_inputs``.

        Raises:
            NotFoundError: No order with ``order_id`` exists.
        """
        order = db.query(VasOrder).filter_by(id=order_id).first()
        if not order:
            raise NotFoundError("Order not exist")

        order.user_inputs = payload.user_inputs
        db.commit()
        db.refresh(order)
        return order