# app/services/product_service.py
from typing import Optional, List

from sqlalchemy.orm import Session

from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.product import VasProduct
from app.schemas.product import VasProductCreate, VasProductUpdate, VasProductOut


class ProductService:
    """Create, fetch and update ``VasProduct`` rows through a SQLAlchemy session.

    Every public method takes the session as its first argument and keeps no
    state of its own, so the methods are declared static: they can be called
    either on the class (``ProductService.get(db, 1)``) or on an instance.
    """

    @staticmethod
    def _commit_and_refresh(db: Session, rec: VasProduct) -> VasProduct:
        """Commit the session, then reload ``rec`` from the database.

        If the commit raises, the session is rolled back before the original
        exception is re-raised, so the caller does not inherit a session that
        is stuck in a failed transaction.
        """
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise
        db.refresh(rec)
        return rec

    @staticmethod
    def create(db: Session, data: VasProductCreate) -> VasProduct:
        """Insert a new product built from ``data`` and return the stored row.

        Args:
            db: Session used for the insert and the commit.
            data: Payload whose ``dict()`` is passed as keyword arguments to
                the ``VasProduct`` constructor.

        Returns:
            The new ``VasProduct`` after commit and refresh.
        """
        rec = VasProduct(**data.dict())
        db.add(rec)
        return ProductService._commit_and_refresh(db, rec)

    @staticmethod
    def get(db: Session, id: int) -> VasProduct:
        """Return the product whose ``id`` column equals ``id``.

        Args:
            db: Session used for the query.
            id: Value matched against ``VasProduct.id``. The name shadows the
                builtin but is kept because callers may pass it by keyword.

        Raises:
            NotFoundError: If the query returns no row.
        """
        obj = db.query(VasProduct).filter_by(id=id).first()
        if not obj:
            raise NotFoundError('Product not exist')
        return obj

    @staticmethod
    def update(db: Session, id: int, data: VasProductUpdate) -> VasProduct:
        """Apply the fields of ``data`` to an existing product and persist it.

        Args:
            db: Session used for the lookup and the commit.
            id: Value matched against ``VasProduct.id``.
            data: Payload; only the entries returned by
                ``data.dict(exclude_unset=True)`` are written to the row.

        Returns:
            The updated ``VasProduct`` after commit and refresh.

        Raises:
            NotFoundError: If no product with this ``id`` exists.
        """
        # Reuse get() so the lookup and the not-found error live in one place.
        rec = ProductService.get(db, id)
        for field, value in data.dict(exclude_unset=True).items():
            setattr(rec, field, value)
        return ProductService._commit_and_refresh(db, rec)