From aa587d9fc8d8cd58dfa9df0cead72ab7cad86926 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 19 Apr 2025 10:00:37 -0700 Subject: [PATCH] fix(_logging.py): handle more cases of sensitive keys in logs --- litellm/_logging.py | 49 +++++++++++------- tests/litellm/test_logging.py | 97 +++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 19 deletions(-) diff --git a/litellm/_logging.py b/litellm/_logging.py index 41f2bc2bfd..8572d28f36 100644 --- a/litellm/_logging.py +++ b/litellm/_logging.py @@ -53,6 +53,7 @@ class SensitiveDataFilter(logging.Filter): "password", "secret", "token", + "private_key", # Added for nested JSON case ] def filter(self, record): @@ -65,29 +66,39 @@ class SensitiveDataFilter(logging.Filter): else: msg = str(record.msg) - key_pattern = r'["\']?([^"\':\s]+)["\']?\s*[:=]' - keys = re.findall(key_pattern, msg) - # Redact sensitive information - for key in keys: - # Check if any sensitive key is a substring of the current key - if any( - sensitive_key in key.lower() for sensitive_key in self.SENSITIVE_KEYS - ): - # Handle JSON-like strings - pattern = f'"{key}":\\s*"[^"]*"' - msg = re.sub(pattern, f'"{key}": "REDACTED"', msg) + for key in self.SENSITIVE_KEYS: + # Create patterns for compound keys (e.g., openai_api_key) + key_pattern = f"[a-zA-Z0-9_/\\\\-]*{key}[a-zA-Z0-9_/\\\\-]*" - # Handle key-value pairs in plain text - pattern = f"{key}\\s*=\\s*[^\\s,}}]+" - msg = re.sub(pattern, f"{key}=REDACTED", msg) + # Handle JSON-like strings with double quotes + json_pattern = f'"({key_pattern})":\\s*"[^"]*"' + msg = re.sub(json_pattern, r'"\1": "REDACTED"', msg, flags=re.IGNORECASE) - # Handle dictionary-like strings - pattern = f"'{key}':\\s*'[^']*'" - msg = re.sub(pattern, f"'{key}': 'REDACTED'", msg) + # Handle dictionary-like strings with single quotes + dict_pattern = f"'({key_pattern})':\\s*'[^']*'" + msg = re.sub(dict_pattern, r"'\1': 'REDACTED'", msg, flags=re.IGNORECASE) - pattern = f"\"{key}\":\\s*'[^']*'" - msg = re.sub(pattern, f"\"{key}\": 'REDACTED'", msg) + # Handle mixed quote styles + mixed_pattern = f"\"({key_pattern})\":\\s*'[^']*'" + msg = re.sub(mixed_pattern, r'"\1": \'REDACTED\'', msg, flags=re.IGNORECASE) + + # Handle key-value pairs in plain text + # Convert snake_case and special characters to flexible matching + display_key = key.replace("_", "[-_ ]") + # Match both original and display versions of the key, preserving the separator and spacing + plain_pattern = ( + f"\\b({key_pattern}|{display_key})\\s*([:=])\\s*[^,\\s][^,]*" + ) + msg = re.sub( + plain_pattern, + lambda m: f"{m.group(1)}{m.group(2)}{' ' if m.group(2) == ':' else ''}REDACTED", + msg, + flags=re.IGNORECASE, + ) + + # Handle mixed quotes without escaping + msg = msg.replace('\\"', '"').replace("\\'", "'") # Set the message and clear args since we've already formatted it record.msg = msg diff --git a/tests/litellm/test_logging.py b/tests/litellm/test_logging.py index 5e8eb1d10d..d151699d8e 100644 --- a/tests/litellm/test_logging.py +++ b/tests/litellm/test_logging.py @@ -97,3 +97,100 @@ def test_sensitive_data_filter_with_different_formats(): assert ( record.msg == test_case["expected"] ), f"Failed for input: {test_case['input']}" + + +def test_sensitive_data_filter_with_special_characters(): + # Create a test logger + logger = logging.getLogger("test_logger") + logger.setLevel(logging.INFO) + + # Create a filter + sensitive_filter = SensitiveDataFilter() + + # Test cases with special characters in keys + test_cases = [ + { + "input": '{"api_key": "sk-1234567890"}', + "expected": '{"api_key": "REDACTED"}', + }, + { + "input": '{"api-key": "sk-1234567890"}', + "expected": '{"api-key": "REDACTED"}', + }, + { + "input": '{"api/key": "sk-1234567890"}', + "expected": '{"api/key": "REDACTED"}', + }, + { + "input": '{"api\\key": "sk-1234567890"}', + "expected": '{"api\\key": "REDACTED"}', + }, + ] + + for test_case in test_cases: + # Create a log record + record = logging.LogRecord( + name="test_logger", + level=logging.INFO, + pathname="test.py", + lineno=1, + msg=test_case["input"], + args=(), + exc_info=None, + ) + + # Apply the filter + sensitive_filter.filter(record) + + # Verify the output + assert ( + record.msg == test_case["expected"] + ), f"Failed for input: {test_case['input']}" + + +def test_sensitive_data_filter_with_format_strings(): + # Create a test logger + logger = logging.getLogger("test_logger") + logger.setLevel(logging.INFO) + + # Create a filter + sensitive_filter = SensitiveDataFilter() + + # Test cases with format strings + test_cases = [ + { + "input": "API key: %s", + "args": ("sk-1234567890",), + "expected": "API key: REDACTED", + }, + { + "input": "Credentials: %s, Token: %s", + "args": ("secret123", "abc123"), + "expected": "Credentials: REDACTED, Token: REDACTED", + }, + { + "input": "API base: %s, Key: %s", + "args": ("https://api.example.com", "sk-1234567890"), + "expected": "API base: REDACTED, Key: REDACTED", + }, + ] + + for test_case in test_cases: + # Create a log record + record = logging.LogRecord( + name="test_logger", + level=logging.INFO, + pathname="test.py", + lineno=1, + msg=test_case["input"], + args=test_case["args"], + exc_info=None, + ) + + # Apply the filter + sensitive_filter.filter(record) + + # Verify the output + assert ( + record.msg == test_case["expected"] + ), f"Failed for input: {test_case['input']} with args: {test_case['args']}"