mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 19:24:27 +00:00
Litellm dev 02 07 2025 p2 (#8377)
* fix(caching_routes.py): mask redis password on `/cache/ping` route * fix(caching_routes.py): fix linting erro * fix(caching_routes.py): fix linting error on caching routes * fix: fix test - ignore mask_dict - has a breakpoint * fix(azure.py): add timeout param + elapsed time in azure timeout error * fix(http_handler.py): add elapsed time to http timeout request makes it easier to debug how long request took before failing
This commit is contained in:
parent
ff2529a994
commit
7363b072c1
7 changed files with 126 additions and 25 deletions
81
litellm/litellm_core_utils/sensitive_data_masker.py
Normal file
81
litellm/litellm_core_utils/sensitive_data_masker.py
Normal file
|
@ -0,0 +1,81 @@
|
|||
from typing import Any, Dict, Optional, Set
|
||||
|
||||
|
||||
class SensitiveDataMasker:
|
||||
def __init__(
|
||||
self,
|
||||
sensitive_patterns: Optional[Set[str]] = None,
|
||||
visible_prefix: int = 4,
|
||||
visible_suffix: int = 4,
|
||||
mask_char: str = "*",
|
||||
):
|
||||
self.sensitive_patterns = sensitive_patterns or {
|
||||
"password",
|
||||
"secret",
|
||||
"key",
|
||||
"token",
|
||||
"auth",
|
||||
"credential",
|
||||
"access",
|
||||
"private",
|
||||
"certificate",
|
||||
}
|
||||
|
||||
self.visible_prefix = visible_prefix
|
||||
self.visible_suffix = visible_suffix
|
||||
self.mask_char = mask_char
|
||||
|
||||
def _mask_value(self, value: str) -> str:
|
||||
if not value or len(str(value)) < (self.visible_prefix + self.visible_suffix):
|
||||
return value
|
||||
|
||||
value_str = str(value)
|
||||
masked_length = len(value_str) - (self.visible_prefix + self.visible_suffix)
|
||||
return f"{value_str[:self.visible_prefix]}{self.mask_char * masked_length}{value_str[-self.visible_suffix:]}"
|
||||
|
||||
def is_sensitive_key(self, key: str) -> bool:
|
||||
key_lower = str(key).lower()
|
||||
result = any(pattern in key_lower for pattern in self.sensitive_patterns)
|
||||
return result
|
||||
|
||||
def mask_dict(
|
||||
self, data: Dict[str, Any], depth: int = 0, max_depth: int = 10
|
||||
) -> Dict[str, Any]:
|
||||
if depth >= max_depth:
|
||||
return data
|
||||
|
||||
masked_data: Dict[str, Any] = {}
|
||||
for k, v in data.items():
|
||||
try:
|
||||
if isinstance(v, dict):
|
||||
masked_data[k] = self.mask_dict(v, depth + 1)
|
||||
elif hasattr(v, "__dict__") and not isinstance(v, type):
|
||||
masked_data[k] = self.mask_dict(vars(v), depth + 1)
|
||||
elif self.is_sensitive_key(k):
|
||||
str_value = str(v) if v is not None else ""
|
||||
masked_data[k] = self._mask_value(str_value)
|
||||
else:
|
||||
masked_data[k] = (
|
||||
v if isinstance(v, (int, float, bool, str)) else str(v)
|
||||
)
|
||||
except Exception:
|
||||
masked_data[k] = "<unable to serialize>"
|
||||
|
||||
return masked_data
|
||||
|
||||
|
||||
# Usage example:
|
||||
"""
|
||||
masker = SensitiveDataMasker()
|
||||
data = {
|
||||
"api_key": "sk-1234567890abcdef",
|
||||
"redis_password": "very_secret_pass",
|
||||
"port": 6379
|
||||
}
|
||||
masked = masker.mask_dict(data)
|
||||
# Result: {
|
||||
# "api_key": "sk-1****cdef",
|
||||
# "redis_password": "very****pass",
|
||||
# "port": 6379
|
||||
# }
|
||||
"""
|
Loading…
Add table
Add a link
Reference in a new issue