# app/services/order_service.py import uuid import json from datetime import datetime, timedelta from typing import List, Optional from redis.asyncio import Redis from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.utils.search import apply_keyword_search_stmt from app.utils.pagination import paginate from app.core.biz_exception import NotFoundError, BizLogicError from app.models.user import VasUser from app.models.order import VasOrder from app.models.vas_task import VasTask from app.models.product import VasProduct from app.models.payment import VasPayment from app.models.product_routing import VasProductRouting from app.schemas.order import VasOrderCreate, VasOrderAdjustPrice, VasOrderPatchUserInputs from app.services.webhook_service import WebhookService class OrderService: @staticmethod async def create( db: AsyncSession, data: VasOrderCreate, product: VasProduct, auth_user: VasUser, redis_client: Redis, ) -> VasOrder: if not auth_user.email: raise BizLogicError( "Your account must be linked to an email address before you can place an order." ) order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}" order = VasOrder( id=order_id, **data.dict(), product_name=product.title, base_amount=product.price_amount, base_currency=product.price_currency, adjustment_delta=0, final_amount=product.price_amount, user_id=auth_user.id, ) db.add(order) await db.commit() await db.refresh(order) return order @staticmethod async def cancel( db: AsyncSession, order_id, ) -> VasOrder: stmt = select(VasOrder).where(VasOrder.id == order_id) order = (await db.execute(stmt)).scalar_one_or_none() if not order: raise NotFoundError("Order not exist") order.status = "closed" task_res = await db.execute( select(VasTask).where( VasTask.order_id == order.id, VasTask.status.in_(["pending", "grabbed", "running", "completed"]), ) ) for task in task_res.scalars().all(): task.status = "cancelled" await db.commit() await db.refresh(order) return order @staticmethod async def create_by_admin( db: AsyncSession, data: VasOrderCreate, product: VasProduct, auth_user: VasUser ) -> VasOrder: order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}" order = VasOrder( id=order_id, **data.dict(), status="paid", product_name=product.title, base_amount=product.price_amount, base_currency=product.price_currency, adjustment_delta=0, final_amount=product.price_amount, user_id=auth_user.id, ) # ===== user_inputs 安全修复 ===== raw_inputs = order.user_inputs if isinstance(raw_inputs, str): try: order.user_inputs = json.loads(raw_inputs) except Exception: order.user_inputs = {} elif raw_inputs is None or not isinstance(raw_inputs, dict): order.user_inputs = {} order.user_inputs["_admin_bypass"] = { "enabled": True, "by": auth_user.id, "at": datetime.utcnow().isoformat(), "reason": "admin manual order", } db.add(order) await WebhookService._create_task_if_not_exists(db, order) await db.commit() await db.refresh(order) return order @staticmethod async def adjust_order_price(db: AsyncSession, order_id: str, payload: VasOrderAdjustPrice) -> VasOrder: stmt = select(VasOrder).where(VasOrder.id == order_id) order = (await db.execute(stmt)).scalar_one_or_none() if not order: raise NotFoundError("Order not exist") if order.status != "pending": raise BizLogicError(message="Order not adjustable") # 2. 更新订单价格 order.adjustment_delta = payload.adjustment_delta order.final_amount = order.base_amount + payload.adjustment_delta if order.final_amount <= 0: raise BizLogicError(message="final_amount must be > 0") # ② 是否已有 pending payment(幂等) stmt = select(VasPayment).where( VasPayment.order_id == order.id, VasPayment.status == "pending", ) active_payment = (await db.execute(stmt)).scalar_one_or_none() if active_payment: active_payment.status = "expired" await db.commit() await db.refresh(order) return order @staticmethod async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]: stmt = select(VasOrder).where(VasOrder.id == order_id) return (await db.execute(stmt)).scalar_one_or_none() @staticmethod async def list_by_user( db: AsyncSession, user_id: str, page: int = 0, size: int = 10, keyword: Optional[str] = None, ): stmt = select(VasOrder).where(VasOrder.user_id == user_id) stmt = apply_keyword_search_stmt( stmt=stmt, model=VasOrder, keyword=keyword, fields=["id", "user_id", "product_name", "user_inputs"], ).order_by(VasOrder.created_at.desc()) return await paginate(db, stmt, page, size) @staticmethod async def list_all( db: AsyncSession, page: int = 0, size: int = 10, keyword: Optional[str] = None, ): stmt = select(VasOrder) query = apply_keyword_search_stmt( stmt=stmt, model=VasOrder, keyword=keyword, fields=["id", "user_id", "user_name", "product_name", "user_inputs"], ).order_by(VasOrder.created_at.desc()) return await paginate(db, query, page, size) @staticmethod async def patch_user_inputs( db: AsyncSession, order_id: str, payload: VasOrderPatchUserInputs, ) -> VasOrder: stmt = select(VasOrder).where(VasOrder.id == order_id) order = (await db.execute(stmt)).scalar_one_or_none() if not order: raise NotFoundError("Order not exist") order.user_inputs = payload.user_inputs # 1️⃣ 取消旧任务 task_res = await db.execute( select(VasTask).where( VasTask.order_id == order.id, VasTask.status.in_(["pending", "grabbed", "running", "completed"]), ) ) for task in task_res.scalars().all(): task.status = "cancelled" await db.flush() await WebhookService._create_task_if_not_exists(db, order) await db.commit() await db.refresh(order) return order