From 7c3e0d63ff213c1cc5b0838b5d483c2f17b5e2fb Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Fri, 15 Nov 2024 17:35:04 -0800 Subject: [PATCH] test_user_role_permissions --- litellm/proxy/_types.py | 2 + litellm/proxy/auth/route_checks.py | 43 +++++++-- .../test_role_based_access.py | 91 +++++++++++++++++++ 3 files changed, 127 insertions(+), 9 deletions(-) diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index dcc87c777..70e5e6756 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -278,6 +278,7 @@ class LiteLLMRoutes(enum.Enum): management_routes = [ # key "/key/generate", + "/key/{token_id}/regenerate", "/key/update", "/key/delete", "/key/info", @@ -360,6 +361,7 @@ class LiteLLMRoutes(enum.Enum): internal_user_routes = ( [ "/key/generate", + "/key/{token_id}/regenerate", "/key/update", "/key/delete", "/key/health", diff --git a/litellm/proxy/auth/route_checks.py b/litellm/proxy/auth/route_checks.py index 1b593162c..c75c1e66c 100644 --- a/litellm/proxy/auth/route_checks.py +++ b/litellm/proxy/auth/route_checks.py @@ -1,5 +1,5 @@ import re -from typing import Optional +from typing import List, Optional from fastapi import HTTPException, Request, status @@ -80,7 +80,9 @@ class RouteChecks: status_code=status.HTTP_403_FORBIDDEN, detail=f"user not allowed to access this OpenAI routes, role= {_user_role}", ) - if route in LiteLLMRoutes.management_routes.value: + if RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.management_routes.value + ): # the Admin Viewer is only allowed to call /user/update for their own user_id and can only update if route == "/user/update": @@ -101,21 +103,27 @@ class RouteChecks: elif ( _user_role == LitellmUserRoles.INTERNAL_USER.value - and route in LiteLLMRoutes.internal_user_routes.value + and RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.internal_user_routes.value + ) ): pass - elif ( - _user_is_org_admin(request_data=request_data, user_object=user_obj) - and route in LiteLLMRoutes.org_admin_allowed_routes.value + elif _user_is_org_admin( + request_data=request_data, user_object=user_obj + ) and RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.org_admin_allowed_routes.value ): pass elif ( _user_role == LitellmUserRoles.INTERNAL_USER_VIEW_ONLY.value - and route in LiteLLMRoutes.internal_user_view_only_routes.value + and RouteChecks.check_route_access( + route=route, + allowed_routes=LiteLLMRoutes.internal_user_view_only_routes.value, + ) ): pass - elif ( - route in LiteLLMRoutes.self_managed_routes.value + elif RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.self_managed_routes.value ): # routes that manage their own allowed/disallowed logic pass else: @@ -207,3 +215,20 @@ class RouteChecks: if re.match(pattern, route): return True return False + + @staticmethod + def check_route_access(route: str, allowed_routes: List[str]) -> bool: + """ + Check if a route has access by checking both exact matches and patterns + + Args: + route (str): The route to check + allowed_routes (list): List of allowed routes/patterns + + Returns: + bool: True if route is allowed, False otherwise + """ + return route in allowed_routes or any( # Check exact match + RouteChecks._route_matches_pattern(route=route, pattern=allowed_route) + for allowed_route in allowed_routes + ) # Check pattern match diff --git a/tests/proxy_admin_ui_tests/test_role_based_access.py b/tests/proxy_admin_ui_tests/test_role_based_access.py index e2727e5d8..6f59fd6f5 100644 --- a/tests/proxy_admin_ui_tests/test_role_based_access.py +++ b/tests/proxy_admin_ui_tests/test_role_based_access.py @@ -437,3 +437,94 @@ async def test_org_admin_create_user_team_wrong_org_permissions(prisma_client): "You do not have the required role to call" in e.message and org2_id in e.message ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "route, user_role, expected_result", + [ + # Proxy Admin checks + ("/global/spend/logs", LitellmUserRoles.PROXY_ADMIN, True), + ("/key/delete", LitellmUserRoles.PROXY_ADMIN, True), + ("/key/generate", LitellmUserRoles.PROXY_ADMIN, True), + ("/key/regenerate", LitellmUserRoles.PROXY_ADMIN, True), + # # Internal User checks - allowed routes + ("/global/spend/logs", LitellmUserRoles.INTERNAL_USER, True), + ("/key/delete", LitellmUserRoles.INTERNAL_USER, True), + ("/key/generate", LitellmUserRoles.INTERNAL_USER, True), + ("/key/82akk800000000jjsk/regenerate", LitellmUserRoles.INTERNAL_USER, True), + # Internal User Viewer + ("/key/generate", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ( + "/key/82akk800000000jjsk/regenerate", + LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, + False, + ), + ("/key/delete", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ("/team/new", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ("/team/delete", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ("/team/update", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + # Proxy Admin Viewer + ("/global/spend/logs", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, True), + ("/key/delete", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ("/key/generate", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ( + "/key/82akk800000000jjsk/regenerate", + LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, + False, + ), + ("/team/new", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ("/team/delete", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ("/team/update", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + # Internal User checks - disallowed routes + ("/organization/member_add", LitellmUserRoles.INTERNAL_USER, False), + ], +) +async def test_user_role_permissions(prisma_client, route, user_role, expected_result): + """Test user role based permissions for different routes""" + try: + # Setup + setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client) + setattr(litellm.proxy.proxy_server, "master_key", "sk-1234") + await litellm.proxy.proxy_server.prisma_client.connect() + + # Admin - admin creates a new user + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.PROXY_ADMIN, + api_key="sk-1234", + user_id="1234", + ) + + request = NewUserRequest(user_role=user_role) + new_user_response = await new_user(request, user_api_key_dict=user_api_key_dict) + user_id = new_user_response.user_id + + # Generate key for new user with team_id="litellm-dashboard" + key_response = await generate_key_fn( + data=GenerateKeyRequest(user_id=user_id, team_id="litellm-dashboard"), + user_api_key_dict=user_api_key_dict, + ) + generated_key = key_response.key + bearer_token = "Bearer " + generated_key + + # Create request with route + request = Request(scope={"type": "http"}) + request._url = URL(url=route) + + # Test authorization + if expected_result is True: + # Should pass without error + result = await user_api_key_auth(request=request, api_key=bearer_token) + print(f"Auth passed as expected for {route} with role {user_role}") + else: + # Should raise an error + with pytest.raises(Exception) as exc_info: + await user_api_key_auth(request=request, api_key=bearer_token) + print(f"Auth failed as expected for {route} with role {user_role}") + print(f"Error message: {str(exc_info.value)}") + + except Exception as e: + if expected_result: + pytest.fail(f"Expected success but got exception: {str(e)}") + else: + print(f"Got expected exception: {str(e)}")