# app/services/product_service.py
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.utils.search import apply_keyword_search_stmt
from app.utils.pagination import paginate
from app.core.biz_exception import NotFoundError
from app.models.product import VasProduct
from app.schemas.product import VasProductCreate, VasProductUpdate


class ProductService:
    """Async create/read/update/list operations for ``VasProduct`` rows."""

    @staticmethod
    async def _commit_and_refresh(db: AsyncSession, rec: VasProduct) -> VasProduct:
        """Commit the current transaction and reload ``rec`` from the database.

        If the commit raises, the session is rolled back before the original
        exception is re-raised, so the caller is not left holding a session
        stuck in a failed transaction.
        """
        try:
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        await db.refresh(rec)
        return rec

    @staticmethod
    async def create(
        db: AsyncSession,
        data: VasProductCreate,
    ) -> VasProduct:
        """Insert a new product built from the fields of ``data``.

        Args:
            db: Async session used to persist the record.
            data: Creation payload; its ``dict()`` is passed as keyword
                arguments to ``VasProduct``.

        Returns:
            The persisted product, refreshed after the commit.
        """
        rec = VasProduct(**data.dict())
        db.add(rec)
        return await ProductService._commit_and_refresh(db, rec)

    @staticmethod
    async def get(
        db: AsyncSession,
        id: int,
    ) -> VasProduct:
        """Fetch a single product by primary key.

        Args:
            db: Async session used to run the query.
            id: Value matched against ``VasProduct.id``.

        Returns:
            The matching product.

        Raises:
            NotFoundError: If no row has the given ``id``.
        """
        stmt = select(VasProduct).where(VasProduct.id == id)
        obj = (await db.execute(stmt)).scalar_one_or_none()
        # scalar_one_or_none() signals "no row" with None; test for exactly that.
        if obj is None:
            raise NotFoundError("Product not exist")
        return obj

    @staticmethod
    async def update(
        db: AsyncSession,
        id: int,
        data: VasProductUpdate,
    ) -> VasProduct:
        """Apply the explicitly-set fields of ``data`` to an existing product.

        Args:
            db: Async session used to load and persist the record.
            id: Value matched against ``VasProduct.id``.
            data: Update payload; only fields the caller actually set
                (``exclude_unset=True``) are written to the record.

        Returns:
            The updated product, refreshed after the commit.

        Raises:
            NotFoundError: If no row has the given ``id``.
        """
        # Reuse get() so the lookup and the not-found error live in one place.
        rec = await ProductService.get(db, id)
        for field_name, value in data.dict(exclude_unset=True).items():
            setattr(rec, field_name, value)
        return await ProductService._commit_and_refresh(db, rec)

    @staticmethod
    async def list_product(
        db: AsyncSession,
        country: Optional[str] = None,
        visa_type: Optional[str] = None,
        page: int = 0,
        size: int = 10,
        keyword: Optional[str] = None,
    ):
        """List products with optional filters, keyword search and pagination.

        Args:
            db: Async session handed to ``paginate``.
            country: When truthy, restrict results to this exact country.
            visa_type: When truthy, restrict results to this exact visa type.
            page: Page index forwarded unchanged to ``paginate``.
            size: Page size forwarded unchanged to ``paginate``.
            keyword: Search term forwarded to ``apply_keyword_search_stmt``
                together with the ``title``, ``provider`` and ``description``
                field names.

        Returns:
            Whatever ``paginate`` returns for the assembled statement.
        """
        # NOTE(review): an earlier comment here warned that paginate /
        # apply_keyword_search were still Query-based and possibly synchronous.
        # This code passes a select() statement and awaits paginate, so confirm
        # both helpers are the statement-based async versions.
        stmt = select(VasProduct)
        if country:
            stmt = stmt.where(VasProduct.country == country)
        if visa_type:
            stmt = stmt.where(VasProduct.visa_type == visa_type)

        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasProduct,
            keyword=keyword,
            fields=["title", "provider", "description"],
        )
        return await paginate(db, stmt, page, size)