# app/services/schema_service.py
from typing import List

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.core.biz_exception import NotFoundError
from app.models.schema import VasSchema
from app.schemas.schema import VasSchemaCreate, VasSchemaUpdate


class SchemaService:
    """CRUD operations for ``VasSchema`` rows on an async SQLAlchemy session.

    All methods are static and take the session as their first argument.
    Mutating methods (``create``, ``update``, ``delete``) commit the session
    themselves before returning.
    """

    @staticmethod
    async def create(
        db: AsyncSession,
        data: VasSchemaCreate,
    ) -> VasSchema:
        """Insert a new ``VasSchema`` built from *data* and return it.

        The record is constructed from ``data.model_dump(by_alias=True)``,
        added to the session, committed, and then refreshed so the returned
        instance reflects the stored row.
        """
        # NOTE(review): by_alias=True means the dumped keys are the field
        # aliases; this presumably matches the ORM column names -- confirm.
        rec = VasSchema(**data.model_dump(by_alias=True))
        db.add(rec)
        await db.commit()
        await db.refresh(rec)
        return rec

    @staticmethod
    async def get(
        db: AsyncSession,
        id: int,
    ) -> VasSchema:
        """Return the ``VasSchema`` whose primary key equals *id*.

        Raises:
            NotFoundError: if no row matches *id*.
        """
        stmt = select(VasSchema).where(VasSchema.id == id)
        obj = (await db.execute(stmt)).scalar_one_or_none()
        # Compare against None explicitly: scalar_one_or_none() signals
        # "no row" with None, and an ORM instance's truthiness is not a
        # reliable existence test.
        if obj is None:
            raise NotFoundError("Schema not exist")
        return obj

    @staticmethod
    async def update(
        db: AsyncSession,
        id: int,
        data: VasSchemaUpdate,
    ) -> VasSchema:
        """Apply the explicitly-set fields of *data* to the row with *id*.

        Only fields that were set on *data* are written
        (``exclude_unset=True``). The change is committed and the instance
        refreshed before it is returned.

        Raises:
            NotFoundError: if no row matches *id*.
        """
        # Reuse get() so the lookup and the not-found error stay in one place.
        obj = await SchemaService.get(db, id)
        update_data = data.model_dump(exclude_unset=True, by_alias=True)
        for field_name, value in update_data.items():
            setattr(obj, field_name, value)
        await db.commit()
        await db.refresh(obj)
        return obj

    @staticmethod
    async def delete(
        db: AsyncSession,
        id: int,
    ) -> None:
        """Delete the row with *id* and commit.

        Raises:
            NotFoundError: if no row matches *id*.
        """
        # Reuse get() so the lookup and the not-found error stay in one place.
        obj = await SchemaService.get(db, id)
        await db.delete(obj)
        await db.commit()

    @staticmethod
    async def list_all(
        db: AsyncSession,
    ) -> List[VasSchema]:
        """Return every ``VasSchema`` row as a list."""
        result = await db.execute(select(VasSchema))
        # ScalarResult.all() is typed as a Sequence; materialize a real list
        # so the return value matches the declared List[VasSchema].
        return list(result.scalars().all())