# app/services/order_service.py
import uuid
from datetime import datetime, timedelta
from typing import List

from sqlalchemy.orm import Session

from app.core.auth import get_current_user
from app.models.order import VasOrder
from app.models.product import VasProduct
from app.models.product_routing import VasProductRouting
from app.models.user import VasUser
from app.models.vas_task import VasTask
from app.schemas.order import VasOrderCreate
from app.utils.redis_utils import redis_qpush
class OrderService:
    """Order operations: creation, lookup, admin payment bypass and task fan-out.

    Every method is static and works on the SQLAlchemy ``Session`` handed in
    by the caller. The mutating methods commit on that session themselves.
    """

    # Priority assigned to every task created for an order.
    DEFAULT_TASK_PRIORITY = 10
    # Number of days after creation at which a task's ``expire_at`` is set.
    TASK_TTL_DAYS = 7

    @staticmethod
    def mark_as_admin_paid(db: Session, order: VasOrder, admin_user):
        """Mark *order* as paid without a payment and record who bypassed it.

        An order whose status is already ``"paid"`` is returned untouched:
        no audit entry is written and nothing is committed.

        Args:
            db: Session used to persist the change.
            order: Order to flip to ``"paid"``.
            admin_user: Acting administrator; only ``admin_user.id`` is read.

        Returns:
            The same ``order`` instance, refreshed after the commit.
        """
        if order.status == "paid":
            return order

        order.status = "paid"
        # Record why payment was bypassed (very important for auditing).
        # A *new* dict is assigned rather than mutating the existing one in
        # place: SQLAlchemy does not detect in-place changes to a JSON value
        # unless the column uses a mutation-tracking type, so an in-place
        # update could be silently dropped at flush time.
        order.meta = {
            **(order.meta or {}),
            "_admin_bypass": {
                "enabled": True,
                "by": admin_user.id,
                "at": datetime.utcnow().isoformat(),
                "reason": "admin manual order",
            },
        }
        db.add(order)
        db.commit()
        db.refresh(order)
        return order

    @staticmethod
    def create_tasks_for_order(db: Session, order: VasOrder):
        """Create tasks for a paid order (idempotent).

        One task is created per active routing of the order's product. A
        routing is skipped when a task with the same order id, routing key
        and script version already exists, so calling this repeatedly does
        not duplicate tasks.

        Args:
            db: Session used for the lookups and the final commit.
            order: Order to create tasks for.

        Returns:
            The list of newly created ``VasTask`` objects. The list is empty
            when the order is not ``"paid"``, when the product has no active
            routing, or when every task already exists.
        """
        if order.status != "paid":
            return []

        # ---------- 1. Look up the product's active routings ----------
        routings = (
            db.query(VasProductRouting)
            .filter(
                VasProductRouting.product_id == order.product_id,
                VasProductRouting.is_active == 1,
            )
            .all()
        )
        if not routings:
            return []

        # Read the clock once so all tasks of this batch share the same
        # creation time and expiry.
        now = datetime.utcnow()
        expire_at = now + timedelta(days=OrderService.TASK_TTL_DAYS)

        created_tasks = []
        for routing in routings:
            # ---------- 2. Idempotency check ----------
            already_exists = (
                db.query(VasTask)
                .filter(
                    VasTask.order_id == order.id,
                    VasTask.routing_key == routing.routing_key,
                    VasTask.script_version == routing.script_version,
                )
                .first()
            )
            if already_exists:
                continue

            # ---------- 3. Create the task ----------
            task = VasTask(
                order_id=order.id,
                routing_key=routing.routing_key,
                script_version=routing.script_version,
                priority=OrderService.DEFAULT_TASK_PRIORITY,
                status="pending",
                user_inputs=order.user_inputs,
                config=routing.config,
                attempt_count=0,
                notify_count=0,
                expire_at=expire_at,
                created_at=now,
            )
            db.add(task)
            created_tasks.append(task)

        db.commit()
        return created_tasks

    @staticmethod
    def create(db: Session, data: VasOrderCreate, product: VasProduct, auth_user: VasUser):
        """Create and persist a new order for *auth_user*.

        The order id has the form ``ORD-<UTC timestamp>-<8 hex chars>``.
        Amount, currency and owner are set after construction from *product*
        and *auth_user*, so they override any same-named fields in *data*.

        Args:
            db: Session used to persist the order.
            data: Order payload; its ``dict()`` is passed to ``VasOrder``.
            product: Product supplying ``price_amount`` and ``price_currency``.
            auth_user: Authenticated user who owns the order.

        Returns:
            The committed and refreshed ``VasOrder``.
        """
        timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        order_id = f"ORD-{timestamp}-{uuid.uuid4().hex[:8]}"

        rec = VasOrder(id=order_id, **data.dict())
        rec.base_amount = product.price_amount
        rec.currency = product.price_currency
        rec.user_id = auth_user.id

        db.add(rec)
        db.commit()
        db.refresh(rec)
        return rec

    @staticmethod
    def get(db: Session, id: str):
        """Return the order with primary key *id*, or ``None`` if not found."""
        return db.query(VasOrder).filter_by(id=id).first()

    @staticmethod
    def list_by_user(db: Session, user_id: str, skip=0, limit=20) -> List[VasOrder]:
        """Return up to *limit* orders of *user_id*, skipping the first *skip*.

        NOTE(review): the query has no ORDER BY, so the database does not
        guarantee a stable row order across pages -- confirm the ordering
        callers expect and add an explicit ``order_by``.
        """
        return (
            db.query(VasOrder)
            .filter_by(user_id=user_id)
            .offset(skip)
            .limit(limit)
            .all()
        )
|