| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- # app/services/schema_service.py
- from typing import List
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy import select
- from app.core.biz_exception import NotFoundError
- from app.models.schema import VasSchema
- from app.schemas.schema import VasSchemaCreate, VasSchemaUpdate
- class SchemaService:
- @staticmethod
- async def create(
- db: AsyncSession,
- data: VasSchemaCreate,
- ) -> VasSchema:
- rec = VasSchema(**data.dict())
- db.add(rec)
- await db.commit()
- await db.refresh(rec)
- return rec
- @staticmethod
- async def get(
- db: AsyncSession,
- id: int,
- ) -> VasSchema:
- stmt = select(VasSchema).where(VasSchema.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Schema not exist")
- return obj
- @staticmethod
- async def update(
- db: AsyncSession,
- id: int,
- data: VasSchemaUpdate,
- ) -> VasSchema:
- stmt = select(VasSchema).where(VasSchema.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Schema not exist")
- for k, v in data.dict(exclude_unset=True).items():
- setattr(obj, k, v)
- await db.commit()
- await db.refresh(obj)
- return obj
- @staticmethod
- async def delete(
- db: AsyncSession,
- id: int,
- ) -> None:
- stmt = select(VasSchema).where(VasSchema.id == id)
- obj = (await db.execute(stmt)).scalar_one_or_none()
- if not obj:
- raise NotFoundError("Schema not exist")
- await db.delete(obj)
- await db.commit()
- @staticmethod
- async def list_all(
- db: AsyncSession,
- ) -> List[VasSchema]:
- stmt = select(VasSchema)
- result = await db.execute(stmt)
- return result.scalars().all()
|