add regenerate_key_fn

This commit is contained in:
Ishaan Jaff 2024-08-26 17:59:44 -07:00
parent 5745f3d6cc
commit 7230ee1f55

View file

@ -966,3 +966,82 @@ async def delete_verification_token(tokens: List, user_id: Optional[str] = None)
verbose_proxy_logger.debug(traceback.format_exc()) verbose_proxy_logger.debug(traceback.format_exc())
raise e raise e
return deleted_tokens return deleted_tokens
@router.post(
"/key/{key:path}/regenerate",
tags=["key management"],
dependencies=[Depends(user_api_key_auth)],
)
@management_endpoint_wrapper
async def regenerate_key_fn(
key: str,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
litellm_changed_by: Optional[str] = Header(
None,
description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
),
) -> GenerateKeyResponse:
from litellm.proxy.proxy_server import (
hash_token,
premium_user,
prisma_client,
user_api_key_cache,
)
"""
Endpoint for regenerating a key
"""
# Check if key exists, raise exception if key is not in the DB
### 1. Create New copy that is duplicate of existing key
######################################################################
# create duplicate of existing key
# set token = new token generated
# insert new token in DB
# create hash of token
if prisma_client is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={"error": "DB not connected. prisma_client is None"},
)
hashed_api_key = hash_token(key)
_key_in_db = await prisma_client.db.litellm_verificationtoken.find_unique(
where={"token": hashed_api_key},
)
if _key_in_db is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"error": f"Key {key} not found."},
)
verbose_proxy_logger.debug("key_in_db: %s", _key_in_db)
new_token = f"sk-{secrets.token_urlsafe(16)}"
new_token_hash = hash_token(new_token)
# update new token in DB
updated_token = await prisma_client.db.litellm_verificationtoken.update(
where={"token": hashed_api_key}, data={"token": new_token_hash}
)
updated_token_dict = {}
if updated_token is not None:
updated_token_dict = dict(updated_token)
updated_token_dict["token"] = new_token
### 3. remove existing key entry from cache
######################################################################
if key:
user_api_key_cache.delete_cache(key)
if hashed_api_key:
user_api_key_cache.delete_cache(hashed_api_key)
return GenerateKeyResponse(
**updated_token_dict,
)