| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
- from enum import IntEnum
- from fastapi import Depends
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.core.database import get_db
- from app.core.biz_exception import PermissionDeniedError
- from app.services.session_service import SessionService
- security = HTTPBearer(auto_error=False)
- class RoleLevel(IntEnum):
- user = 10
- admin = 100
- ROLE_LEVEL_MAP = {
- "user": 10,
- "admin": 100,
- }
- async def get_current_user(
- credentials: HTTPAuthorizationCredentials = Depends(security),
- db: AsyncSession = Depends(get_db),
- ):
- if not credentials:
- raise PermissionDeniedError("Missing token")
- token = credentials.credentials
- user = await SessionService().get_user_by_token(db, token)
- if not user:
- raise PermissionDeniedError("Invalid or expired token")
- return user
- def require_min_role(min_role: RoleLevel):
- async def checker(user=Depends(get_current_user)):
- current_level = ROLE_LEVEL_MAP.get(user.role, 0)
- if current_level < min_role:
- raise PermissionDeniedError("Permission denied")
- return user
- return checker
|