auth.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. from enum import IntEnum
  2. from fastapi import Depends
  3. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from app.core.database import get_db
  6. from app.core.biz_exception import PermissionDeniedError
  7. from app.services.session_service import SessionService
  8. security = HTTPBearer(auto_error=False)
  9. class RoleLevel(IntEnum):
  10. user = 10
  11. admin = 100
  12. ROLE_LEVEL_MAP = {
  13. "user": 10,
  14. "admin": 100,
  15. }
  16. async def get_current_user(
  17. credentials: HTTPAuthorizationCredentials = Depends(security),
  18. db: AsyncSession = Depends(get_db),
  19. ):
  20. if not credentials:
  21. raise PermissionDeniedError("Missing token")
  22. token = credentials.credentials
  23. user = await SessionService().get_user_by_token(db, token)
  24. if not user:
  25. raise PermissionDeniedError("Invalid or expired token")
  26. return user
  27. def require_min_role(min_role: RoleLevel):
  28. async def checker(user=Depends(get_current_user)):
  29. current_level = ROLE_LEVEL_MAP.get(user.role, 0)
  30. if current_level < min_role:
  31. raise PermissionDeniedError("Permission denied")
  32. return user
  33. return checker