auth.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from enum import IntEnum
  2. from fastapi import Depends, HTTPException, status
  3. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  4. from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
  5. from app.core.config import settings
  6. from sqlalchemy.orm import Session
  7. from app.core.database import get_db
  8. from app.services.session_service import SessionService
# Shared HTTPBearer scheme instance; get_current_user depends on it to
# receive the request's HTTPAuthorizationCredentials.
security = HTTPBearer()
  10. class RoleLevel(IntEnum):
  11. user = 10
  12. admin = 100
  13. ROLE_LEVEL_MAP = {
  14. "user": 10,
  15. "admin": 100,
  16. }
  17. def require_min_role(min_role: RoleLevel):
  18. def checker(user=Depends(get_current_user)):
  19. current_level = ROLE_LEVEL_MAP.get(user.role, 0)
  20. if current_level < min_role:
  21. raise PermissionDeniedError("Permission denied")
  22. return user
  23. return checker
  24. def get_current_user(
  25. credentials: HTTPAuthorizationCredentials = Depends(security),
  26. db: Session = Depends(get_db)
  27. ):
  28. token = credentials.credentials
  29. user = SessionService().get_user_by_token(db, token)
  30. if not user:
  31. raise PermissionDeniedError("Invalid or expired token")
  32. return user