litellm-mirror/tests/litellm_utils_tests/test_secret_manager.py
Ishaan Jaff b6f2e659b9
(Feat) Add x-litellm-overhead-duration-ms and "x-litellm-response-duration-ms" in response from LiteLLM (#7899)
* add track_llm_api_timing

* add track_llm_api_timing

* test_litellm_overhead

* use ResponseMetadata class for setting hidden params and response overhead

* instrument http handler

* fix track_llm_api_timing

* track_llm_api_timing

* emit response overhead on hidden params

* fix resp metadata

* fix make_sync_openai_embedding_request

* test_aaaaatext_completion_endpoint fixes

* _get_value_from_hidden_params

* set_hidden_params

* test_litellm_overhead

* test_litellm_overhead

* test_litellm_overhead

* fix import

* test_litellm_overhead_stream

* add LiteLLMLoggingObject

* use diff folder for testing

* use diff folder for overhead testing

* test litellm overhead

* use typing

* clear typing

* test_litellm_overhead

* fix async_streaming

* update_response_metadata

* move test file

* pply metadata to the response objec
2025-01-21 20:27:55 -08:00

356 lines
12 KiB
Python

import os
import sys
import time
import traceback
import uuid
from dotenv import load_dotenv
import json
load_dotenv()
import os
import tempfile
from uuid import uuid4
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import pytest
import litellm
from litellm.llms.azure.azure import get_azure_ad_token_from_oidc
from litellm.llms.bedrock.chat import BedrockConverseLLM, BedrockLLM
from litellm.secret_managers.aws_secret_manager_v2 import AWSSecretsManagerV2
from litellm.secret_managers.main import (
get_secret,
_should_read_secret_from_secret_manager,
)
def load_vertex_ai_credentials():
# Define the path to the vertex_key.json file
print("loading vertex ai credentials")
filepath = os.path.dirname(os.path.abspath(__file__))
vertex_key_path = filepath + "/vertex_key.json"
# Read the existing content of the file or create an empty dictionary
try:
with open(vertex_key_path, "r") as file:
# Read the file content
print("Read vertexai file path")
content = file.read()
# If the file is empty or not valid JSON, create an empty dictionary
if not content or not content.strip():
service_account_key_data = {}
else:
# Attempt to load the existing JSON content
file.seek(0)
service_account_key_data = json.load(file)
except FileNotFoundError:
# If the file doesn't exist, create an empty dictionary
service_account_key_data = {}
# Update the service_account_key_data with environment variables
private_key_id = os.environ.get("VERTEX_AI_PRIVATE_KEY_ID", "")
private_key = os.environ.get("VERTEX_AI_PRIVATE_KEY", "")
private_key = private_key.replace("\\n", "\n")
service_account_key_data["private_key_id"] = private_key_id
service_account_key_data["private_key"] = private_key
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
# Write the updated content to the temporary files
json.dump(service_account_key_data, temp_file, indent=2)
# Export the temporary file as GOOGLE_APPLICATION_CREDENTIALS
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.abspath(temp_file.name)
def test_aws_secret_manager():
import json
AWSSecretsManagerV2.load_aws_secret_manager(use_aws_secret_manager=True)
secret_val = get_secret("litellm_master_key")
print(f"secret_val: {secret_val}")
# cast json to dict
secret_val = json.loads(secret_val)
assert secret_val["litellm_master_key"] == "sk-1234"
def redact_oidc_signature(secret_val):
# remove the last part of `.` and replace it with "SIGNATURE_REMOVED"
return secret_val.split(".")[:-1] + ["SIGNATURE_REMOVED"]
@pytest.mark.skipif(
os.environ.get("K_SERVICE") is None,
reason="Cannot run without being in GCP Cloud Run",
)
def test_oidc_google():
secret_val = get_secret(
"oidc/google/https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-express-v1/invoke"
)
print(f"secret_val: {redact_oidc_signature(secret_val)}")
@pytest.mark.skipif(
os.environ.get("ACTIONS_ID_TOKEN_REQUEST_TOKEN") is None,
reason="Cannot run without being in GitHub Actions",
)
def test_oidc_github():
secret_val = get_secret(
"oidc/github/https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-express-v1/invoke"
)
print(f"secret_val: {redact_oidc_signature(secret_val)}")
@pytest.mark.skipif(
os.environ.get("CIRCLE_OIDC_TOKEN") is None,
reason="Cannot run without being in CircleCI Runner",
)
def test_oidc_circleci():
secret_val = get_secret("oidc/circleci/")
print(f"secret_val: {redact_oidc_signature(secret_val)}")
@pytest.mark.skipif(
os.environ.get("CIRCLE_OIDC_TOKEN_V2") is None,
reason="Cannot run without being in CircleCI Runner",
)
def test_oidc_circleci_v2():
secret_val = get_secret(
"oidc/circleci_v2/https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-express-v1/invoke"
)
print(f"secret_val: {redact_oidc_signature(secret_val)}")
@pytest.mark.skipif(
os.environ.get("CIRCLE_OIDC_TOKEN") is None,
reason="Cannot run without being in CircleCI Runner",
)
def test_oidc_circleci_with_azure():
# TODO: Switch to our own Azure account, currently using ai.moda's account
os.environ["AZURE_TENANT_ID"] = "17c0a27a-1246-4aa1-a3b6-d294e80e783c"
os.environ["AZURE_CLIENT_ID"] = "4faf5422-b2bd-45e8-a6d7-46543a38acd0"
azure_ad_token = get_azure_ad_token_from_oidc("oidc/circleci/")
print(f"secret_val: {redact_oidc_signature(azure_ad_token)}")
@pytest.mark.skipif(
os.environ.get("CIRCLE_OIDC_TOKEN") is None,
reason="Cannot run without being in CircleCI Runner",
)
def test_oidc_circle_v1_with_amazon():
# The purpose of this test is to get logs using the older v1 of the CircleCI OIDC token
# TODO: This is using ai.moda's IAM role, we should use LiteLLM's IAM role eventually
aws_role_name = "arn:aws:iam::335785316107:role/litellm-github-unit-tests-circleci-v1-assume-only"
aws_web_identity_token = "oidc/circleci/"
bllm = BedrockLLM()
creds = bllm.get_credentials(
aws_region_name="ca-west-1",
aws_web_identity_token=aws_web_identity_token,
aws_role_name=aws_role_name,
aws_session_name="assume-v1-session",
)
@pytest.mark.skipif(
os.environ.get("CIRCLE_OIDC_TOKEN") is None,
reason="Cannot run without being in CircleCI Runner",
)
def test_oidc_circle_v1_with_amazon_fips():
# The purpose of this test is to validate that we can assume a role in a FIPS region
# TODO: This is using ai.moda's IAM role, we should use LiteLLM's IAM role eventually
aws_role_name = "arn:aws:iam::335785316107:role/litellm-github-unit-tests-circleci-v1-assume-only"
aws_web_identity_token = "oidc/circleci/"
bllm = BedrockConverseLLM()
creds = bllm.get_credentials(
aws_region_name="us-west-1",
aws_web_identity_token=aws_web_identity_token,
aws_role_name=aws_role_name,
aws_session_name="assume-v1-session-fips",
aws_sts_endpoint="https://sts-fips.us-west-1.amazonaws.com",
)
def test_oidc_env_variable():
# Create a unique environment variable name
env_var_name = "OIDC_TEST_PATH_" + uuid4().hex
os.environ[env_var_name] = "secret-" + uuid4().hex
secret_val = get_secret(f"oidc/env/{env_var_name}")
print(f"secret_val: {redact_oidc_signature(secret_val)}")
assert secret_val == os.environ[env_var_name]
# now unset the environment variable
del os.environ[env_var_name]
def test_oidc_file():
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w+") as temp_file:
secret_value = "secret-" + uuid4().hex
temp_file.write(secret_value)
temp_file.flush()
temp_file_path = temp_file.name
secret_val = get_secret(f"oidc/file/{temp_file_path}")
print(f"secret_val: {redact_oidc_signature(secret_val)}")
assert secret_val == secret_value
def test_oidc_env_path():
# Create a temporary file
with tempfile.NamedTemporaryFile(mode="w+") as temp_file:
secret_value = "secret-" + uuid4().hex
temp_file.write(secret_value)
temp_file.flush()
temp_file_path = temp_file.name
# Create a unique environment variable name
env_var_name = "OIDC_TEST_PATH_" + uuid4().hex
# Set the environment variable to the temporary file path
os.environ[env_var_name] = temp_file_path
# Test getting the secret using the environment variable
secret_val = get_secret(f"oidc/env_path/{env_var_name}")
print(f"secret_val: {redact_oidc_signature(secret_val)}")
assert secret_val == secret_value
del os.environ[env_var_name]
@pytest.mark.flaky(retries=6, delay=1)
def test_google_secret_manager():
"""
Test that we can get a secret from Google Secret Manager
"""
os.environ["GOOGLE_SECRET_MANAGER_PROJECT_ID"] = "pathrise-convert-1606954137718"
from litellm.secret_managers.google_secret_manager import GoogleSecretManager
load_vertex_ai_credentials()
secret_manager = GoogleSecretManager()
secret_val = secret_manager.get_secret_from_google_secret_manager(
secret_name="OPENAI_API_KEY"
)
print("secret_val: {}".format(secret_val))
assert (
secret_val == "anything"
), "did not get expected secret value. expect 'anything', got '{}'".format(
secret_val
)
def test_google_secret_manager_read_in_memory():
"""
Test that Google Secret manager returs in memory value when it exists
"""
from litellm.secret_managers.google_secret_manager import GoogleSecretManager
load_vertex_ai_credentials()
os.environ["GOOGLE_SECRET_MANAGER_PROJECT_ID"] = "pathrise-convert-1606954137718"
secret_manager = GoogleSecretManager()
secret_manager.cache.cache_dict["UNIQUE_KEY"] = None
secret_manager.cache.cache_dict["UNIQUE_KEY_2"] = "lite-llm"
secret_val = secret_manager.get_secret_from_google_secret_manager(
secret_name="UNIQUE_KEY"
)
print("secret_val: {}".format(secret_val))
assert secret_val == None
secret_val = secret_manager.get_secret_from_google_secret_manager(
secret_name="UNIQUE_KEY_2"
)
print("secret_val: {}".format(secret_val))
assert secret_val == "lite-llm"
def test_should_read_secret_from_secret_manager():
"""
Test that _should_read_secret_from_secret_manager returns correct values based on access mode
"""
from litellm.proxy._types import KeyManagementSettings
# Test when secret manager client is None
litellm.secret_manager_client = None
litellm._key_management_settings = KeyManagementSettings()
assert _should_read_secret_from_secret_manager() is False
# Test with secret manager client and read_only access
litellm.secret_manager_client = "dummy_client"
litellm._key_management_settings = KeyManagementSettings(access_mode="read_only")
assert _should_read_secret_from_secret_manager() is True
# Test with secret manager client and read_and_write access
litellm._key_management_settings = KeyManagementSettings(
access_mode="read_and_write"
)
assert _should_read_secret_from_secret_manager() is True
# Test with secret manager client and write_only access
litellm._key_management_settings = KeyManagementSettings(access_mode="write_only")
assert _should_read_secret_from_secret_manager() is False
# Reset global variables
litellm.secret_manager_client = None
litellm._key_management_settings = KeyManagementSettings()
def test_get_secret_with_access_mode():
"""
Test that get_secret respects access mode settings
"""
from litellm.proxy._types import KeyManagementSettings
# Set up test environment
test_secret_name = "TEST_SECRET_KEY"
test_secret_value = "test_secret_value"
os.environ[test_secret_name] = test_secret_value
# Test with write_only access (should read from os.environ)
litellm.secret_manager_client = "dummy_client"
litellm._key_management_settings = KeyManagementSettings(access_mode="write_only")
assert get_secret(test_secret_name) == test_secret_value
# Test with no KeyManagementSettings but secret_manager_client set
litellm.secret_manager_client = "dummy_client"
litellm._key_management_settings = KeyManagementSettings()
assert _should_read_secret_from_secret_manager() is True
# Test with read_only access
litellm._key_management_settings = KeyManagementSettings(access_mode="read_only")
assert _should_read_secret_from_secret_manager() is True
# Test with read_and_write access
litellm._key_management_settings = KeyManagementSettings(
access_mode="read_and_write"
)
assert _should_read_secret_from_secret_manager() is True
# Reset global variables
litellm.secret_manager_client = None
litellm._key_management_settings = KeyManagementSettings()
del os.environ[test_secret_name]