# app/services/order_service.py import uuid from datetime import datetime from sqlalchemy.orm import Session from typing import List from app.utils.redis_utils import redis_qpush from app.core.auth import get_current_user from app.models.user import VasUser from app.models.order import VasOrder from app.models.vas_task import VasTask from app.models.product import VasProduct from app.models.product_routing import VasProductRouting from app.schemas.order import VasOrderCreate class OrderService: @staticmethod def mark_as_admin_paid(db: Session, order: VasOrder, admin_user): if order.status == "paid": return order order.status = "paid" # 记录绕过支付的原因(非常重要) order.meta = order.meta or {} order.meta["_admin_bypass"] = { "enabled": True, "by": admin_user.id, "at": datetime.utcnow().isoformat(), "reason": "admin manual order", } db.add(order) db.commit() db.refresh(order) return order @staticmethod def create_tasks_for_order(db: Session, order: VasOrder): """ 为已支付订单创建任务(幂等) """ if order.status != "paid": return [] # ---------- 1. 查 routing ---------- routings = ( db.query(VasProductRouting) .filter( VasProductRouting.product_id == order.product_id, VasProductRouting.is_active == 1 ) .all() ) if not routings: return [] created_tasks = [] for routing in routings: # ---------- 2. 幂等判断 ---------- exists = ( db.query(VasTask) .filter( VasTask.order_id == order.id, VasTask.routing_key == routing.routing_key, VasTask.script_version == routing.script_version, ) .first() ) if exists: continue # ---------- 3. 创建 task ---------- task = VasTask( order_id=order.id, routing_key=routing.routing_key, script_version=routing.script_version, priority=10, status="pending", user_inputs=order.user_inputs, config=routing.config, attempt_count=0, notify_count=0, expire_at=datetime.utcnow() + timedelta(days=7), created_at=datetime.utcnow(), ) db.add(task) created_tasks.append(task) db.commit() return created_tasks @staticmethod def create(db: Session, data: VasOrderCreate, product: VasProduct, auth_user: VasUser): order_id = f"ORD-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}" rec = VasOrder(id=order_id, **data.dict()) rec.base_amount = product.price_amount rec.currency = product.price_currency rec.user_id = auth_user.id db.add(rec) db.commit() db.refresh(rec) return rec @staticmethod def get(db: Session, id: str): return db.query(VasOrder).filter_by(id=id).first() @staticmethod def list_by_user(db: Session, user_id: str, skip=0, limit=20) -> List[VasOrder]: return db.query(VasOrder).filter_by(user_id=user_id).offset(skip).limit(limit).all()