"""FastAPI dependencies for bearer-token authentication and role checks."""

from enum import IntEnum

from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.biz_exception import PermissionDeniedError
from app.services.session_service import SessionService

# auto_error=False: the missing-credentials case is handled explicitly in
# get_current_user (which raises PermissionDeniedError) instead of being
# left to HTTPBearer's built-in error response.
security = HTTPBearer(auto_error=False)


class RoleLevel(IntEnum):
    """Numeric privilege level of a role.

    Levels are compared numerically by ``require_min_role``: a user passes
    when their level is greater than or equal to the required one.
    """

    user = 10
    admin = 100


# Role name -> numeric level. Derived from RoleLevel so the two cannot drift
# apart when a role is added or a level changes.
ROLE_LEVEL_MAP = {role.name: role.value for role in RoleLevel}


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
):
    """Resolve the user that owns the request's bearer token.

    Args:
        credentials: Bearer credentials produced by ``security``; falsy when
            none were supplied.
        db: Database session passed through to the session lookup.

    Returns:
        The object returned by ``SessionService.get_user_by_token`` for the
        token.

    Raises:
        PermissionDeniedError: If no credentials were supplied, or the
            lookup returned a falsy result for the token.
    """
    if not credentials:
        raise PermissionDeniedError("Missing token")

    token = credentials.credentials
    user = await SessionService().get_user_by_token(db, token)
    if not user:
        raise PermissionDeniedError("Invalid or expired token")

    return user


def require_min_role(min_role: RoleLevel):
    """Build a dependency that only admits users at or above ``min_role``.

    Args:
        min_role: Lowest role level allowed through.

    Returns:
        An async dependency that yields the current user, or raises
        ``PermissionDeniedError`` when the user's level is below
        ``min_role``.
    """

    async def checker(user=Depends(get_current_user)):
        # Role names missing from ROLE_LEVEL_MAP fall back to level 0, so
        # they are rejected by any positive minimum level.
        current_level = ROLE_LEVEL_MAP.get(user.role, 0)
        if current_level < min_role:
            raise PermissionDeniedError("Permission denied")
        return user

    return checker