# app/services/configuration_service.py from typing import List, Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.biz_exception import NotFoundError, BizLogicError from app.models.configuration import Configuration from app.schemas.configuration import ConfigurationCreate, ConfigurationUpdate class ConfigurationService: # ========================= # 创建配置 # ========================= @staticmethod async def create( db: AsyncSession, config_in: ConfigurationCreate, ) -> Configuration: stmt = select(Configuration).where( Configuration.config_key == config_in.config_key ) existing = (await db.execute(stmt)).scalar_one_or_none() if existing: raise BizLogicError( f"Config Key '{config_in.config_key}' already exist" ) db_obj = Configuration(**config_in.dict()) db.add(db_obj) await db.commit() await db.refresh(db_obj) return db_obj # ========================= # 获取全部配置 # ========================= @staticmethod async def get_all( db: AsyncSession, ) -> List[Configuration]: stmt = select(Configuration).order_by(Configuration.id.desc()) result = await db.execute(stmt) return result.scalars().all() # ========================= # 根据 key 获取配置 # ========================= @staticmethod async def get_by_key( db: AsyncSession, config_key: str, ) -> Configuration: stmt = select(Configuration).where( Configuration.config_key == config_key ) config = (await db.execute(stmt)).scalar_one_or_none() if not config: raise NotFoundError( f"Config Key '{config_key}' not exist" ) return config # ========================= # 根据 key 更新配置 # ========================= @staticmethod async def update_by_key( db: AsyncSession, config_key: str, config_in: ConfigurationUpdate, ) -> Configuration: stmt = select(Configuration).where( Configuration.config_key == config_key ) db_obj = (await db.execute(stmt)).scalar_one_or_none() if not db_obj: raise NotFoundError( f"Config Key '{config_key}' not exist" ) for field, value in config_in.dict(exclude_unset=True).items(): setattr(db_obj, field, value) await db.commit() await db.refresh(db_obj) return db_obj # ========================= # 根据 key 删除配置 # ========================= @staticmethod async def delete_by_key( db: AsyncSession, config_key: str, ) -> Configuration: stmt = select(Configuration).where( Configuration.config_key == config_key ) db_obj = (await db.execute(stmt)).scalar_one_or_none() if not db_obj: raise NotFoundError( f"Config Key '{config_key}' not exist" ) await db.delete(db_obj) await db.commit() return db_obj