diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index 4baf13b61a..70e5e6756c 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -278,6 +278,7 @@ class LiteLLMRoutes(enum.Enum): management_routes = [ # key "/key/generate", + "/key/{token_id}/regenerate", "/key/update", "/key/delete", "/key/info", @@ -339,11 +340,7 @@ class LiteLLMRoutes(enum.Enum): "/sso", "/sso/get/ui_settings", "/login", - "/key/generate", - "/key/{token_id}/regenerate", - "/key/update", "/key/info", - "/key/delete", "/config", "/spend", "/user", @@ -364,6 +361,7 @@ class LiteLLMRoutes(enum.Enum): internal_user_routes = ( [ "/key/generate", + "/key/{token_id}/regenerate", "/key/update", "/key/delete", "/key/health", diff --git a/litellm/proxy/auth/route_checks.py b/litellm/proxy/auth/route_checks.py index 1b593162c8..c75c1e66ca 100644 --- a/litellm/proxy/auth/route_checks.py +++ b/litellm/proxy/auth/route_checks.py @@ -1,5 +1,5 @@ import re -from typing import Optional +from typing import List, Optional from fastapi import HTTPException, Request, status @@ -80,7 +80,9 @@ class RouteChecks: status_code=status.HTTP_403_FORBIDDEN, detail=f"user not allowed to access this OpenAI routes, role= {_user_role}", ) - if route in LiteLLMRoutes.management_routes.value: + if RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.management_routes.value + ): # the Admin Viewer is only allowed to call /user/update for their own user_id and can only update if route == "/user/update": @@ -101,21 +103,27 @@ class RouteChecks: elif ( _user_role == LitellmUserRoles.INTERNAL_USER.value - and route in LiteLLMRoutes.internal_user_routes.value + and RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.internal_user_routes.value + ) ): pass - elif ( - _user_is_org_admin(request_data=request_data, user_object=user_obj) - and route in LiteLLMRoutes.org_admin_allowed_routes.value + elif _user_is_org_admin( + request_data=request_data, user_object=user_obj + ) and RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.org_admin_allowed_routes.value ): pass elif ( _user_role == LitellmUserRoles.INTERNAL_USER_VIEW_ONLY.value - and route in LiteLLMRoutes.internal_user_view_only_routes.value + and RouteChecks.check_route_access( + route=route, + allowed_routes=LiteLLMRoutes.internal_user_view_only_routes.value, + ) ): pass - elif ( - route in LiteLLMRoutes.self_managed_routes.value + elif RouteChecks.check_route_access( + route=route, allowed_routes=LiteLLMRoutes.self_managed_routes.value ): # routes that manage their own allowed/disallowed logic pass else: @@ -207,3 +215,20 @@ class RouteChecks: if re.match(pattern, route): return True return False + + @staticmethod + def check_route_access(route: str, allowed_routes: List[str]) -> bool: + """ + Check if a route has access by checking both exact matches and patterns + + Args: + route (str): The route to check + allowed_routes (list): List of allowed routes/patterns + + Returns: + bool: True if route is allowed, False otherwise + """ + return route in allowed_routes or any( # Check exact match + RouteChecks._route_matches_pattern(route=route, pattern=allowed_route) + for allowed_route in allowed_routes + ) # Check pattern match diff --git a/litellm/proxy/auth/user_api_key_auth.py b/litellm/proxy/auth/user_api_key_auth.py index 6032a72af1..669661e947 100644 --- a/litellm/proxy/auth/user_api_key_auth.py +++ b/litellm/proxy/auth/user_api_key_auth.py @@ -111,12 +111,12 @@ def _get_bearer_token( return api_key -def _is_ui_route_allowed( +def _is_ui_route( route: str, user_obj: Optional[LiteLLM_UserTable] = None, ) -> bool: """ - - Route b/w ui token check and normal token check + - Check if the route is a UI used route """ # this token is only used for managing the ui allowed_routes = LiteLLMRoutes.ui_routes.value @@ -133,15 +133,7 @@ def _is_ui_route_allowed( for allowed_route in allowed_routes ): return True - else: - if user_obj is not None and _is_user_proxy_admin(user_obj=user_obj): - return True - elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value: - return True - else: - raise Exception( - f"This key is made for LiteLLM UI, Tried to access route: {route}. Not allowed" - ) + return False def _is_api_route_allowed( @@ -185,8 +177,8 @@ def _is_allowed_route( """ - Route b/w ui token check and normal token check """ - if token_type == "ui": - return _is_ui_route_allowed(route=route, user_obj=user_obj) + if token_type == "ui" and _is_ui_route(route=route, user_obj=user_obj): + return True else: return _is_api_route_allowed( route=route, diff --git a/tests/local_testing/test_user_api_key_auth.py b/tests/local_testing/test_user_api_key_auth.py index f6becf070e..31daa358af 100644 --- a/tests/local_testing/test_user_api_key_auth.py +++ b/tests/local_testing/test_user_api_key_auth.py @@ -305,14 +305,14 @@ async def test_auth_with_allowed_routes(route, should_raise_error): [ # Proxy Admin checks ("/global/spend/logs", "proxy_admin", True), - ("/key/delete", "proxy_admin", True), - ("/key/generate", "proxy_admin", True), - ("/key/regenerate", "proxy_admin", True), + ("/key/delete", "proxy_admin", False), + ("/key/generate", "proxy_admin", False), + ("/key/regenerate", "proxy_admin", False), # Internal User checks - allowed routes ("/global/spend/logs", "internal_user", True), - ("/key/delete", "internal_user", True), - ("/key/generate", "internal_user", True), - ("/key/82akk800000000jjsk/regenerate", "internal_user", True), + ("/key/delete", "internal_user", False), + ("/key/generate", "internal_user", False), + ("/key/82akk800000000jjsk/regenerate", "internal_user", False), # Internal User Viewer ("/key/generate", "internal_user_viewer", False), # Internal User checks - disallowed routes @@ -320,7 +320,7 @@ async def test_auth_with_allowed_routes(route, should_raise_error): ], ) def test_is_ui_route_allowed(route, user_role, expected_result): - from litellm.proxy.auth.user_api_key_auth import _is_ui_route_allowed + from litellm.proxy.auth.user_api_key_auth import _is_ui_route from litellm.proxy._types import LiteLLM_UserTable user_obj = LiteLLM_UserTable( @@ -342,7 +342,7 @@ def test_is_ui_route_allowed(route, user_role, expected_result): "user_obj": user_obj, } try: - assert _is_ui_route_allowed(**received_args) == expected_result + assert _is_ui_route(**received_args) == expected_result except Exception as e: # If expected result is False, we expect an error if expected_result is False: diff --git a/tests/proxy_admin_ui_tests/test_role_based_access.py b/tests/proxy_admin_ui_tests/test_role_based_access.py index e2727e5d8c..6f59fd6f58 100644 --- a/tests/proxy_admin_ui_tests/test_role_based_access.py +++ b/tests/proxy_admin_ui_tests/test_role_based_access.py @@ -437,3 +437,94 @@ async def test_org_admin_create_user_team_wrong_org_permissions(prisma_client): "You do not have the required role to call" in e.message and org2_id in e.message ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "route, user_role, expected_result", + [ + # Proxy Admin checks + ("/global/spend/logs", LitellmUserRoles.PROXY_ADMIN, True), + ("/key/delete", LitellmUserRoles.PROXY_ADMIN, True), + ("/key/generate", LitellmUserRoles.PROXY_ADMIN, True), + ("/key/regenerate", LitellmUserRoles.PROXY_ADMIN, True), + # # Internal User checks - allowed routes + ("/global/spend/logs", LitellmUserRoles.INTERNAL_USER, True), + ("/key/delete", LitellmUserRoles.INTERNAL_USER, True), + ("/key/generate", LitellmUserRoles.INTERNAL_USER, True), + ("/key/82akk800000000jjsk/regenerate", LitellmUserRoles.INTERNAL_USER, True), + # Internal User Viewer + ("/key/generate", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ( + "/key/82akk800000000jjsk/regenerate", + LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, + False, + ), + ("/key/delete", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ("/team/new", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ("/team/delete", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + ("/team/update", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, False), + # Proxy Admin Viewer + ("/global/spend/logs", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, True), + ("/key/delete", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ("/key/generate", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ( + "/key/82akk800000000jjsk/regenerate", + LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, + False, + ), + ("/team/new", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ("/team/delete", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + ("/team/update", LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, False), + # Internal User checks - disallowed routes + ("/organization/member_add", LitellmUserRoles.INTERNAL_USER, False), + ], +) +async def test_user_role_permissions(prisma_client, route, user_role, expected_result): + """Test user role based permissions for different routes""" + try: + # Setup + setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client) + setattr(litellm.proxy.proxy_server, "master_key", "sk-1234") + await litellm.proxy.proxy_server.prisma_client.connect() + + # Admin - admin creates a new user + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.PROXY_ADMIN, + api_key="sk-1234", + user_id="1234", + ) + + request = NewUserRequest(user_role=user_role) + new_user_response = await new_user(request, user_api_key_dict=user_api_key_dict) + user_id = new_user_response.user_id + + # Generate key for new user with team_id="litellm-dashboard" + key_response = await generate_key_fn( + data=GenerateKeyRequest(user_id=user_id, team_id="litellm-dashboard"), + user_api_key_dict=user_api_key_dict, + ) + generated_key = key_response.key + bearer_token = "Bearer " + generated_key + + # Create request with route + request = Request(scope={"type": "http"}) + request._url = URL(url=route) + + # Test authorization + if expected_result is True: + # Should pass without error + result = await user_api_key_auth(request=request, api_key=bearer_token) + print(f"Auth passed as expected for {route} with role {user_role}") + else: + # Should raise an error + with pytest.raises(Exception) as exc_info: + await user_api_key_auth(request=request, api_key=bearer_token) + print(f"Auth failed as expected for {route} with role {user_role}") + print(f"Error message: {str(exc_info.value)}") + + except Exception as e: + if expected_result: + pytest.fail(f"Expected success but got exception: {str(e)}") + else: + print(f"Got expected exception: {str(e)}") diff --git a/tests/proxy_unit_tests/test_user_api_key_auth.py b/tests/proxy_unit_tests/test_user_api_key_auth.py index f6becf070e..31daa358af 100644 --- a/tests/proxy_unit_tests/test_user_api_key_auth.py +++ b/tests/proxy_unit_tests/test_user_api_key_auth.py @@ -305,14 +305,14 @@ async def test_auth_with_allowed_routes(route, should_raise_error): [ # Proxy Admin checks ("/global/spend/logs", "proxy_admin", True), - ("/key/delete", "proxy_admin", True), - ("/key/generate", "proxy_admin", True), - ("/key/regenerate", "proxy_admin", True), + ("/key/delete", "proxy_admin", False), + ("/key/generate", "proxy_admin", False), + ("/key/regenerate", "proxy_admin", False), # Internal User checks - allowed routes ("/global/spend/logs", "internal_user", True), - ("/key/delete", "internal_user", True), - ("/key/generate", "internal_user", True), - ("/key/82akk800000000jjsk/regenerate", "internal_user", True), + ("/key/delete", "internal_user", False), + ("/key/generate", "internal_user", False), + ("/key/82akk800000000jjsk/regenerate", "internal_user", False), # Internal User Viewer ("/key/generate", "internal_user_viewer", False), # Internal User checks - disallowed routes @@ -320,7 +320,7 @@ async def test_auth_with_allowed_routes(route, should_raise_error): ], ) def test_is_ui_route_allowed(route, user_role, expected_result): - from litellm.proxy.auth.user_api_key_auth import _is_ui_route_allowed + from litellm.proxy.auth.user_api_key_auth import _is_ui_route from litellm.proxy._types import LiteLLM_UserTable user_obj = LiteLLM_UserTable( @@ -342,7 +342,7 @@ def test_is_ui_route_allowed(route, user_role, expected_result): "user_obj": user_obj, } try: - assert _is_ui_route_allowed(**received_args) == expected_result + assert _is_ui_route(**received_args) == expected_result except Exception as e: # If expected result is False, we expect an error if expected_result is False: