configuration_service.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # app/services/configuration_service.py
  2. from typing import List, Optional
  3. from sqlalchemy import select
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from app.core.biz_exception import NotFoundError, BizLogicError
  6. from app.models.configuration import Configuration
  7. from app.schemas.configuration import ConfigurationCreate, ConfigurationUpdate
  class ConfigurationService:
      """CRUD helpers for ``Configuration`` rows addressed by ``config_key``.

      Every method is a ``@staticmethod`` that takes the ``AsyncSession`` to
      work on as its first argument. Writing methods (``create``,
      ``update_by_key``, ``delete_by_key``) call ``db.commit()`` themselves.
      """

      @staticmethod
      async def _find_by_key(
          db: AsyncSession,
          config_key: str,
      ) -> Optional[Configuration]:
          """Return the row whose ``config_key`` matches, or ``None``.

          Single place for the key lookup so that all public methods run
          exactly the same query.
          """
          stmt = select(Configuration).where(
              Configuration.config_key == config_key
          )
          return (await db.execute(stmt)).scalar_one_or_none()

      # =========================
      # Create a configuration
      # =========================
      @staticmethod
      async def create(
          db: AsyncSession,
          config_in: ConfigurationCreate,
      ) -> Configuration:
          """Insert a new configuration built from ``config_in``.

          Args:
              db: Session used for the lookup, insert, commit and refresh.
              config_in: Payload; its ``dict()`` is passed as keyword
                  arguments to ``Configuration``.

          Returns:
              The newly added instance, refreshed after the commit.

          Raises:
              BizLogicError: A row with the same ``config_key`` already
                  exists.
          """
          existing = await ConfigurationService._find_by_key(
              db, config_in.config_key
          )
          # Explicit None check: "no row" is the only case that may proceed.
          if existing is not None:
              raise BizLogicError(
                  f"Config Key '{config_in.config_key}' already exist"
              )

          db_obj = Configuration(**config_in.dict())
          db.add(db_obj)
          await db.commit()
          await db.refresh(db_obj)
          return db_obj

      # =========================
      # Get all configurations
      # =========================
      @staticmethod
      async def get_all(
          db: AsyncSession,
      ) -> List[Configuration]:
          """Return every configuration, ordered by ``id`` descending."""
          stmt = select(Configuration).order_by(Configuration.id.desc())
          result = await db.execute(stmt)
          return result.scalars().all()

      # =========================
      # Get a configuration by key
      # =========================
      @staticmethod
      async def get_by_key(
          db: AsyncSession,
          config_key: str,
      ) -> Configuration:
          """Return the configuration stored under ``config_key``.

          Raises:
              NotFoundError: No row has this ``config_key``.
          """
          config = await ConfigurationService._find_by_key(db, config_key)
          if config is None:
              raise NotFoundError(
                  f"Config Key '{config_key}' not exist"
              )
          return config

      # =========================
      # Update a configuration by key
      # =========================
      @staticmethod
      async def update_by_key(
          db: AsyncSession,
          config_key: str,
          config_in: ConfigurationUpdate,
      ) -> Configuration:
          """Apply the fields set on ``config_in`` to an existing row.

          Only fields present in ``config_in.dict(exclude_unset=True)`` are
          written; all other attributes of the row are left alone.

          Returns:
              The updated instance, refreshed after the commit.

          Raises:
              NotFoundError: No row has this ``config_key``.
          """
          # Reuse the lookup + not-found handling instead of repeating it.
          db_obj = await ConfigurationService.get_by_key(db, config_key)

          for field, value in config_in.dict(exclude_unset=True).items():
              setattr(db_obj, field, value)

          await db.commit()
          await db.refresh(db_obj)
          return db_obj

      # =========================
      # Delete a configuration by key
      # =========================
      @staticmethod
      async def delete_by_key(
          db: AsyncSession,
          config_key: str,
      ) -> Configuration:
          """Delete the row stored under ``config_key`` and commit.

          Returns:
              The instance that was passed to ``db.delete``.
              NOTE(review): whether its attributes are still readable after
              the commit depends on how the session is configured (e.g.
              ``expire_on_commit``) - confirm against the callers.

          Raises:
              NotFoundError: No row has this ``config_key``.
          """
          db_obj = await ConfigurationService.get_by_key(db, config_key)

          await db.delete(db_obj)
          await db.commit()
          return db_obj