feat(key_management_endpoints.py): allow proxy_admin to enforce params on key creation

allows admin to force team keys to have tags
This commit is contained in:
Krrish Dholakia 2024-11-23 15:44:36 +05:30
parent 1277f553ee
commit 5338f8b3e3
5 changed files with 169 additions and 48 deletions

View file

@ -820,6 +820,7 @@ litellm_settings:
key_generation_settings:
team_key_generation:
allowed_team_member_roles: ["admin"]
required_params: ["tags"] # require team admins to set tags for cost-tracking when generating a team key
personal_key_generation: # maps to 'Default Team' on UI
allowed_user_roles: ["proxy_admin"]
```
@ -829,10 +830,12 @@ litellm_settings:
```python
class TeamUIKeyGenerationConfig(TypedDict):
allowed_team_member_roles: List[str]
required_params: List[str] # require params on `/key/generate` to be set if a team key (team_id in request) is being generated
class PersonalUIKeyGenerationConfig(TypedDict):
allowed_user_roles: List[LitellmUserRoles]
required_params: List[str] # require params on `/key/generate` to be set if a personal key (no team_id in request) is being generated
class StandardKeyGenerationConfig(TypedDict, total=False):

View file

@ -11,28 +11,13 @@ model_list:
model: vertex_ai/claude-3-5-sonnet-v2
vertex_ai_project: "adroit-crow-413218"
vertex_ai_location: "us-east5"
- model_name: fake-openai-endpoint
litellm_params:
model: openai/fake
api_key: fake-key
api_base: https://exampleopenaiendpoint-production.up.railway.app/
router_settings:
model_group_alias:
"gpt-4-turbo": # Aliased model name
model: "gpt-4" # Actual model name in 'model_list'
hidden: true
litellm_settings:
default_team_settings:
- team_id: team-1
success_callback: ["langfuse"]
failure_callback: ["langfuse"]
langfuse_public_key: os.environ/LANGFUSE_PROJECT1_PUBLIC # Project 1
langfuse_secret: os.environ/LANGFUSE_PROJECT1_SECRET # Project 1
- team_id: team-2
success_callback: ["langfuse"]
failure_callback: ["langfuse"]
langfuse_public_key: os.environ/LANGFUSE_PROJECT2_PUBLIC # Project 2
langfuse_secret: os.environ/LANGFUSE_PROJECT2_SECRET # Project 2
langfuse_host: https://us.cloud.langfuse.com
success_callback: ["langfuse"]
callbacks: ["prometheus"]
key_generation_settings:
team_key_generation:
allowed_team_member_roles: ["admin"]
required_params: ["tags"]
personal_key_generation: # maps to 'Default Team' on UI
allowed_user_roles: ["proxy_admin"]

View file

@ -39,16 +39,20 @@ from litellm.proxy.utils import (
handle_exception_on_proxy,
)
from litellm.secret_managers.main import get_secret
from litellm.types.utils import PersonalUIKeyGenerationConfig, TeamUIKeyGenerationConfig
def _is_team_key(data: GenerateKeyRequest):
return data.team_id is not None
def _team_key_generation_check(user_api_key_dict: UserAPIKeyAuth):
def _team_key_generation_team_member_check(
user_api_key_dict: UserAPIKeyAuth,
team_key_generation: Optional[TeamUIKeyGenerationConfig],
):
if (
litellm.key_generation_settings is None
or litellm.key_generation_settings.get("team_key_generation") is None
team_key_generation is None
or "allowed_team_member_roles" not in team_key_generation
):
return True
@ -59,12 +63,7 @@ def _team_key_generation_check(user_api_key_dict: UserAPIKeyAuth):
)
team_member_role = user_api_key_dict.team_member.role
if (
team_member_role
not in litellm.key_generation_settings["team_key_generation"][ # type: ignore
"allowed_team_member_roles"
]
):
if team_member_role not in team_key_generation["allowed_team_member_roles"]:
raise HTTPException(
status_code=400,
detail=f"Team member role {team_member_role} not in allowed_team_member_roles={litellm.key_generation_settings['team_key_generation']['allowed_team_member_roles']}", # type: ignore
@ -72,7 +71,67 @@ def _team_key_generation_check(user_api_key_dict: UserAPIKeyAuth):
return True
def _personal_key_generation_check(user_api_key_dict: UserAPIKeyAuth):
def _key_generation_required_param_check(
data: GenerateKeyRequest, required_params: Optional[List[str]]
):
if required_params is None:
return True
data_dict = data.model_dump(exclude_unset=True)
for param in required_params:
if param not in data_dict:
raise HTTPException(
status_code=400,
detail=f"Required param {param} not in data",
)
return True
def _team_key_generation_check(
user_api_key_dict: UserAPIKeyAuth, data: GenerateKeyRequest
):
if (
litellm.key_generation_settings is None
or litellm.key_generation_settings.get("team_key_generation") is None
):
return True
_team_key_generation = litellm.key_generation_settings["team_key_generation"] # type: ignore
_team_key_generation_team_member_check(
user_api_key_dict,
team_key_generation=_team_key_generation,
)
_key_generation_required_param_check(
data,
_team_key_generation.get("required_params"),
)
return True
def _personal_key_membership_check(
user_api_key_dict: UserAPIKeyAuth,
personal_key_generation: Optional[PersonalUIKeyGenerationConfig],
):
if (
personal_key_generation is None
or "allowed_user_roles" not in personal_key_generation
):
return True
if user_api_key_dict.user_role not in personal_key_generation["allowed_user_roles"]:
raise HTTPException(
status_code=400,
detail=f"Personal key creation has been restricted by admin. Allowed roles={litellm.key_generation_settings['personal_key_generation']['allowed_user_roles']}. Your role={user_api_key_dict.user_role}", # type: ignore
)
return True
def _personal_key_generation_check(
user_api_key_dict: UserAPIKeyAuth, data: GenerateKeyRequest
):
if (
litellm.key_generation_settings is None
@ -80,16 +139,18 @@ def _personal_key_generation_check(user_api_key_dict: UserAPIKeyAuth):
):
return True
if (
user_api_key_dict.user_role
not in litellm.key_generation_settings["personal_key_generation"][ # type: ignore
"allowed_user_roles"
]
):
raise HTTPException(
status_code=400,
detail=f"Personal key creation has been restricted by admin. Allowed roles={litellm.key_generation_settings['personal_key_generation']['allowed_user_roles']}. Your role={user_api_key_dict.user_role}", # type: ignore
)
_personal_key_generation = litellm.key_generation_settings["personal_key_generation"] # type: ignore
_personal_key_membership_check(
user_api_key_dict,
personal_key_generation=_personal_key_generation,
)
_key_generation_required_param_check(
data,
_personal_key_generation.get("required_params"),
)
return True
@ -106,9 +167,13 @@ def key_generation_check(
is_team_key = _is_team_key(data=data)
if is_team_key:
return _team_key_generation_check(user_api_key_dict)
return _team_key_generation_check(
user_api_key_dict=user_api_key_dict, data=data
)
else:
return _personal_key_generation_check(user_api_key_dict=user_api_key_dict)
return _personal_key_generation_check(
user_api_key_dict=user_api_key_dict, data=data
)
router = APIRouter()

View file

@ -1604,11 +1604,17 @@ class StandardCallbackDynamicParams(TypedDict, total=False):
langsmith_base_url: Optional[str]
class TeamUIKeyGenerationConfig(TypedDict):
class KeyGenerationConfig(TypedDict, total=False):
required_params: List[
str
] # specify params that must be present in the key generation request
class TeamUIKeyGenerationConfig(KeyGenerationConfig):
allowed_team_member_roles: List[str]
class PersonalUIKeyGenerationConfig(TypedDict):
class PersonalUIKeyGenerationConfig(KeyGenerationConfig):
allowed_user_roles: List[str]

View file

@ -551,7 +551,7 @@ def test_is_team_key():
assert not _is_team_key(GenerateKeyRequest(user_id="test_user_id"))
def test_team_key_generation_check():
def test_team_key_generation_team_member_check():
from litellm.proxy.management_endpoints.key_management_endpoints import (
_team_key_generation_check,
)
@ -580,6 +580,68 @@ def test_team_key_generation_check():
)
@pytest.mark.parametrize(
"team_key_generation_settings, input_data, expected_result",
[
({"required_params": ["tags"]}, GenerateKeyRequest(tags=["test_tags"]), True),
({}, GenerateKeyRequest(), True),
(
{"required_params": ["models"]},
GenerateKeyRequest(tags=["test_tags"]),
False,
),
],
)
@pytest.mark.parametrize("key_type", ["team_key", "personal_key"])
def test_key_generation_required_params_check(
team_key_generation_settings, input_data, expected_result, key_type
):
from litellm.proxy.management_endpoints.key_management_endpoints import (
_team_key_generation_check,
_personal_key_generation_check,
)
from litellm.types.utils import (
TeamUIKeyGenerationConfig,
StandardKeyGenerationConfig,
PersonalUIKeyGenerationConfig,
)
from fastapi import HTTPException
user_api_key_dict = UserAPIKeyAuth(
user_role=LitellmUserRoles.INTERNAL_USER,
api_key="sk-1234",
user_id="test_user_id",
team_id="test_team_id",
team_member=Member(role="admin", user_id="test_user_id"),
)
if key_type == "team_key":
litellm.key_generation_settings = StandardKeyGenerationConfig(
team_key_generation=TeamUIKeyGenerationConfig(
**team_key_generation_settings
)
)
elif key_type == "personal_key":
litellm.key_generation_settings = StandardKeyGenerationConfig(
personal_key_generation=PersonalUIKeyGenerationConfig(
**team_key_generation_settings
)
)
if expected_result:
if key_type == "team_key":
assert _team_key_generation_check(user_api_key_dict, input_data)
elif key_type == "personal_key":
assert _personal_key_generation_check(user_api_key_dict, input_data)
else:
if key_type == "team_key":
with pytest.raises(HTTPException):
_team_key_generation_check(user_api_key_dict, input_data)
elif key_type == "personal_key":
with pytest.raises(HTTPException):
_personal_key_generation_check(user_api_key_dict, input_data)
def test_personal_key_generation_check():
from litellm.proxy.management_endpoints.key_management_endpoints import (
_personal_key_generation_check,