forked from phoenix/litellm-mirror
(feat) add enforcement for unique key aliases on /key/update and /key/generate (#6944)
* add enforcement for unique key aliases * fix _enforce_unique_key_alias * fix _enforce_unique_key_alias * fix _enforce_unique_key_alias * test_enforce_unique_key_alias
This commit is contained in:
parent
4ebb7c8a7f
commit
d6181b2c9f
2 changed files with 195 additions and 0 deletions
|
@ -420,6 +420,11 @@ async def generate_key_fn( # noqa: PLR0915
|
|||
|
||||
data_json.pop("tags")
|
||||
|
||||
await _enforce_unique_key_alias(
|
||||
key_alias=data_json.get("key_alias", None),
|
||||
prisma_client=prisma_client,
|
||||
)
|
||||
|
||||
response = await generate_key_helper_fn(
|
||||
request_type="key", **data_json, table_name="key"
|
||||
)
|
||||
|
@ -586,6 +591,12 @@ async def update_key_fn(
|
|||
data=data, existing_key_row=existing_key_row
|
||||
)
|
||||
|
||||
await _enforce_unique_key_alias(
|
||||
key_alias=non_default_values.get("key_alias", None),
|
||||
prisma_client=prisma_client,
|
||||
existing_key_token=existing_key_row.token,
|
||||
)
|
||||
|
||||
response = await prisma_client.update_data(
|
||||
token=key, data={**non_default_values, "token": key}
|
||||
)
|
||||
|
@ -1884,3 +1895,38 @@ async def test_key_logging(
|
|||
status="healthy",
|
||||
details=f"No logger exceptions triggered, system is healthy. Manually check if logs were sent to {logging_callbacks} ",
|
||||
)
|
||||
|
||||
|
||||
async def _enforce_unique_key_alias(
|
||||
key_alias: Optional[str],
|
||||
prisma_client: Any,
|
||||
existing_key_token: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Helper to enforce unique key aliases across all keys.
|
||||
|
||||
Args:
|
||||
key_alias (Optional[str]): The key alias to check
|
||||
prisma_client (Any): Prisma client instance
|
||||
existing_key_token (Optional[str]): ID of existing key being updated, to exclude from uniqueness check
|
||||
(The Admin UI passes key_alias, in all Edit key requests. So we need to be sure that if we find a key with the same alias, it's not the same key we're updating)
|
||||
|
||||
Raises:
|
||||
HTTPException: If key alias already exists on a different key
|
||||
"""
|
||||
if key_alias is not None and prisma_client is not None:
|
||||
where_clause: dict[str, Any] = {"key_alias": key_alias}
|
||||
if existing_key_token:
|
||||
# Exclude the current key from the uniqueness check
|
||||
where_clause["NOT"] = {"token": existing_key_token}
|
||||
|
||||
existing_key = await prisma_client.db.litellm_verificationtoken.find_first(
|
||||
where=where_clause
|
||||
)
|
||||
if existing_key is not None:
|
||||
raise ProxyException(
|
||||
message=f"Key with alias '{key_alias}' already exists. Unique key aliases across all keys are required.",
|
||||
type=ProxyErrorTypes.bad_request_error,
|
||||
param="key_alias",
|
||||
code=status.HTTP_400_BAD_REQUEST,
|
||||
)
|
||||
|
|
|
@ -3632,3 +3632,152 @@ async def test_key_generate_with_secret_manager_call(prisma_client):
|
|||
|
||||
|
||||
################################################################################
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_key_alias_uniqueness(prisma_client):
|
||||
"""
|
||||
Test that:
|
||||
1. We cannot create two keys with the same alias
|
||||
2. We cannot update a key to use an alias that's already taken
|
||||
3. We can update a key while keeping its existing alias
|
||||
"""
|
||||
setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client)
|
||||
setattr(litellm.proxy.proxy_server, "master_key", "sk-1234")
|
||||
await litellm.proxy.proxy_server.prisma_client.connect()
|
||||
|
||||
try:
|
||||
# Create first key with an alias
|
||||
unique_alias = f"test-alias-{uuid.uuid4()}"
|
||||
key1 = await generate_key_fn(
|
||||
data=GenerateKeyRequest(key_alias=unique_alias),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
api_key="sk-1234",
|
||||
user_id="1234",
|
||||
),
|
||||
)
|
||||
|
||||
# Try to create second key with same alias - should fail
|
||||
try:
|
||||
key2 = await generate_key_fn(
|
||||
data=GenerateKeyRequest(key_alias=unique_alias),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
api_key="sk-1234",
|
||||
user_id="1234",
|
||||
),
|
||||
)
|
||||
pytest.fail("Should not be able to create a second key with the same alias")
|
||||
except Exception as e:
|
||||
print("vars(e)=", vars(e))
|
||||
assert "Unique key aliases across all keys are required" in str(e.message)
|
||||
|
||||
# Create another key with different alias
|
||||
another_alias = f"test-alias-{uuid.uuid4()}"
|
||||
key3 = await generate_key_fn(
|
||||
data=GenerateKeyRequest(key_alias=another_alias),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
api_key="sk-1234",
|
||||
user_id="1234",
|
||||
),
|
||||
)
|
||||
|
||||
# Try to update key3 to use key1's alias - should fail
|
||||
try:
|
||||
await update_key_fn(
|
||||
data=UpdateKeyRequest(key=key3.key, key_alias=unique_alias),
|
||||
request=Request(scope={"type": "http"}),
|
||||
)
|
||||
pytest.fail("Should not be able to update a key to use an existing alias")
|
||||
except Exception as e:
|
||||
assert "Unique key aliases across all keys are required" in str(e.message)
|
||||
|
||||
# Update key1 with its own existing alias - should succeed
|
||||
updated_key = await update_key_fn(
|
||||
data=UpdateKeyRequest(key=key1.key, key_alias=unique_alias),
|
||||
request=Request(scope={"type": "http"}),
|
||||
)
|
||||
assert updated_key is not None
|
||||
|
||||
except Exception as e:
|
||||
print("got exceptions, e=", e)
|
||||
print("vars(e)=", vars(e))
|
||||
pytest.fail(f"An unexpected error occurred: {str(e)}")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enforce_unique_key_alias(prisma_client):
|
||||
"""
|
||||
Unit test the _enforce_unique_key_alias function:
|
||||
1. Test it allows unique aliases
|
||||
2. Test it blocks duplicate aliases for new keys
|
||||
3. Test it allows updating a key with its own existing alias
|
||||
4. Test it blocks updating a key with another key's alias
|
||||
"""
|
||||
from litellm.proxy.management_endpoints.key_management_endpoints import (
|
||||
_enforce_unique_key_alias,
|
||||
)
|
||||
|
||||
setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client)
|
||||
await litellm.proxy.proxy_server.prisma_client.connect()
|
||||
|
||||
try:
|
||||
# Test 1: Allow unique alias
|
||||
unique_alias = f"test-alias-{uuid.uuid4()}"
|
||||
await _enforce_unique_key_alias(
|
||||
key_alias=unique_alias,
|
||||
prisma_client=prisma_client,
|
||||
) # Should pass
|
||||
|
||||
# Create a key with this alias in the database
|
||||
key1 = await generate_key_fn(
|
||||
data=GenerateKeyRequest(key_alias=unique_alias),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
api_key="sk-1234",
|
||||
user_id="1234",
|
||||
),
|
||||
)
|
||||
|
||||
# Test 2: Block duplicate alias for new key
|
||||
try:
|
||||
await _enforce_unique_key_alias(
|
||||
key_alias=unique_alias,
|
||||
prisma_client=prisma_client,
|
||||
)
|
||||
pytest.fail("Should not allow duplicate alias")
|
||||
except Exception as e:
|
||||
assert "Unique key aliases across all keys are required" in str(e.message)
|
||||
|
||||
# Test 3: Allow updating key with its own alias
|
||||
await _enforce_unique_key_alias(
|
||||
key_alias=unique_alias,
|
||||
existing_key_token=hash_token(key1.key),
|
||||
prisma_client=prisma_client,
|
||||
) # Should pass
|
||||
|
||||
# Test 4: Block updating with another key's alias
|
||||
another_key = await generate_key_fn(
|
||||
data=GenerateKeyRequest(key_alias=f"test-alias-{uuid.uuid4()}"),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
api_key="sk-1234",
|
||||
user_id="1234",
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
await _enforce_unique_key_alias(
|
||||
key_alias=unique_alias,
|
||||
existing_key_token=another_key.key,
|
||||
prisma_client=prisma_client,
|
||||
)
|
||||
pytest.fail("Should not allow using another key's alias")
|
||||
except Exception as e:
|
||||
assert "Unique key aliases across all keys are required" in str(e.message)
|
||||
|
||||
except Exception as e:
|
||||
print("Unexpected error:", e)
|
||||
pytest.fail(f"An unexpected error occurred: {str(e)}")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue