| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- # app/services/order_service.py
- import uuid
- import json
- from datetime import datetime, timedelta
- from typing import List, Optional
- from redis.asyncio import Redis
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.utils.search import apply_keyword_search_stmt
- from app.utils.pagination import paginate
- from app.core.biz_exception import NotFoundError, BizLogicError
- from app.models.user import VasUser
- from app.models.order import VasOrder
- from app.models.vas_task import VasTask
- from app.models.product import VasProduct
- from app.models.payment import VasPayment
- from app.models.product_routing import VasProductRouting
- from app.schemas.order import VasOrderCreate, VasOrderAdjustPrice, VasOrderPatchUserInputs
- from app.services.webhook_service import WebhookService
- class OrderService:
- @staticmethod
- async def create(
- db: AsyncSession,
- data: VasOrderCreate,
- product: VasProduct,
- auth_user: VasUser,
- redis_client: Redis,
- ) -> VasOrder:
- if not auth_user.email:
- raise BizLogicError(
- "Your account must be linked to an email address before you can place an order."
- )
- order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
- order = VasOrder(
- id=order_id,
- **data.dict(),
- product_name=product.title,
- base_amount=product.price_amount,
- base_currency=product.price_currency,
- adjustment_delta=0,
- final_amount=product.price_amount,
- user_id=auth_user.id,
- )
- db.add(order)
- await db.commit()
- await db.refresh(order)
- return order
-
- @staticmethod
- async def cancel(
- db: AsyncSession,
- order_id,
- ) -> VasOrder:
- stmt = select(VasOrder).where(VasOrder.id == order_id)
- order = (await db.execute(stmt)).scalar_one_or_none()
- if not order:
- raise NotFoundError("Order not exist")
-
- order.status = "closed"
- task_res = await db.execute(
- select(VasTask).where(
- VasTask.order_id == order.id,
- VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
- )
- )
- for task in task_res.scalars().all():
- task.status = "cancelled"
-
- await db.commit()
- await db.refresh(order)
- return order
-
- @staticmethod
- async def create_by_admin(
- db: AsyncSession,
- data: VasOrderCreate,
- product: VasProduct,
- auth_user: VasUser,
- redis_client: Redis,
- ) -> VasOrder:
- order_id = f"ORD-{datetime.utcnow():%Y%m%d%H%M%S}-{uuid.uuid4().hex[:8]}"
- order = VasOrder(
- id=order_id,
- **data.dict(),
- status="paid",
- product_name=product.title,
- base_amount=product.price_amount,
- base_currency=product.price_currency,
- adjustment_delta=0,
- final_amount=product.price_amount,
- user_id=auth_user.id,
- )
- # ===== user_inputs 安全修复 =====
- raw_inputs = order.user_inputs
- if isinstance(raw_inputs, str):
- try:
- order.user_inputs = json.loads(raw_inputs)
- except Exception:
- order.user_inputs = {}
- elif raw_inputs is None or not isinstance(raw_inputs, dict):
- order.user_inputs = {}
- order.user_inputs["_admin_bypass"] = {
- "enabled": True,
- "by": auth_user.id,
- "at": datetime.utcnow().isoformat(),
- "reason": "admin manual order",
- }
-
- db.add(order)
-
- await WebhookService._create_task_if_not_exists(db, order)
-
- await db.commit()
- await db.refresh(order)
- return order
-
- @staticmethod
- async def adjust_order_price(db: AsyncSession, order_id: str, payload: VasOrderAdjustPrice) -> VasOrder:
- stmt = select(VasOrder).where(VasOrder.id == order_id)
- order = (await db.execute(stmt)).scalar_one_or_none()
- if not order:
- raise NotFoundError("Order not exist")
- if order.status != "pending":
- raise BizLogicError(message="Order not adjustable")
- # 2. 更新订单价格
- order.adjustment_delta = payload.adjustment_delta
- order.final_amount = order.base_amount + payload.adjustment_delta
- if order.final_amount <= 0:
- raise BizLogicError(message="final_amount must be > 0")
- # ② 是否已有 pending payment(幂等)
- stmt = select(VasPayment).where(
- VasPayment.order_id == order.id,
- VasPayment.status == "pending",
- )
- active_payment = (await db.execute(stmt)).scalar_one_or_none()
- if active_payment:
- active_payment.status = "expired"
- await db.commit()
- await db.refresh(order)
- return order
- @staticmethod
- async def get(db: AsyncSession, order_id: str) -> Optional[VasOrder]:
- stmt = select(VasOrder).where(VasOrder.id == order_id)
- return (await db.execute(stmt)).scalar_one_or_none()
- @staticmethod
- async def list_by_user(
- db: AsyncSession,
- user_id: str,
- page: int = 0,
- size: int = 10,
- keyword: Optional[str] = None,
- ):
- stmt = select(VasOrder).where(VasOrder.user_id == user_id)
- stmt = apply_keyword_search_stmt(
- stmt=stmt,
- model=VasOrder,
- keyword=keyword,
- fields=["id", "user_id", "product_name", "user_inputs"],
- ).order_by(VasOrder.created_at.desc())
- return await paginate(db, stmt, page, size)
- @staticmethod
- async def list_all(
- db: AsyncSession,
- page: int = 0,
- size: int = 10,
- keyword: Optional[str] = None,
- ):
- stmt = select(VasOrder)
- query = apply_keyword_search_stmt(
- stmt=stmt,
- model=VasOrder,
- keyword=keyword,
- fields=["id", "user_id", "user_name", "product_name", "user_inputs"],
- ).order_by(VasOrder.created_at.desc())
- return await paginate(db, query, page, size)
- @staticmethod
- async def patch_user_inputs(
- db: AsyncSession,
- order_id: str,
- payload: VasOrderPatchUserInputs,
- ) -> VasOrder:
- stmt = select(VasOrder).where(VasOrder.id == order_id)
- order = (await db.execute(stmt)).scalar_one_or_none()
- if not order:
- raise NotFoundError("Order not exist")
- order.user_inputs = payload.user_inputs
-
- # 1️⃣ 取消旧任务
- task_res = await db.execute(
- select(VasTask).where(
- VasTask.order_id == order.id,
- VasTask.status.in_(["pending", "grabbed", "running", "completed"]),
- )
- )
- for task in task_res.scalars().all():
- task.status = "cancelled"
-
- await db.flush()
- await WebhookService._create_task_if_not_exists(db, order)
-
-
- await db.commit()
- await db.refresh(order)
- return order
|