add check for admin only routes

This commit is contained in:
Ishaan Jaff 2024-09-03 15:03:32 -07:00
parent aae2ba208d
commit 0b63625673
6 changed files with 217 additions and 156 deletions

View file

@ -58,9 +58,8 @@ from litellm.proxy.auth.auth_checks import (
)
from litellm.proxy.auth.auth_utils import (
_get_request_ip_address,
check_if_request_size_is_safe,
_has_user_setup_sso,
get_request_route,
is_llm_api_route,
is_pass_through_provider_route,
pre_db_read_auth_checks,
route_in_additonal_public_routes,
@ -68,6 +67,7 @@ from litellm.proxy.auth.auth_utils import (
)
from litellm.proxy.auth.oauth2_check import check_oauth2_token
from litellm.proxy.auth.oauth2_proxy_hook import handle_oauth2_proxy_request
from litellm.proxy.auth.route_checks import non_admin_allowed_routes_check
from litellm.proxy.common_utils.http_parsing_utils import _read_request_body
from litellm.proxy.utils import _to_ns
@ -976,96 +976,15 @@ async def user_api_key_auth(
_user_role = _get_user_role(user_obj=user_obj)
if not _is_user_proxy_admin(user_obj=user_obj): # if non-admin
if is_llm_api_route(route=route):
pass
elif (
route in LiteLLMRoutes.info_routes.value
): # check if user allowed to call an info route
if route == "/key/info":
# check if user can access this route
query_params = request.query_params
key = query_params.get("key")
if key is not None and hash_token(token=key) != api_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="user not allowed to access this key's info",
)
elif route == "/user/info":
# check if user can access this route
query_params = request.query_params
user_id = query_params.get("user_id")
verbose_proxy_logger.debug(
f"user_id: {user_id} & valid_token.user_id: {valid_token.user_id}"
)
if user_id != valid_token.user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="key not allowed to access this user's info. user_id={}, key's user_id={}".format(
user_id, valid_token.user_id
),
)
elif route == "/model/info":
# /model/info just shows models user has access to
pass
elif route == "/team/info":
pass # handled by function itself
elif (
_has_user_setup_sso()
and route in LiteLLMRoutes.sso_only_routes.value
):
pass
elif (
route in LiteLLMRoutes.global_spend_tracking_routes.value
and getattr(valid_token, "permissions", None) is not None
and "get_spend_routes" in getattr(valid_token, "permissions", [])
):
pass
elif _user_role == LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value:
if is_llm_api_route(route=route):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this OpenAI routes, role= {_user_role}",
)
if route in LiteLLMRoutes.management_routes.value:
# the Admin Viewer is only allowed to call /user/update for their own user_id and can only update
if route == "/user/update":
# Check the Request params are valid for PROXY_ADMIN_VIEW_ONLY
if request_data is not None and isinstance(
request_data, dict
):
_params_updated = request_data.keys()
for param in _params_updated:
if param not in ["user_email", "password"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route, role= {_user_role}. Trying to access: {route} and updating invalid param: {param}. only user_email and password can be updated",
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route, role= {_user_role}. Trying to access: {route}",
)
elif (
_user_role == LitellmUserRoles.INTERNAL_USER.value
and route in LiteLLMRoutes.internal_user_routes.value
):
pass
elif (
route in LiteLLMRoutes.self_managed_routes.value
): # routes that manage their own allowed/disallowed logic
pass
else:
user_role = "unknown"
user_id = "unknown"
if user_obj is not None:
user_role = user_obj.user_role or "unknown"
user_id = user_obj.user_id or "unknown"
raise Exception(
f"Only proxy admin can be used to generate, delete, update info for new keys/users/teams. Route={route}. Your role={user_role}. Your user_id={user_id}"
)
non_admin_allowed_routes_check(
user_obj=user_obj,
_user_role=_user_role,
route=route,
request=request,
request_data=request_data,
api_key=api_key,
valid_token=valid_token,
)
# check if token is from litellm-ui, litellm ui makes keys to allow users to login with sso. These keys can only be used for LiteLLM UI functions
# sso/login, ui/login, /key functions and /user functions
@ -1219,24 +1138,6 @@ def _return_user_api_key_auth_obj(
)
def _has_user_setup_sso():
"""
Check if the user has set up single sign-on (SSO) by verifying the presence of Microsoft client ID, Google client ID, and UI username environment variables.
Returns a boolean indicating whether SSO has been set up.
"""
microsoft_client_id = os.getenv("MICROSOFT_CLIENT_ID", None)
google_client_id = os.getenv("GOOGLE_CLIENT_ID", None)
ui_username = os.getenv("UI_USERNAME", None)
sso_setup = (
(microsoft_client_id is not None)
or (google_client_id is not None)
or (ui_username is not None)
)
return sso_setup
def _is_user_proxy_admin(user_obj: Optional[LiteLLM_UserTable]):
if user_obj is None:
return False