mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 19:24:27 +00:00
fix(user_api_key_auth.py): handle older user_role's
Fixes issue where older user_role's (e.g. app_user) weren't being recognized. + Adds testing for it
This commit is contained in:
parent
c97ecb84b1
commit
8fea55ffd7
3 changed files with 77 additions and 7 deletions
|
@ -58,6 +58,9 @@ class LitellmUserRoles(str, enum.Enum):
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return str(self.value)
|
return str(self.value)
|
||||||
|
|
||||||
|
def values(self) -> List[str]:
|
||||||
|
return list(self.__annotations__.keys())
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def description(self):
|
def description(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -1164,7 +1164,7 @@ async def user_api_key_auth(
|
||||||
# No token was found when looking up in the DB
|
# No token was found when looking up in the DB
|
||||||
raise Exception("Invalid proxy server token passed")
|
raise Exception("Invalid proxy server token passed")
|
||||||
if valid_token_dict is not None:
|
if valid_token_dict is not None:
|
||||||
retrieved_user_role: Optional[str] = _get_user_role(
|
retrieved_user_role = _get_user_role(
|
||||||
user_id_information=user_id_information
|
user_id_information=user_id_information
|
||||||
)
|
)
|
||||||
if user_id_information is not None and _is_user_proxy_admin(
|
if user_id_information is not None and _is_user_proxy_admin(
|
||||||
|
@ -1179,14 +1179,14 @@ async def user_api_key_auth(
|
||||||
elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value:
|
elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value:
|
||||||
return UserAPIKeyAuth(
|
return UserAPIKeyAuth(
|
||||||
api_key=api_key,
|
api_key=api_key,
|
||||||
user_role=retrieved_user_role or LitellmUserRoles.INTERNAL_USER, # type: ignore
|
user_role=retrieved_user_role,
|
||||||
parent_otel_span=parent_otel_span,
|
parent_otel_span=parent_otel_span,
|
||||||
**valid_token_dict,
|
**valid_token_dict,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
return UserAPIKeyAuth(
|
return UserAPIKeyAuth(
|
||||||
api_key=api_key,
|
api_key=api_key,
|
||||||
user_role=retrieved_user_role or LitellmUserRoles.INTERNAL_USER, # type: ignore
|
user_role=retrieved_user_role,
|
||||||
parent_otel_span=parent_otel_span,
|
parent_otel_span=parent_otel_span,
|
||||||
**valid_token_dict,
|
**valid_token_dict,
|
||||||
)
|
)
|
||||||
|
@ -1230,6 +1230,37 @@ async def user_api_key_auth(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _return_user_api_key_auth_obj(
|
||||||
|
user_id_information: Optional[list],
|
||||||
|
api_key: str,
|
||||||
|
parent_otel_span: Optional[Span],
|
||||||
|
valid_token_dict: dict,
|
||||||
|
route: str,
|
||||||
|
) -> UserAPIKeyAuth:
|
||||||
|
retrieved_user_role = _get_user_role(user_id_information=user_id_information)
|
||||||
|
if user_id_information is not None and _is_user_proxy_admin(user_id_information):
|
||||||
|
return UserAPIKeyAuth(
|
||||||
|
api_key=api_key,
|
||||||
|
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||||
|
parent_otel_span=parent_otel_span,
|
||||||
|
**valid_token_dict,
|
||||||
|
)
|
||||||
|
elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value:
|
||||||
|
return UserAPIKeyAuth(
|
||||||
|
api_key=api_key,
|
||||||
|
user_role=retrieved_user_role,
|
||||||
|
parent_otel_span=parent_otel_span,
|
||||||
|
**valid_token_dict,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return UserAPIKeyAuth(
|
||||||
|
api_key=api_key,
|
||||||
|
user_role=retrieved_user_role,
|
||||||
|
parent_otel_span=parent_otel_span,
|
||||||
|
**valid_token_dict,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _has_user_setup_sso():
|
def _has_user_setup_sso():
|
||||||
"""
|
"""
|
||||||
Check if the user has set up single sign-on (SSO) by verifying the presence of Microsoft client ID, Google client ID, and UI username environment variables.
|
Check if the user has set up single sign-on (SSO) by verifying the presence of Microsoft client ID, Google client ID, and UI username environment variables.
|
||||||
|
@ -1278,15 +1309,28 @@ def _is_user_proxy_admin(user_id_information: Optional[list]):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _get_user_role(user_id_information: Optional[list]) -> Optional[str]:
|
def _get_user_role(
|
||||||
|
user_id_information: Optional[list],
|
||||||
|
) -> Literal[
|
||||||
|
LitellmUserRoles.PROXY_ADMIN,
|
||||||
|
LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY,
|
||||||
|
LitellmUserRoles.INTERNAL_USER,
|
||||||
|
LitellmUserRoles.INTERNAL_USER_VIEW_ONLY,
|
||||||
|
LitellmUserRoles.TEAM,
|
||||||
|
LitellmUserRoles.CUSTOMER,
|
||||||
|
]:
|
||||||
if user_id_information is None:
|
if user_id_information is None:
|
||||||
return None
|
return LitellmUserRoles.INTERNAL_USER
|
||||||
|
|
||||||
if len(user_id_information) == 0 or user_id_information[0] is None:
|
if len(user_id_information) == 0 or user_id_information[0] is None:
|
||||||
return None
|
return LitellmUserRoles.INTERNAL_USER
|
||||||
|
|
||||||
_user = user_id_information[0]
|
_user = user_id_information[0]
|
||||||
return _user.get("user_role")
|
|
||||||
|
_user_role = _user.get("user_role")
|
||||||
|
if _user_role in list(LitellmUserRoles.__annotations__.keys()):
|
||||||
|
return _user_role
|
||||||
|
return LitellmUserRoles.INTERNAL_USER
|
||||||
|
|
||||||
|
|
||||||
def _check_valid_ip(allowed_ips: Optional[List[str]], request: Request) -> bool:
|
def _check_valid_ip(allowed_ips: Optional[List[str]], request: Request) -> bool:
|
||||||
|
|
|
@ -93,3 +93,26 @@ async def test_check_blocked_team():
|
||||||
request._url = URL(url="/chat/completions")
|
request._url = URL(url="/chat/completions")
|
||||||
|
|
||||||
await user_api_key_auth(request=request, api_key="Bearer " + user_key)
|
await user_api_key_auth(request=request, api_key="Bearer " + user_key)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"user_role", ["app_user", "internal_user", "proxy_admin_viewer"]
|
||||||
|
)
|
||||||
|
def test_returned_user_api_key_auth(user_role):
|
||||||
|
from litellm.proxy._types import LitellmUserRoles
|
||||||
|
from litellm.proxy.auth.user_api_key_auth import _return_user_api_key_auth_obj
|
||||||
|
|
||||||
|
user_id_information = [{"user_role": user_role}]
|
||||||
|
|
||||||
|
new_obj = _return_user_api_key_auth_obj(
|
||||||
|
user_id_information,
|
||||||
|
api_key="hello-world",
|
||||||
|
parent_otel_span=None,
|
||||||
|
valid_token_dict={},
|
||||||
|
route="/chat/completion",
|
||||||
|
)
|
||||||
|
|
||||||
|
if user_role in list(LitellmUserRoles.__annotations__.keys()):
|
||||||
|
assert new_obj.user_role == user_role
|
||||||
|
else:
|
||||||
|
assert new_obj.user_role == "internal_user"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue