mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
fix(_logging.py): handle more cases of sensitive keys in logs
This commit is contained in:
parent
b461905745
commit
aa587d9fc8
2 changed files with 127 additions and 19 deletions
|
@ -53,6 +53,7 @@ class SensitiveDataFilter(logging.Filter):
|
|||
"password",
|
||||
"secret",
|
||||
"token",
|
||||
"private_key", # Added for nested JSON case
|
||||
]
|
||||
|
||||
def filter(self, record):
|
||||
|
@ -65,29 +66,39 @@ class SensitiveDataFilter(logging.Filter):
|
|||
else:
|
||||
msg = str(record.msg)
|
||||
|
||||
key_pattern = r'["\']?([^"\':\s]+)["\']?\s*[:=]'
|
||||
keys = re.findall(key_pattern, msg)
|
||||
|
||||
# Redact sensitive information
|
||||
for key in keys:
|
||||
# Check if any sensitive key is a substring of the current key
|
||||
if any(
|
||||
sensitive_key in key.lower() for sensitive_key in self.SENSITIVE_KEYS
|
||||
):
|
||||
# Handle JSON-like strings
|
||||
pattern = f'"{key}":\\s*"[^"]*"'
|
||||
msg = re.sub(pattern, f'"{key}": "REDACTED"', msg)
|
||||
for key in self.SENSITIVE_KEYS:
|
||||
# Create patterns for compound keys (e.g., openai_api_key)
|
||||
key_pattern = f"[a-zA-Z0-9_/\\\\-]*{key}[a-zA-Z0-9_/\\\\-]*"
|
||||
|
||||
# Handle JSON-like strings with double quotes
|
||||
json_pattern = f'"({key_pattern})":\\s*"[^"]*"'
|
||||
msg = re.sub(json_pattern, r'"\1": "REDACTED"', msg, flags=re.IGNORECASE)
|
||||
|
||||
# Handle dictionary-like strings with single quotes
|
||||
dict_pattern = f"'({key_pattern})':\\s*'[^']*'"
|
||||
msg = re.sub(dict_pattern, r"'\1': 'REDACTED'", msg, flags=re.IGNORECASE)
|
||||
|
||||
# Handle mixed quote styles
|
||||
mixed_pattern = f"\"({key_pattern})\":\\s*'[^']*'"
|
||||
msg = re.sub(mixed_pattern, r'"\1": \'REDACTED\'', msg, flags=re.IGNORECASE)
|
||||
|
||||
# Handle key-value pairs in plain text
|
||||
pattern = f"{key}\\s*=\\s*[^\\s,}}]+"
|
||||
msg = re.sub(pattern, f"{key}=REDACTED", msg)
|
||||
# Convert snake_case and special characters to flexible matching
|
||||
display_key = key.replace("_", "[-_ ]")
|
||||
# Match both original and display versions of the key, preserving the separator and spacing
|
||||
plain_pattern = (
|
||||
f"\\b({key_pattern}|{display_key})\\s*([:=])\\s*[^,\\s][^,]*"
|
||||
)
|
||||
msg = re.sub(
|
||||
plain_pattern,
|
||||
lambda m: f"{m.group(1)}{m.group(2)}{' ' if m.group(2) == ':' else ''}REDACTED",
|
||||
msg,
|
||||
flags=re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Handle dictionary-like strings
|
||||
pattern = f"'{key}':\\s*'[^']*'"
|
||||
msg = re.sub(pattern, f"'{key}': 'REDACTED'", msg)
|
||||
|
||||
pattern = f"\"{key}\":\\s*'[^']*'"
|
||||
msg = re.sub(pattern, f"\"{key}\": 'REDACTED'", msg)
|
||||
# Handle mixed quotes without escaping
|
||||
msg = msg.replace('\\"', '"').replace("\\'", "'")
|
||||
|
||||
# Set the message and clear args since we've already formatted it
|
||||
record.msg = msg
|
||||
|
|
|
@ -97,3 +97,100 @@ def test_sensitive_data_filter_with_different_formats():
|
|||
assert (
|
||||
record.msg == test_case["expected"]
|
||||
), f"Failed for input: {test_case['input']}"
|
||||
|
||||
|
||||
def test_sensitive_data_filter_with_special_characters():
|
||||
# Create a test logger
|
||||
logger = logging.getLogger("test_logger")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Create a filter
|
||||
sensitive_filter = SensitiveDataFilter()
|
||||
|
||||
# Test cases with special characters in keys
|
||||
test_cases = [
|
||||
{
|
||||
"input": '{"api_key": "sk-1234567890"}',
|
||||
"expected": '{"api_key": "REDACTED"}',
|
||||
},
|
||||
{
|
||||
"input": '{"api-key": "sk-1234567890"}',
|
||||
"expected": '{"api-key": "REDACTED"}',
|
||||
},
|
||||
{
|
||||
"input": '{"api/key": "sk-1234567890"}',
|
||||
"expected": '{"api/key": "REDACTED"}',
|
||||
},
|
||||
{
|
||||
"input": '{"api\\key": "sk-1234567890"}',
|
||||
"expected": '{"api\\key": "REDACTED"}',
|
||||
},
|
||||
]
|
||||
|
||||
for test_case in test_cases:
|
||||
# Create a log record
|
||||
record = logging.LogRecord(
|
||||
name="test_logger",
|
||||
level=logging.INFO,
|
||||
pathname="test.py",
|
||||
lineno=1,
|
||||
msg=test_case["input"],
|
||||
args=(),
|
||||
exc_info=None,
|
||||
)
|
||||
|
||||
# Apply the filter
|
||||
sensitive_filter.filter(record)
|
||||
|
||||
# Verify the output
|
||||
assert (
|
||||
record.msg == test_case["expected"]
|
||||
), f"Failed for input: {test_case['input']}"
|
||||
|
||||
|
||||
def test_sensitive_data_filter_with_format_strings():
|
||||
# Create a test logger
|
||||
logger = logging.getLogger("test_logger")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Create a filter
|
||||
sensitive_filter = SensitiveDataFilter()
|
||||
|
||||
# Test cases with format strings
|
||||
test_cases = [
|
||||
{
|
||||
"input": "API key: %s",
|
||||
"args": ("sk-1234567890",),
|
||||
"expected": "API key: REDACTED",
|
||||
},
|
||||
{
|
||||
"input": "Credentials: %s, Token: %s",
|
||||
"args": ("secret123", "abc123"),
|
||||
"expected": "Credentials: REDACTED, Token: REDACTED",
|
||||
},
|
||||
{
|
||||
"input": "API base: %s, Key: %s",
|
||||
"args": ("https://api.example.com", "sk-1234567890"),
|
||||
"expected": "API base: REDACTED, Key: REDACTED",
|
||||
},
|
||||
]
|
||||
|
||||
for test_case in test_cases:
|
||||
# Create a log record
|
||||
record = logging.LogRecord(
|
||||
name="test_logger",
|
||||
level=logging.INFO,
|
||||
pathname="test.py",
|
||||
lineno=1,
|
||||
msg=test_case["input"],
|
||||
args=test_case["args"],
|
||||
exc_info=None,
|
||||
)
|
||||
|
||||
# Apply the filter
|
||||
sensitive_filter.filter(record)
|
||||
|
||||
# Verify the output
|
||||
assert (
|
||||
record.msg == test_case["expected"]
|
||||
), f"Failed for input: {test_case['input']} with args: {test_case['args']}"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue