diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index 1748191343..286857eb06 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -58,6 +58,9 @@ class LitellmUserRoles(str, enum.Enum): def __str__(self): return str(self.value) + def values(self) -> List[str]: + return list(self.__annotations__.keys()) + @property def description(self): """ diff --git a/litellm/proxy/auth/user_api_key_auth.py b/litellm/proxy/auth/user_api_key_auth.py index dc255cb205..b8be226050 100644 --- a/litellm/proxy/auth/user_api_key_auth.py +++ b/litellm/proxy/auth/user_api_key_auth.py @@ -1164,7 +1164,7 @@ async def user_api_key_auth( # No token was found when looking up in the DB raise Exception("Invalid proxy server token passed") if valid_token_dict is not None: - retrieved_user_role: Optional[str] = _get_user_role( + retrieved_user_role = _get_user_role( user_id_information=user_id_information ) if user_id_information is not None and _is_user_proxy_admin( @@ -1179,14 +1179,14 @@ async def user_api_key_auth( elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value: return UserAPIKeyAuth( api_key=api_key, - user_role=retrieved_user_role or LitellmUserRoles.INTERNAL_USER, # type: ignore + user_role=retrieved_user_role, parent_otel_span=parent_otel_span, **valid_token_dict, ) else: return UserAPIKeyAuth( api_key=api_key, - user_role=retrieved_user_role or LitellmUserRoles.INTERNAL_USER, # type: ignore + user_role=retrieved_user_role, parent_otel_span=parent_otel_span, **valid_token_dict, ) @@ -1230,6 +1230,37 @@ async def user_api_key_auth( ) +def _return_user_api_key_auth_obj( + user_id_information: Optional[list], + api_key: str, + parent_otel_span: Optional[Span], + valid_token_dict: dict, + route: str, +) -> UserAPIKeyAuth: + retrieved_user_role = _get_user_role(user_id_information=user_id_information) + if user_id_information is not None and _is_user_proxy_admin(user_id_information): + return UserAPIKeyAuth( + api_key=api_key, + user_role=LitellmUserRoles.PROXY_ADMIN, + parent_otel_span=parent_otel_span, + **valid_token_dict, + ) + elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value: + return UserAPIKeyAuth( + api_key=api_key, + user_role=retrieved_user_role, + parent_otel_span=parent_otel_span, + **valid_token_dict, + ) + else: + return UserAPIKeyAuth( + api_key=api_key, + user_role=retrieved_user_role, + parent_otel_span=parent_otel_span, + **valid_token_dict, + ) + + def _has_user_setup_sso(): """ Check if the user has set up single sign-on (SSO) by verifying the presence of Microsoft client ID, Google client ID, and UI username environment variables. @@ -1278,15 +1309,28 @@ def _is_user_proxy_admin(user_id_information: Optional[list]): return False -def _get_user_role(user_id_information: Optional[list]) -> Optional[str]: +def _get_user_role( + user_id_information: Optional[list], +) -> Literal[ + LitellmUserRoles.PROXY_ADMIN, + LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, + LitellmUserRoles.INTERNAL_USER, + LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, + LitellmUserRoles.TEAM, + LitellmUserRoles.CUSTOMER, +]: if user_id_information is None: - return None + return LitellmUserRoles.INTERNAL_USER if len(user_id_information) == 0 or user_id_information[0] is None: - return None + return LitellmUserRoles.INTERNAL_USER _user = user_id_information[0] - return _user.get("user_role") + + _user_role = _user.get("user_role") + if _user_role in list(LitellmUserRoles.__annotations__.keys()): + return _user_role + return LitellmUserRoles.INTERNAL_USER def _check_valid_ip(allowed_ips: Optional[List[str]], request: Request) -> bool: diff --git a/litellm/tests/test_user_api_key_auth.py b/litellm/tests/test_user_api_key_auth.py index 6ad0a62e79..1ba81d4fa4 100644 --- a/litellm/tests/test_user_api_key_auth.py +++ b/litellm/tests/test_user_api_key_auth.py @@ -93,3 +93,26 @@ async def test_check_blocked_team(): request._url = URL(url="/chat/completions") await user_api_key_auth(request=request, api_key="Bearer " + user_key) + + +@pytest.mark.parametrize( + "user_role", ["app_user", "internal_user", "proxy_admin_viewer"] +) +def test_returned_user_api_key_auth(user_role): + from litellm.proxy._types import LitellmUserRoles + from litellm.proxy.auth.user_api_key_auth import _return_user_api_key_auth_obj + + user_id_information = [{"user_role": user_role}] + + new_obj = _return_user_api_key_auth_obj( + user_id_information, + api_key="hello-world", + parent_otel_span=None, + valid_token_dict={}, + route="/chat/completion", + ) + + if user_role in list(LitellmUserRoles.__annotations__.keys()): + assert new_obj.user_role == user_role + else: + assert new_obj.user_role == "internal_user"