Compare commits

...
Sign in to create a new pull request.

5 commits

Author SHA1 Message Date
Ishaan Jaff
818a118bda test_enforce_unique_key_alias 2024-11-27 18:02:33 -08:00
Ishaan Jaff
9c3acfbb5d fix _enforce_unique_key_alias 2024-11-27 17:46:47 -08:00
Ishaan Jaff
de6ecdbc82 fix _enforce_unique_key_alias 2024-11-27 17:17:59 -08:00
Ishaan Jaff
3e50a06fb4 fix _enforce_unique_key_alias 2024-11-27 16:22:04 -08:00
Ishaan Jaff
74c025c50c add enforcement for unique key aliases 2024-11-27 11:47:21 -08:00
2 changed files with 195 additions and 0 deletions

View file

@ -420,6 +420,11 @@ async def generate_key_fn( # noqa: PLR0915
data_json.pop("tags")
await _enforce_unique_key_alias(
key_alias=data_json.get("key_alias", None),
prisma_client=prisma_client,
)
response = await generate_key_helper_fn(
request_type="key", **data_json, table_name="key"
)
@ -585,6 +590,12 @@ async def update_key_fn(
data=data, existing_key_row=existing_key_row
)
await _enforce_unique_key_alias(
key_alias=non_default_values.get("key_alias", None),
prisma_client=prisma_client,
existing_key_token=existing_key_row.token,
)
response = await prisma_client.update_data(
token=key, data={**non_default_values, "token": key}
)
@ -1883,3 +1894,38 @@ async def test_key_logging(
status="healthy",
details=f"No logger exceptions triggered, system is healthy. Manually check if logs were sent to {logging_callbacks} ",
)
async def _enforce_unique_key_alias(
key_alias: Optional[str],
prisma_client: Any,
existing_key_token: Optional[str] = None,
) -> None:
"""
Helper to enforce unique key aliases across all keys.
Args:
key_alias (Optional[str]): The key alias to check
prisma_client (Any): Prisma client instance
existing_key_token (Optional[str]): ID of existing key being updated, to exclude from uniqueness check
(The Admin UI passes key_alias, in all Edit key requests. So we need to be sure that if we find a key with the same alias, it's not the same key we're updating)
Raises:
HTTPException: If key alias already exists on a different key
"""
if key_alias is not None and prisma_client is not None:
where_clause: dict[str, Any] = {"key_alias": key_alias}
if existing_key_token:
# Exclude the current key from the uniqueness check
where_clause["NOT"] = {"token": existing_key_token}
existing_key = await prisma_client.db.litellm_verificationtoken.find_first(
where=where_clause
)
if existing_key is not None:
raise ProxyException(
message=f"Key with alias '{key_alias}' already exists. Unique key aliases across all keys are required.",
type=ProxyErrorTypes.bad_request_error,
param="key_alias",
code=status.HTTP_400_BAD_REQUEST,
)

View file

@ -3550,3 +3550,152 @@ async def test_key_generate_with_secret_manager_call(prisma_client):
################################################################################
@pytest.mark.asyncio
async def test_key_alias_uniqueness(prisma_client):
"""
Test that:
1. We cannot create two keys with the same alias
2. We cannot update a key to use an alias that's already taken
3. We can update a key while keeping its existing alias
"""
setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client)
setattr(litellm.proxy.proxy_server, "master_key", "sk-1234")
await litellm.proxy.proxy_server.prisma_client.connect()
try:
# Create first key with an alias
unique_alias = f"test-alias-{uuid.uuid4()}"
key1 = await generate_key_fn(
data=GenerateKeyRequest(key_alias=unique_alias),
user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234",
user_id="1234",
),
)
# Try to create second key with same alias - should fail
try:
key2 = await generate_key_fn(
data=GenerateKeyRequest(key_alias=unique_alias),
user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234",
user_id="1234",
),
)
pytest.fail("Should not be able to create a second key with the same alias")
except Exception as e:
print("vars(e)=", vars(e))
assert "Unique key aliases across all keys are required" in str(e.message)
# Create another key with different alias
another_alias = f"test-alias-{uuid.uuid4()}"
key3 = await generate_key_fn(
data=GenerateKeyRequest(key_alias=another_alias),
user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234",
user_id="1234",
),
)
# Try to update key3 to use key1's alias - should fail
try:
await update_key_fn(
data=UpdateKeyRequest(key=key3.key, key_alias=unique_alias),
request=Request(scope={"type": "http"}),
)
pytest.fail("Should not be able to update a key to use an existing alias")
except Exception as e:
assert "Unique key aliases across all keys are required" in str(e.message)
# Update key1 with its own existing alias - should succeed
updated_key = await update_key_fn(
data=UpdateKeyRequest(key=key1.key, key_alias=unique_alias),
request=Request(scope={"type": "http"}),
)
assert updated_key is not None
except Exception as e:
print("got exceptions, e=", e)
print("vars(e)=", vars(e))
pytest.fail(f"An unexpected error occurred: {str(e)}")
@pytest.mark.asyncio
async def test_enforce_unique_key_alias(prisma_client):
"""
Unit test the _enforce_unique_key_alias function:
1. Test it allows unique aliases
2. Test it blocks duplicate aliases for new keys
3. Test it allows updating a key with its own existing alias
4. Test it blocks updating a key with another key's alias
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_enforce_unique_key_alias,
)
setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client)
await litellm.proxy.proxy_server.prisma_client.connect()
try:
# Test 1: Allow unique alias
unique_alias = f"test-alias-{uuid.uuid4()}"
await _enforce_unique_key_alias(
key_alias=unique_alias,
prisma_client=prisma_client,
) # Should pass
# Create a key with this alias in the database
key1 = await generate_key_fn(
data=GenerateKeyRequest(key_alias=unique_alias),
user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234",
user_id="1234",
),
)
# Test 2: Block duplicate alias for new key
try:
await _enforce_unique_key_alias(
key_alias=unique_alias,
prisma_client=prisma_client,
)
pytest.fail("Should not allow duplicate alias")
except Exception as e:
assert "Unique key aliases across all keys are required" in str(e.message)
# Test 3: Allow updating key with its own alias
await _enforce_unique_key_alias(
key_alias=unique_alias,
existing_key_token=hash_token(key1.key),
prisma_client=prisma_client,
) # Should pass
# Test 4: Block updating with another key's alias
another_key = await generate_key_fn(
data=GenerateKeyRequest(key_alias=f"test-alias-{uuid.uuid4()}"),
user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234",
user_id="1234",
),
)
try:
await _enforce_unique_key_alias(
key_alias=unique_alias,
existing_key_token=another_key.key,
prisma_client=prisma_client,
)
pytest.fail("Should not allow using another key's alias")
except Exception as e:
assert "Unique key aliases across all keys are required" in str(e.message)
except Exception as e:
print("Unexpected error:", e)
pytest.fail(f"An unexpected error occurred: {str(e)}")