use static methods for Routechecks

This commit is contained in:
Ishaan Jaff 2024-10-25 10:26:43 +04:00
parent c4cab8812a
commit 2e0f501b56
2 changed files with 163 additions and 160 deletions

View file

@ -17,175 +17,178 @@ from .auth_checks_organization import _user_is_org_admin
from .auth_utils import _has_user_setup_sso from .auth_utils import _has_user_setup_sso
def non_proxy_admin_allowed_routes_check( class RouteChecks:
user_obj: Optional[LiteLLM_UserTable],
_user_role: Optional[LitellmUserRoles],
route: str,
request: Request,
valid_token: UserAPIKeyAuth,
api_key: str,
request_data: dict,
):
"""
Checks if Non Proxy Admin User is allowed to access the route
"""
# Check user has defined custom admin routes @staticmethod
custom_admin_only_route_check( def non_proxy_admin_allowed_routes_check(
route=route, user_obj: Optional[LiteLLM_UserTable],
) _user_role: Optional[LitellmUserRoles],
route: str,
if is_llm_api_route(route=route): request: Request,
pass valid_token: UserAPIKeyAuth,
elif ( api_key: str,
route in LiteLLMRoutes.info_routes.value request_data: dict,
): # check if user allowed to call an info route
if route == "/key/info":
# check if user can access this route
query_params = request.query_params
key = query_params.get("key")
if key is not None and hash_token(token=key) != api_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="user not allowed to access this key's info",
)
elif route == "/user/info":
# check if user can access this route
query_params = request.query_params
user_id = query_params.get("user_id")
verbose_proxy_logger.debug(
f"user_id: {user_id} & valid_token.user_id: {valid_token.user_id}"
)
if user_id and user_id != valid_token.user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="key not allowed to access this user's info. user_id={}, key's user_id={}".format(
user_id, valid_token.user_id
),
)
elif route == "/model/info":
# /model/info just shows models user has access to
pass
elif route == "/team/info":
pass # handled by function itself
elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value:
pass
elif (
route in LiteLLMRoutes.global_spend_tracking_routes.value
and getattr(valid_token, "permissions", None) is not None
and "get_spend_routes" in getattr(valid_token, "permissions", [])
): ):
"""
Checks if Non Proxy Admin User is allowed to access the route
"""
pass # Check user has defined custom admin routes
elif _user_role == LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value: RouteChecks.custom_admin_only_route_check(
if is_llm_api_route(route=route): route=route,
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this OpenAI routes, role= {_user_role}",
)
if route in LiteLLMRoutes.management_routes.value:
# the Admin Viewer is only allowed to call /user/update for their own user_id and can only update
if route == "/user/update":
# Check the Request params are valid for PROXY_ADMIN_VIEW_ONLY
if request_data is not None and isinstance(request_data, dict):
_params_updated = request_data.keys()
for param in _params_updated:
if param not in ["user_email", "password"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route, role= {_user_role}. Trying to access: {route} and updating invalid param: {param}. only user_email and password can be updated",
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route, role= {_user_role}. Trying to access: {route}",
)
elif (
_user_role == LitellmUserRoles.INTERNAL_USER.value
and route in LiteLLMRoutes.internal_user_routes.value
):
pass
elif (
_user_is_org_admin(request_data=request_data, user_object=user_obj)
and route in LiteLLMRoutes.org_admin_allowed_routes.value
):
pass
elif (
_user_role == LitellmUserRoles.INTERNAL_USER_VIEW_ONLY.value
and route in LiteLLMRoutes.internal_user_view_only_routes.value
):
pass
elif (
route in LiteLLMRoutes.self_managed_routes.value
): # routes that manage their own allowed/disallowed logic
pass
else:
user_role = "unknown"
user_id = "unknown"
if user_obj is not None:
user_role = user_obj.user_role or "unknown"
user_id = user_obj.user_id or "unknown"
raise Exception(
f"Only proxy admin can be used to generate, delete, update info for new keys/users/teams. Route={route}. Your role={user_role}. Your user_id={user_id}"
) )
if RouteChecks.is_llm_api_route(route=route):
pass
elif (
route in LiteLLMRoutes.info_routes.value
): # check if user allowed to call an info route
if route == "/key/info":
# check if user can access this route
query_params = request.query_params
key = query_params.get("key")
if key is not None and hash_token(token=key) != api_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="user not allowed to access this key's info",
)
elif route == "/user/info":
# check if user can access this route
query_params = request.query_params
user_id = query_params.get("user_id")
verbose_proxy_logger.debug(
f"user_id: {user_id} & valid_token.user_id: {valid_token.user_id}"
)
if user_id and user_id != valid_token.user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="key not allowed to access this user's info. user_id={}, key's user_id={}".format(
user_id, valid_token.user_id
),
)
elif route == "/model/info":
# /model/info just shows models user has access to
pass
elif route == "/team/info":
pass # handled by function itself
elif _has_user_setup_sso() and route in LiteLLMRoutes.sso_only_routes.value:
pass
elif (
route in LiteLLMRoutes.global_spend_tracking_routes.value
and getattr(valid_token, "permissions", None) is not None
and "get_spend_routes" in getattr(valid_token, "permissions", [])
):
def custom_admin_only_route_check(route: str): pass
from litellm.proxy.proxy_server import general_settings, premium_user elif _user_role == LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value:
if RouteChecks.is_llm_api_route(route=route):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this OpenAI routes, role= {_user_role}",
)
if route in LiteLLMRoutes.management_routes.value:
# the Admin Viewer is only allowed to call /user/update for their own user_id and can only update
if route == "/user/update":
if "admin_only_routes" in general_settings: # Check the Request params are valid for PROXY_ADMIN_VIEW_ONLY
if premium_user is not True: if request_data is not None and isinstance(request_data, dict):
verbose_proxy_logger.error( _params_updated = request_data.keys()
f"Trying to use 'admin_only_routes' this is an Enterprise only feature. {CommonProxyErrors.not_premium_user.value}" for param in _params_updated:
if param not in ["user_email", "password"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route, role= {_user_role}. Trying to access: {route} and updating invalid param: {param}. only user_email and password can be updated",
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route, role= {_user_role}. Trying to access: {route}",
)
elif (
_user_role == LitellmUserRoles.INTERNAL_USER.value
and route in LiteLLMRoutes.internal_user_routes.value
):
pass
elif (
_user_is_org_admin(request_data=request_data, user_object=user_obj)
and route in LiteLLMRoutes.org_admin_allowed_routes.value
):
pass
elif (
_user_role == LitellmUserRoles.INTERNAL_USER_VIEW_ONLY.value
and route in LiteLLMRoutes.internal_user_view_only_routes.value
):
pass
elif (
route in LiteLLMRoutes.self_managed_routes.value
): # routes that manage their own allowed/disallowed logic
pass
else:
user_role = "unknown"
user_id = "unknown"
if user_obj is not None:
user_role = user_obj.user_role or "unknown"
user_id = user_obj.user_id or "unknown"
raise Exception(
f"Only proxy admin can be used to generate, delete, update info for new keys/users/teams. Route={route}. Your role={user_role}. Your user_id={user_id}"
) )
return
if route in general_settings["admin_only_routes"]: @staticmethod
raise HTTPException( def custom_admin_only_route_check(route: str):
status_code=status.HTTP_403_FORBIDDEN, from litellm.proxy.proxy_server import general_settings, premium_user
detail=f"user not allowed to access this route. Route={route} is an admin only route",
) if "admin_only_routes" in general_settings:
pass if premium_user is not True:
verbose_proxy_logger.error(
f"Trying to use 'admin_only_routes' this is an Enterprise only feature. {CommonProxyErrors.not_premium_user.value}"
)
return
if route in general_settings["admin_only_routes"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"user not allowed to access this route. Route={route} is an admin only route",
)
pass
@staticmethod
def is_llm_api_route(route: str) -> bool:
"""
Helper to checks if provided route is an OpenAI route
def is_llm_api_route(route: str) -> bool: Returns:
""" - True: if route is an OpenAI route
Helper to checks if provided route is an OpenAI route - False: if route is not an OpenAI route
"""
if route in LiteLLMRoutes.openai_routes.value:
return True
Returns: if route in LiteLLMRoutes.anthropic_routes.value:
- True: if route is an OpenAI route return True
- False: if route is not an OpenAI route
"""
if route in LiteLLMRoutes.openai_routes.value: # fuzzy match routes like "/v1/threads/thread_49EIN5QF32s4mH20M7GFKdlZ"
return True # Check for routes with placeholders
for openai_route in LiteLLMRoutes.openai_routes.value:
# Replace placeholders with regex pattern
# placeholders are written as "/threads/{thread_id}"
if "{" in openai_route:
pattern = re.sub(r"\{[^}]+\}", r"[^/]+", openai_route)
# Anchor the pattern to match the entire string
pattern = f"^{pattern}$"
if re.match(pattern, route):
return True
if route in LiteLLMRoutes.anthropic_routes.value: # Pass through Bedrock, VertexAI, and Cohere Routes
return True if "/bedrock/" in route:
return True
# fuzzy match routes like "/v1/threads/thread_49EIN5QF32s4mH20M7GFKdlZ" if "/vertex-ai/" in route:
# Check for routes with placeholders return True
for openai_route in LiteLLMRoutes.openai_routes.value: if "/gemini/" in route:
# Replace placeholders with regex pattern return True
# placeholders are written as "/threads/{thread_id}" if "/cohere/" in route:
if "{" in openai_route: return True
pattern = re.sub(r"\{[^}]+\}", r"[^/]+", openai_route) if "/langfuse/" in route:
# Anchor the pattern to match the entire string return True
pattern = f"^{pattern}$" return False
if re.match(pattern, route):
return True
# Pass through Bedrock, VertexAI, and Cohere Routes
if "/bedrock/" in route:
return True
if "/vertex-ai/" in route:
return True
if "/gemini/" in route:
return True
if "/cohere/" in route:
return True
if "/langfuse/" in route:
return True
return False

View file

@ -69,7 +69,7 @@ from litellm.proxy.auth.auth_utils import (
) )
from litellm.proxy.auth.oauth2_check import check_oauth2_token from litellm.proxy.auth.oauth2_check import check_oauth2_token
from litellm.proxy.auth.oauth2_proxy_hook import handle_oauth2_proxy_request from litellm.proxy.auth.oauth2_proxy_hook import handle_oauth2_proxy_request
from litellm.proxy.auth.route_checks import non_proxy_admin_allowed_routes_check from litellm.proxy.auth.route_checks import RouteChecks
from litellm.proxy.auth.service_account_checks import service_account_checks from litellm.proxy.auth.service_account_checks import service_account_checks
from litellm.proxy.common_utils.http_parsing_utils import _read_request_body from litellm.proxy.common_utils.http_parsing_utils import _read_request_body
from litellm.proxy.utils import _to_ns from litellm.proxy.utils import _to_ns
@ -150,7 +150,7 @@ def _is_api_route_allowed(
raise Exception("Invalid proxy server token passed") raise Exception("Invalid proxy server token passed")
if not _is_user_proxy_admin(user_obj=user_obj): # if non-admin if not _is_user_proxy_admin(user_obj=user_obj): # if non-admin
non_proxy_admin_allowed_routes_check( RouteChecks.non_proxy_admin_allowed_routes_check(
user_obj=user_obj, user_obj=user_obj,
_user_role=_user_role, _user_role=_user_role,
route=route, route=route,