mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 18:54:30 +00:00
* build(pyproject.toml): add new dev dependencies - for type checking * build: reformat files to fit black * ci: reformat to fit black * ci(test-litellm.yml): make tests run clear * build(pyproject.toml): add ruff * fix: fix ruff checks * build(mypy/): fix mypy linting errors * fix(hashicorp_secret_manager.py): fix passing cert for tls auth * build(mypy/): resolve all mypy errors * test: update test * fix: fix black formatting * build(pre-commit-config.yaml): use poetry run black * fix(proxy_server.py): fix linting error * fix: fix ruff safe representation error
102 lines
2.8 KiB
Python
102 lines
2.8 KiB
Python
import base64
|
|
import os
|
|
from typing import Optional
|
|
|
|
from litellm._logging import verbose_proxy_logger
|
|
|
|
|
|
def _get_salt_key():
|
|
from litellm.proxy.proxy_server import master_key
|
|
|
|
salt_key = os.getenv("LITELLM_SALT_KEY", None)
|
|
|
|
if salt_key is None:
|
|
verbose_proxy_logger.debug(
|
|
"LITELLM_SALT_KEY is None using master_key to encrypt/decrypt secrets stored in DB"
|
|
)
|
|
|
|
salt_key = master_key
|
|
|
|
return salt_key
|
|
|
|
|
|
def encrypt_value_helper(value: str, new_encryption_key: Optional[str] = None):
|
|
signing_key = new_encryption_key or _get_salt_key()
|
|
|
|
try:
|
|
if isinstance(value, str):
|
|
encrypted_value = encrypt_value(value=value, signing_key=signing_key) # type: ignore
|
|
encrypted_value = base64.b64encode(encrypted_value).decode("utf-8")
|
|
|
|
return encrypted_value
|
|
|
|
verbose_proxy_logger.debug(
|
|
f"Invalid value type passed to encrypt_value: {type(value)} for Value: {value}\n Value must be a string"
|
|
)
|
|
# if it's not a string - do not encrypt it and return the value
|
|
return value
|
|
except Exception as e:
|
|
raise e
|
|
|
|
|
|
def decrypt_value_helper(value: str):
|
|
signing_key = _get_salt_key()
|
|
|
|
try:
|
|
if isinstance(value, str):
|
|
decoded_b64 = base64.b64decode(value)
|
|
value = decrypt_value(value=decoded_b64, signing_key=signing_key) # type: ignore
|
|
return value
|
|
|
|
# if it's not str - do not decrypt it, return the value
|
|
return value
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
traceback.print_stack()
|
|
verbose_proxy_logger.error(
|
|
f"Error decrypting value, Did your master_key/salt key change recently? \nError: {str(e)}\nSet permanent salt key - https://docs.litellm.ai/docs/proxy/prod#5-set-litellm-salt-key"
|
|
)
|
|
# [Non-Blocking Exception. - this should not block decrypting other values]
|
|
pass
|
|
|
|
|
|
def encrypt_value(value: str, signing_key: str):
|
|
import hashlib
|
|
|
|
import nacl.secret
|
|
import nacl.utils
|
|
|
|
# get 32 byte master key #
|
|
hash_object = hashlib.sha256(signing_key.encode())
|
|
hash_bytes = hash_object.digest()
|
|
|
|
# initialize secret box #
|
|
box = nacl.secret.SecretBox(hash_bytes)
|
|
|
|
# encode message #
|
|
value_bytes = value.encode("utf-8")
|
|
|
|
encrypted = box.encrypt(value_bytes)
|
|
|
|
return encrypted
|
|
|
|
|
|
def decrypt_value(value: bytes, signing_key: str) -> str:
|
|
import hashlib
|
|
|
|
import nacl.secret
|
|
import nacl.utils
|
|
|
|
# get 32 byte master key #
|
|
hash_object = hashlib.sha256(signing_key.encode())
|
|
hash_bytes = hash_object.digest()
|
|
|
|
# initialize secret box #
|
|
box = nacl.secret.SecretBox(hash_bytes)
|
|
|
|
# Convert the bytes object to a string
|
|
plaintext = box.decrypt(value)
|
|
|
|
plaintext = plaintext.decode("utf-8") # type: ignore
|
|
return plaintext # type: ignore
|