[Team Member permissions] - Fixes (#9945)

* only load member permissions for non-admins

* run member permission checks on update + regenerate endpoints

* run check for /key/generate

* working test_default_member_permissions

* passing test with permissions on update delete endpoints

* test_create_permissions

* _team_key_generation_check

* fix TeamBase

* fix team endpoints

* fix api docs check
This commit is contained in:
Ishaan Jaff 2025-04-12 11:17:51 -07:00 committed by GitHub
parent d2a462fc93
commit 4e81b2cab4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 601 additions and 64 deletions

View file

@ -1,7 +1,7 @@
"""
KEY MANAGEMENT
All /key management endpoints
All /key management endpoints
/key/generate
/key/info
@ -42,6 +42,9 @@ from litellm.proxy.management_endpoints.common_utils import (
from litellm.proxy.management_endpoints.model_management_endpoints import (
_add_model_to_db,
)
from litellm.proxy.management_helpers.team_member_permission_checks import (
TeamMemberPermissionChecks,
)
from litellm.proxy.management_helpers.utils import management_endpoint_wrapper
from litellm.proxy.spend_tracking.spend_tracking_utils import _is_master_key
from litellm.proxy.utils import (
@ -104,20 +107,16 @@ def _is_allowed_to_make_key_request(
and user_api_key_dict.team_id == UI_TEAM_ID
):
return True # handle https://github.com/BerriAI/litellm/issues/7482
assert (
user_api_key_dict.team_id == team_id
), "User can only create keys for their own team. Got={}, Your Team ID={}".format(
team_id, user_api_key_dict.team_id
)
return True
def _team_key_generation_team_member_check(
def _team_key_operation_team_member_check(
assigned_user_id: Optional[str],
team_table: LiteLLM_TeamTableCachedObj,
user_api_key_dict: UserAPIKeyAuth,
team_key_generation: TeamUIKeyGenerationConfig,
route: KeyManagementRoutes,
):
if assigned_user_id is not None:
key_assigned_user_in_team = _get_user_in_team(
@ -130,7 +129,7 @@ def _team_key_generation_team_member_check(
detail=f"User={assigned_user_id} not assigned to team={team_table.team_id}",
)
key_creating_user_in_team = _get_user_in_team(
team_member_object = _get_user_in_team(
team_table=team_table, user_id=user_api_key_dict.user_id
)
@ -141,20 +140,26 @@ def _team_key_generation_team_member_check(
if is_admin:
return True
elif key_creating_user_in_team is None:
elif team_member_object is None:
raise HTTPException(
status_code=400,
detail=f"User={user_api_key_dict.user_id} not assigned to team={team_table.team_id}",
)
elif (
"allowed_team_member_roles" in team_key_generation
and key_creating_user_in_team.role
and team_member_object.role
not in team_key_generation["allowed_team_member_roles"]
):
raise HTTPException(
status_code=400,
detail=f"Team member role {key_creating_user_in_team.role} not in allowed_team_member_roles={team_key_generation['allowed_team_member_roles']}",
detail=f"Team member role {team_member_object.role} not in allowed_team_member_roles={team_key_generation['allowed_team_member_roles']}",
)
TeamMemberPermissionChecks.does_team_member_have_permissions_for_endpoint(
team_member_object=team_member_object,
team_table=team_table,
route=route,
)
return True
@ -178,6 +183,7 @@ def _team_key_generation_check(
team_table: LiteLLM_TeamTableCachedObj,
user_api_key_dict: UserAPIKeyAuth,
data: GenerateKeyRequest,
route: KeyManagementRoutes,
):
if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
return True
@ -191,11 +197,12 @@ def _team_key_generation_check(
allowed_team_member_roles=["admin", "user"],
)
_team_key_generation_team_member_check(
_team_key_operation_team_member_check(
assigned_user_id=data.user_id,
team_table=team_table,
user_api_key_dict=user_api_key_dict,
team_key_generation=_team_key_generation,
route=route,
)
_key_generation_required_param_check(
data,
@ -252,6 +259,7 @@ def key_generation_check(
team_table: Optional[LiteLLM_TeamTableCachedObj],
user_api_key_dict: UserAPIKeyAuth,
data: GenerateKeyRequest,
route: KeyManagementRoutes,
) -> bool:
"""
Check if admin has restricted key creation to certain roles for teams or individuals
@ -271,6 +279,7 @@ def key_generation_check(
team_table=team_table,
user_api_key_dict=user_api_key_dict,
data=data,
route=route,
)
else:
return _personal_key_generation_check(
@ -425,6 +434,7 @@ async def generate_key_fn( # noqa: PLR0915
team_table=team_table,
user_api_key_dict=user_api_key_dict,
data=data,
route=KeyManagementRoutes.KEY_GENERATE,
)
common_key_access_checks(
@ -567,9 +577,9 @@ async def generate_key_fn( # noqa: PLR0915
request_type="key", **data_json, table_name="key"
)
response[
"soft_budget"
] = data.soft_budget # include the user-input soft budget in the response
response["soft_budget"] = (
data.soft_budget
) # include the user-input soft budget in the response
response = GenerateKeyResponse(**response)
@ -762,6 +772,15 @@ async def update_key_fn(
detail={"error": f"Team not found, passed team_id={data.team_id}"},
)
# check if user has permission to update key
await TeamMemberPermissionChecks.can_team_member_execute_key_management_endpoint(
user_api_key_dict=user_api_key_dict,
route=KeyManagementRoutes.KEY_UPDATE,
prisma_client=prisma_client,
existing_key_row=existing_key_row,
user_api_key_cache=user_api_key_cache,
)
non_default_values = prepare_key_update_data(
data=data, existing_key_row=existing_key_row
)
@ -1049,7 +1068,7 @@ async def info_key_fn(
)
if (
_can_user_query_key_info(
await _can_user_query_key_info(
user_api_key_dict=user_api_key_dict,
key=key,
key_info=key_info,
@ -1376,11 +1395,12 @@ async def _team_key_deletion_check(
)
# check if user is team admin
if team_table is not None:
return _team_key_generation_team_member_check(
return _team_key_operation_team_member_check(
assigned_user_id=user_api_key_dict.user_id,
team_table=team_table,
user_api_key_dict=user_api_key_dict,
team_key_generation=_team_key_generation,
route=KeyManagementRoutes.KEY_DELETE,
)
else:
raise HTTPException(
@ -1447,10 +1467,10 @@ async def delete_verification_tokens(
try:
if prisma_client:
tokens = [_hash_token_if_needed(token=key) for key in tokens]
_keys_being_deleted: List[
LiteLLM_VerificationToken
] = await prisma_client.db.litellm_verificationtoken.find_many(
where={"token": {"in": tokens}}
_keys_being_deleted: List[LiteLLM_VerificationToken] = (
await prisma_client.db.litellm_verificationtoken.find_many(
where={"token": {"in": tokens}}
)
)
# Assuming 'db' is your Prisma Client instance
@ -1552,9 +1572,9 @@ async def _rotate_master_key(
from litellm.proxy.proxy_server import proxy_config
try:
models: Optional[
List
] = await prisma_client.db.litellm_proxymodeltable.find_many()
models: Optional[List] = (
await prisma_client.db.litellm_proxymodeltable.find_many()
)
except Exception:
models = None
# 2. process model table
@ -1743,6 +1763,15 @@ async def regenerate_key_fn(
detail={"error": f"Key {key} not found."},
)
# check if user has permission to regenerate key
await TeamMemberPermissionChecks.can_team_member_execute_key_management_endpoint(
user_api_key_dict=user_api_key_dict,
route=KeyManagementRoutes.KEY_REGENERATE,
prisma_client=prisma_client,
existing_key_row=_key_in_db,
user_api_key_cache=user_api_key_cache,
)
verbose_proxy_logger.debug("key_in_db: %s", _key_in_db)
new_token = f"sk-{secrets.token_urlsafe(LENGTH_OF_LITELLM_GENERATED_KEY)}"
@ -1832,11 +1861,11 @@ async def validate_key_list_check(
param="user_id",
code=status.HTTP_403_FORBIDDEN,
)
complete_user_info_db_obj: Optional[
BaseModel
] = await prisma_client.db.litellm_usertable.find_unique(
where={"user_id": user_api_key_dict.user_id},
include={"organization_memberships": True},
complete_user_info_db_obj: Optional[BaseModel] = (
await prisma_client.db.litellm_usertable.find_unique(
where={"user_id": user_api_key_dict.user_id},
include={"organization_memberships": True},
)
)
if complete_user_info_db_obj is None:
@ -1897,10 +1926,10 @@ async def get_admin_team_ids(
if complete_user_info is None:
return []
# Get all teams that user is an admin of
teams: Optional[
List[BaseModel]
] = await prisma_client.db.litellm_teamtable.find_many(
where={"team_id": {"in": complete_user_info.teams}}
teams: Optional[List[BaseModel]] = (
await prisma_client.db.litellm_teamtable.find_many(
where={"team_id": {"in": complete_user_info.teams}}
)
)
if teams is None:
return []
@ -2450,7 +2479,7 @@ async def key_health(
)
def _can_user_query_key_info(
async def _can_user_query_key_info(
user_api_key_dict: UserAPIKeyAuth,
key: Optional[str],
key_info: LiteLLM_VerificationToken,
@ -2468,6 +2497,11 @@ def _can_user_query_key_info(
# user can query their own key info
elif key_info.user_id == user_api_key_dict.user_id:
return True
elif await TeamMemberPermissionChecks.user_belongs_to_keys_team(
user_api_key_dict=user_api_key_dict,
existing_key_row=key_info,
):
return True
return False