| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- # app/services/product_service.py
- from typing import Optional
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.utils.search import apply_keyword_search_stmt
- from app.utils.pagination import paginate
- from app.core.biz_exception import NotFoundError
- from app.models.product import VasProduct
- from app.schemas.product import VasProductCreate, VasProductUpdate
class ProductService:
    """Async create/read/update and listing operations for ``VasProduct``.

    Every method is a ``@staticmethod`` that receives the ``AsyncSession`` to
    operate on, so the class itself keeps no state.
    """

    @staticmethod
    async def create(
        db: AsyncSession,
        data: VasProductCreate,
    ) -> VasProduct:
        """Insert a new product built from ``data`` and return it.

        Args:
            db: Session used to add, commit and refresh the new record.
            data: Creation payload; its ``dict()`` is expanded into the
                ``VasProduct`` constructor.

        Returns:
            The newly added ``VasProduct`` after commit and refresh.
        """
        rec = VasProduct(**data.dict())
        db.add(rec)
        await db.commit()
        # Reload the instance after the commit before handing it back.
        await db.refresh(rec)
        return rec

    @staticmethod
    async def get(
        db: AsyncSession,
        id: int,
    ) -> VasProduct:
        """Return the product whose ``id`` column equals ``id``.

        Args:
            db: Session used to run the lookup.
            id: Value matched against ``VasProduct.id``.

        Raises:
            NotFoundError: If no matching product exists.
        """
        stmt = select(VasProduct).where(VasProduct.id == id)
        obj = (await db.execute(stmt)).scalar_one_or_none()
        # Compare against None explicitly instead of relying on the
        # truthiness of an ORM instance.
        if obj is None:
            raise NotFoundError("Product not exist")
        return obj

    @staticmethod
    async def update(
        db: AsyncSession,
        id: int,
        data: VasProductUpdate,
    ) -> VasProduct:
        """Apply the fields set on ``data`` to an existing product.

        Args:
            db: Session used to load, commit and refresh the record.
            id: Value matched against ``VasProduct.id``.
            data: Update payload; only the entries returned by
                ``data.dict(exclude_unset=True)`` are copied onto the record.

        Returns:
            The updated ``VasProduct`` after commit and refresh.

        Raises:
            NotFoundError: If no matching product exists.
        """
        # Reuse ``get`` so the lookup and the not-found error live in one place.
        rec = await ProductService.get(db, id)
        for field, value in data.dict(exclude_unset=True).items():
            setattr(rec, field, value)
        await db.commit()
        await db.refresh(rec)
        return rec

    @staticmethod
    def _build_list_stmt(
        country: Optional[str],
        visa_type: Optional[str],
        keyword: Optional[str],
        *,
        enabled_only: bool,
    ):
        """Build the filtered, ordered ``select`` shared by the list methods.

        Filters are applied in this order: optional ``enabled == 1``,
        ``country``, ``visa_type``, then the keyword search. Falsy
        ``country`` / ``visa_type`` values (``None`` or ``""``) add no filter.
        ``keyword`` is forwarded to ``apply_keyword_search_stmt`` unchanged.
        """
        stmt = select(VasProduct)
        if enabled_only:
            stmt = stmt.where(VasProduct.enabled == 1)
        if country:
            stmt = stmt.where(VasProduct.country == country)
        if visa_type:
            stmt = stmt.where(VasProduct.visa_type == visa_type)
        stmt = apply_keyword_search_stmt(
            stmt=stmt,
            model=VasProduct,
            keyword=keyword,
            fields=["title", "provider", "description", "country", "city"],
        )
        # Highest recommend_score first; ties broken by newest created_at.
        return stmt.order_by(
            VasProduct.recommend_score.desc(),
            VasProduct.created_at.desc(),
        )

    @staticmethod
    async def list_enable_product(
        db: AsyncSession,
        country: Optional[str] = None,
        visa_type: Optional[str] = None,
        page: int = 0,
        size: int = 10,
        keyword: Optional[str] = None,
    ):
        """Return one page of products whose ``enabled`` column equals 1.

        Args:
            db: Session passed to ``paginate``.
            country: Optional exact match on ``VasProduct.country``.
            visa_type: Optional exact match on ``VasProduct.visa_type``.
            page: Page argument forwarded unchanged to ``paginate``.
            size: Page-size argument forwarded unchanged to ``paginate``.
            keyword: Search term forwarded to ``apply_keyword_search_stmt``.

        Returns:
            Whatever ``paginate`` returns for the built statement.
        """
        # NOTE(review): an earlier comment here warned that paginate /
        # apply_keyword_search were still Query-based and possibly
        # synchronous. This code awaits paginate with a select() statement;
        # confirm the helpers are the async, statement-based versions.
        stmt = ProductService._build_list_stmt(
            country,
            visa_type,
            keyword,
            enabled_only=True,
        )
        return await paginate(db, stmt, page, size)

    @staticmethod
    async def list_product(
        db: AsyncSession,
        country: Optional[str] = None,
        visa_type: Optional[str] = None,
        page: int = 0,
        size: int = 10,
        keyword: Optional[str] = None,
    ):
        """Return one page of products regardless of their ``enabled`` value.

        Args:
            db: Session passed to ``paginate``.
            country: Optional exact match on ``VasProduct.country``.
            visa_type: Optional exact match on ``VasProduct.visa_type``.
            page: Page argument forwarded unchanged to ``paginate``.
            size: Page-size argument forwarded unchanged to ``paginate``.
            keyword: Search term forwarded to ``apply_keyword_search_stmt``.

        Returns:
            Whatever ``paginate`` returns for the built statement.
        """
        # NOTE(review): an earlier comment here warned that paginate /
        # apply_keyword_search were still Query-based and possibly
        # synchronous. This code awaits paginate with a select() statement;
        # confirm the helpers are the async, statement-based versions.
        stmt = ProductService._build_list_stmt(
            country,
            visa_type,
            keyword,
            enabled_only=False,
        )
        return await paginate(db, stmt, page, size)
|