| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- # app/services/schema_service.py
- from typing import List
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.core.biz_exception import NotFoundError
- from app.models.schema import VasSchema
- from app.schemas.schema import VasSchemaCreate, VasSchemaUpdate
- class SchemaService:
- @staticmethod
- async def create(
- db: AsyncSession,
- data: VasSchemaCreate,
- ) -> VasSchema:
- rec = VasSchema(**data.model_dump(by_alias=True))
- db.add(rec)
- await db.commit()
- await db.refresh(rec)
- return rec
- @staticmethod
- async def get(
- db: AsyncSession,
- id: int,
- ) -> VasSchema:
- stmt = select(VasSchema).where(VasSchema.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Schema not exist")
- return obj
- @staticmethod
- async def update(
- db: AsyncSession,
- id: int,
- data: VasSchemaUpdate,
- ) -> VasSchema:
- stmt = select(VasSchema).where(VasSchema.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Schema not exist")
-
- update_data = data.model_dump(exclude_unset=True, by_alias=True)
- for k, v in update_data.items():
- setattr(obj, k, v)
- await db.commit()
- await db.refresh(obj)
- return obj
- @staticmethod
- async def delete(
- db: AsyncSession,
- id: int,
- ) -> None:
- stmt = select(VasSchema).where(VasSchema.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Schema not exist")
- await db.delete(obj)
- await db.commit()
- @staticmethod
- async def list_all(
- db: AsyncSession,
- ) -> List[VasSchema]:
- stmt = select(VasSchema)
- result = await db.execute(stmt)
- return result.scalars().all()
|