test_user_role_permissions

This commit is contained in:
Ishaan Jaff 2024-11-15 17:35:04 -08:00
parent d2c0b5cfed
commit 7c3e0d63ff
3 changed files with 127 additions and 9 deletions

View file

@ -278,6 +278,7 @@ class LiteLLMRoutes(enum.Enum):
management_routes = [ # key management_routes = [ # key
"/key/generate", "/key/generate",
"/key/{token_id}/regenerate",
"/key/update", "/key/update",
"/key/delete", "/key/delete",
"/key/info", "/key/info",
@ -360,6 +361,7 @@ class LiteLLMRoutes(enum.Enum):
internal_user_routes = ( internal_user_routes = (
[ [
"/key/generate", "/key/generate",
"/key/{token_id}/regenerate",
"/key/update", "/key/update",
"/key/delete", "/key/delete",
"/key/health", "/key/health",

View file

@ -1,5 +1,5 @@
import re import re
from typing import Optional from typing import List, Optional
from fastapi import HTTPException, Request, status from fastapi import HTTPException, Request, status
@ -80,7 +80,9 @@ class RouteChecks:
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this OpenAI routes, role= {_user_role}", detail=f"user not allowed to access this OpenAI routes, role= {_user_role}",
) )
if route in LiteLLMRoutes.management_routes.value: if RouteChecks.check_route_access(
route=route, allowed_routes=LiteLLMRoutes.management_routes.value
):
# the Admin Viewer is only allowed to call /user/update for their own user_id and can only update # the Admin Viewer is only allowed to call /user/update for their own user_id and can only update
if route == "/user/update": if route == "/user/update":
@ -101,21 +103,27 @@ class RouteChecks:
elif ( elif (
_user_role == LitellmUserRoles.INTERNAL_USER.value _user_role == LitellmUserRoles.INTERNAL_USER.value
and route in LiteLLMRoutes.internal_user_routes.value and RouteChecks.check_route_access(
route=route, allowed_routes=LiteLLMRoutes.internal_user_routes.value
)
): ):
pass pass
elif ( elif _user_is_org_admin(
_user_is_org_admin(request_data=request_data, user_object=user_obj) request_data=request_data, user_object=user_obj
and route in LiteLLMRoutes.org_admin_allowed_routes.value ) and RouteChecks.check_route_access(
route=route, allowed_routes=LiteLLMRoutes.org_admin_allowed_routes.value
): ):
pass pass
elif ( elif (
_user_role == LitellmUserRoles.INTERNAL_USER_VIEW_ONLY.value _user_role == LitellmUserRoles.INTERNAL_USER_VIEW_ONLY.value
and route in LiteLLMRoutes.internal_user_view_only_routes.value and RouteChecks.check_route_access(
route=route,
allowed_routes=LiteLLMRoutes.internal_user_view_only_routes.value,
)
): ):
pass pass
elif ( elif RouteChecks.check_route_access(
route in LiteLLMRoutes.self_managed_routes.value route=route, allowed_routes=LiteLLMRoutes.self_managed_routes.value
): # routes that manage their own allowed/disallowed logic ): # routes that manage their own allowed/disallowed logic
pass pass
else: else:
@ -207,3 +215,20 @@ class RouteChecks:
if re.match(pattern, route): if re.match(pattern, route):
return True return True
return False return False
@staticmethod
def check_route_access(route: str, allowed_routes: List[str]) -> bool:
"""
Check if a route has access by checking both exact matches and patterns
Args:
route (str): The route to check
allowed_routes (list): List of allowed routes/patterns
Returns:
bool: True if route is allowed, False otherwise
"""
return route in allowed_routes or any( # Check exact match
RouteChecks._route_matches_pattern(route=route, pattern=allowed_route)
for allowed_route in allowed_routes
) # Check pattern match

View file

@ -437,3 +437,94 @@ async def test_org_admin_create_user_team_wrong_org_permissions(prisma_client):
"You do not have the required role to call" in e.message "You do not have the required role to call" in e.message
and org2_id in e.message and org2_id in e.message
) )
@pytest.mark.asyncio
@pytest.mark.parametrize(
"route, user_role, expected_result",
[
# Proxy Admin checks
("/global/spend/logs", LitellmUserRoles.PROXY_ADMIN, True),
("/key/delete", LitellmUserRoles.PROXY_ADMIN, True),
("/key/generate", LitellmUserRoles.PROXY_ADMIN, True),
("/key/regenerate", LitellmUserRoles.PROXY_ADMIN, True),
# # Internal User checks - allowed routes
("/global/spend/logs", LitellmUserRoles.INTERNAL_USER, True),
("/key/delete", LitellmUserRoles.INTERNAL_USER, True),
("/key/generate", LitellmUserRoles.INTERNAL_USER, True),
("/key/82akk800000000jjsk/regenerate", LitellmUserRoles.INTERNAL_USER, True),
# Internal User Viewer
("/key/generate", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False),
(
"/key/82akk800000000jjsk/regenerate",
LitellmUserRoles.INTERNAL_USER_VIEW_ONLY,
False,
),
("/key/delete", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False),
("/team/new", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False),
("/team/delete", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False),
("/team/update", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False),
# Proxy Admin Viewer
("/global/spend/logs", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, True),
("/key/delete", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False),
("/key/generate", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False),
(
"/key/82akk800000000jjsk/regenerate",
LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY,
False,
),
("/team/new", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False),
("/team/delete", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False),
("/team/update", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False),
# Internal User checks - disallowed routes
("/organization/member_add", LitellmUserRoles.INTERNAL_USER, False),
],
)
async def test_user_role_permissions(prisma_client, route, user_role, expected_result):
"""Test user role based permissions for different routes"""
try:
# Setup
setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client)
setattr(litellm.proxy.proxy_server, "master_key", "sk-1234")
await litellm.proxy.proxy_server.prisma_client.connect()
# Admin - admin creates a new user
user_api_key_dict = UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234",
user_id="1234",
)
request = NewUserRequest(user_role=user_role)
new_user_response = await new_user(request, user_api_key_dict=user_api_key_dict)
user_id = new_user_response.user_id
# Generate key for new user with team_id="litellm-dashboard"
key_response = await generate_key_fn(
data=GenerateKeyRequest(user_id=user_id, team_id="litellm-dashboard"),
user_api_key_dict=user_api_key_dict,
)
generated_key = key_response.key
bearer_token = "Bearer " + generated_key
# Create request with route
request = Request(scope={"type": "http"})
request._url = URL(url=route)
# Test authorization
if expected_result is True:
# Should pass without error
result = await user_api_key_auth(request=request, api_key=bearer_token)
print(f"Auth passed as expected for {route} with role {user_role}")
else:
# Should raise an error
with pytest.raises(Exception) as exc_info:
await user_api_key_auth(request=request, api_key=bearer_token)
print(f"Auth failed as expected for {route} with role {user_role}")
print(f"Error message: {str(exc_info.value)}")
except Exception as e:
if expected_result:
pytest.fail(f"Expected success but got exception: {str(e)}")
else:
print(f"Got expected exception: {str(e)}")