forked from phoenix/litellm-mirror
test_get_secret_with_access_mode
This commit is contained in:
parent
3a072cd75a
commit
3a4e3e6ac0
3 changed files with 81 additions and 8 deletions
|
@ -10,4 +10,5 @@ model_list:
|
|||
general_settings:
|
||||
key_management_system: "aws_secret_manager"
|
||||
key_management_settings:
|
||||
store_virtual_keys: true
|
||||
store_virtual_keys: true
|
||||
access_mode: "write_only"
|
||||
|
|
|
@ -335,9 +335,10 @@ def _should_read_secret_from_secret_manager() -> bool:
|
|||
- Otherwise, return False
|
||||
"""
|
||||
if litellm.secret_manager_client is not None:
|
||||
if (
|
||||
litellm._key_management_settings.access_mode == "read_only"
|
||||
or litellm._key_management_settings.access_mode == "read_and_write"
|
||||
):
|
||||
return True
|
||||
if litellm._key_management_settings is not None:
|
||||
if (
|
||||
litellm._key_management_settings.access_mode == "read_only"
|
||||
or litellm._key_management_settings.access_mode == "read_and_write"
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -15,11 +15,14 @@ sys.path.insert(
|
|||
0, os.path.abspath("../..")
|
||||
) # Adds the parent directory to the system path
|
||||
import pytest
|
||||
|
||||
import litellm
|
||||
from litellm.llms.AzureOpenAI.azure import get_azure_ad_token_from_oidc
|
||||
from litellm.llms.bedrock.chat import BedrockConverseLLM, BedrockLLM
|
||||
from litellm.secret_managers.aws_secret_manager_v2 import AWSSecretsManagerV2
|
||||
from litellm.secret_managers.main import get_secret
|
||||
from litellm.secret_managers.main import (
|
||||
get_secret,
|
||||
_should_read_secret_from_secret_manager,
|
||||
)
|
||||
|
||||
|
||||
def test_aws_secret_manager():
|
||||
|
@ -244,3 +247,71 @@ def test_google_secret_manager_read_in_memory():
|
|||
)
|
||||
print("secret_val: {}".format(secret_val))
|
||||
assert secret_val == "lite-llm"
|
||||
|
||||
|
||||
def test_should_read_secret_from_secret_manager():
|
||||
"""
|
||||
Test that _should_read_secret_from_secret_manager returns correct values based on access mode
|
||||
"""
|
||||
from litellm.proxy._types import KeyManagementSettings
|
||||
|
||||
# Test when secret manager client is None
|
||||
litellm.secret_manager_client = None
|
||||
litellm._key_management_settings = KeyManagementSettings()
|
||||
assert _should_read_secret_from_secret_manager() is False
|
||||
|
||||
# Test with secret manager client and read_only access
|
||||
litellm.secret_manager_client = "dummy_client"
|
||||
litellm._key_management_settings = KeyManagementSettings(access_mode="read_only")
|
||||
assert _should_read_secret_from_secret_manager() is True
|
||||
|
||||
# Test with secret manager client and read_and_write access
|
||||
litellm._key_management_settings = KeyManagementSettings(
|
||||
access_mode="read_and_write"
|
||||
)
|
||||
assert _should_read_secret_from_secret_manager() is True
|
||||
|
||||
# Test with secret manager client and write_only access
|
||||
litellm._key_management_settings = KeyManagementSettings(access_mode="write_only")
|
||||
assert _should_read_secret_from_secret_manager() is False
|
||||
|
||||
# Reset global variables
|
||||
litellm.secret_manager_client = None
|
||||
litellm._key_management_settings = KeyManagementSettings()
|
||||
|
||||
|
||||
def test_get_secret_with_access_mode():
|
||||
"""
|
||||
Test that get_secret respects access mode settings
|
||||
"""
|
||||
from litellm.proxy._types import KeyManagementSettings
|
||||
|
||||
# Set up test environment
|
||||
test_secret_name = "TEST_SECRET_KEY"
|
||||
test_secret_value = "test_secret_value"
|
||||
os.environ[test_secret_name] = test_secret_value
|
||||
|
||||
# Test with write_only access (should read from os.environ)
|
||||
litellm.secret_manager_client = "dummy_client"
|
||||
litellm._key_management_settings = KeyManagementSettings(access_mode="write_only")
|
||||
assert get_secret(test_secret_name) == test_secret_value
|
||||
|
||||
# Test with no KeyManagementSettings but secret_manager_client set
|
||||
litellm.secret_manager_client = "dummy_client"
|
||||
litellm._key_management_settings = KeyManagementSettings()
|
||||
assert _should_read_secret_from_secret_manager() is True
|
||||
|
||||
# Test with read_only access
|
||||
litellm._key_management_settings = KeyManagementSettings(access_mode="read_only")
|
||||
assert _should_read_secret_from_secret_manager() is True
|
||||
|
||||
# Test with read_and_write access
|
||||
litellm._key_management_settings = KeyManagementSettings(
|
||||
access_mode="read_and_write"
|
||||
)
|
||||
assert _should_read_secret_from_secret_manager() is True
|
||||
|
||||
# Reset global variables
|
||||
litellm.secret_manager_client = None
|
||||
litellm._key_management_settings = KeyManagementSettings()
|
||||
del os.environ[test_secret_name]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue