from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from app.core.biz_exception import NotFoundError from app.models.http_session import HttpSession from app.schemas.http_session import HttpSessionCreate, HttpSessionUpdate class HttpSessionService: # ============================ # 创建 Session # ============================ @staticmethod async def create(db: AsyncSession, data: HttpSessionCreate) -> HttpSession: obj = HttpSession(**data.dict()) db.add(obj) await db.commit() await db.refresh(obj) return obj # ============================ # 根据 session_id 获取 # ============================ @staticmethod async def get_by_sid( db: AsyncSession, session_id: str ) -> HttpSession: stmt = select(HttpSession).where(HttpSession.session_id == session_id) result = await db.execute(stmt) obj = result.scalar_one_or_none() if not obj: raise NotFoundError("Session not found") return obj # ============================ # 根据 session_id 删除 # ============================ @staticmethod async def delete_by_sid( db: AsyncSession, session_id: str ) -> bool: stmt = delete(HttpSession).where(HttpSession.session_id == session_id) result = await db.execute(stmt) if result.rowcount == 0: raise NotFoundError("Session not found") await db.commit() return True # ============================ # 根据 session_id 更新 # ============================ @staticmethod async def update_by_sid( db: AsyncSession, session_id: str, data: HttpSessionUpdate ) -> HttpSession: stmt = select(HttpSession).where(HttpSession.session_id == session_id) result = await db.execute(stmt) obj = result.scalar_one_or_none() if not obj: raise NotFoundError("Session not found") for field, value in data.dict(exclude_unset=True).items(): setattr(obj, field, value) await db.commit() await db.refresh(obj) return obj