forked from phoenix/litellm-mirror
(oidc): Add support for loading tokens via a file, environment variable, and from a file path set in an env var.
This commit is contained in:
parent
9c3124c5a7
commit
11668c31c1
2 changed files with 80 additions and 0 deletions
|
@ -5,6 +5,8 @@ from dotenv import load_dotenv
|
|||
|
||||
load_dotenv()
|
||||
import os
|
||||
from uuid import uuid4
|
||||
import tempfile
|
||||
|
||||
sys.path.insert(
|
||||
0, os.path.abspath("../..")
|
||||
|
@ -135,3 +137,62 @@ def test_oidc_circle_v1_with_amazon_fips():
|
|||
aws_session_name="assume-v1-session-fips",
|
||||
aws_sts_endpoint="https://sts-fips.us-west-1.amazonaws.com",
|
||||
)
|
||||
|
||||
|
||||
def test_oidc_env_variable():
|
||||
# Create a unique environment variable name
|
||||
env_var_name = "OIDC_TEST_PATH_" + uuid4().hex
|
||||
os.environ[env_var_name] = "secret-" + uuid4().hex
|
||||
secret_val = get_secret(
|
||||
f"oidc/env/{env_var_name}"
|
||||
)
|
||||
|
||||
print(f"secret_val: {redact_oidc_signature(secret_val)}")
|
||||
|
||||
assert secret_val == os.environ[env_var_name]
|
||||
|
||||
# now unset the environment variable
|
||||
del os.environ[env_var_name]
|
||||
|
||||
|
||||
def test_oidc_file():
|
||||
# Create a temporary file
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as temp_file:
|
||||
secret_value = "secret-" + uuid4().hex
|
||||
temp_file.write(secret_value)
|
||||
temp_file.flush()
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
secret_val = get_secret(
|
||||
f"oidc/file/{temp_file_path}"
|
||||
)
|
||||
|
||||
print(f"secret_val: {redact_oidc_signature(secret_val)}")
|
||||
|
||||
assert secret_val == secret_value
|
||||
|
||||
|
||||
def test_oidc_env_path():
|
||||
# Create a temporary file
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as temp_file:
|
||||
secret_value = "secret-" + uuid4().hex
|
||||
temp_file.write(secret_value)
|
||||
temp_file.flush()
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
# Create a unique environment variable name
|
||||
env_var_name = "OIDC_TEST_PATH_" + uuid4().hex
|
||||
|
||||
# Set the environment variable to the temporary file path
|
||||
os.environ[env_var_name] = temp_file_path
|
||||
|
||||
# Test getting the secret using the environment variable
|
||||
secret_val = get_secret(
|
||||
f"oidc/env_path/{env_var_name}"
|
||||
)
|
||||
|
||||
print(f"secret_val: {redact_oidc_signature(secret_val)}")
|
||||
|
||||
assert secret_val == secret_value
|
||||
|
||||
del os.environ[env_var_name]
|
||||
|
|
|
@ -8433,6 +8433,25 @@ def get_secret(
|
|||
with open(azure_federated_token_file, "r") as f:
|
||||
oidc_token = f.read()
|
||||
return oidc_token
|
||||
elif oidc_provider == "file":
|
||||
# Load token from a file
|
||||
with open(oidc_aud, "r") as f:
|
||||
oidc_token = f.read()
|
||||
return oidc_token
|
||||
elif oidc_provider == "env":
|
||||
# Load token directly from an environment variable
|
||||
oidc_token = os.getenv(oidc_aud)
|
||||
if oidc_token is None:
|
||||
raise ValueError(f"Environment variable {oidc_aud} not found")
|
||||
return oidc_token
|
||||
elif oidc_provider == "env_path":
|
||||
# Load token from a file path specified in an environment variable
|
||||
token_file_path = os.getenv(oidc_aud)
|
||||
if token_file_path is None:
|
||||
raise ValueError(f"Environment variable {oidc_aud} not found")
|
||||
with open(token_file_path, "r") as f:
|
||||
oidc_token = f.read()
|
||||
return oidc_token
|
||||
else:
|
||||
raise ValueError("Unsupported OIDC provider")
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue