diff --git a/enterprise/enterprise_hooks/secret_detection.py b/enterprise/enterprise_hooks/secret_detection.py new file mode 100644 index 000000000..75a578b2c --- /dev/null +++ b/enterprise/enterprise_hooks/secret_detection.py @@ -0,0 +1,164 @@ +# +-------------------------------------------------------------+ +# +# Use SecretDetection /moderations for your LLM calls +# +# +-------------------------------------------------------------+ +# Thank you users! We ❤️ you! - Krrish & Ishaan + +import sys, os + +sys.path.insert( + 0, os.path.abspath("../..") +) # Adds the parent directory to the system path +from typing import Optional, Literal, Union +import litellm, traceback, sys, uuid +from litellm.caching import DualCache +from litellm.proxy._types import UserAPIKeyAuth +from litellm.integrations.custom_logger import CustomLogger +from fastapi import HTTPException +from litellm._logging import verbose_proxy_logger +from litellm.utils import ( + ModelResponse, + EmbeddingResponse, + ImageResponse, + StreamingChoices, +) +from datetime import datetime +import aiohttp, asyncio +from litellm._logging import verbose_proxy_logger +import tempfile +from litellm._logging import verbose_proxy_logger + + +litellm.set_verbose = True + + +class _ENTERPRISE_SecretDetection(CustomLogger): + def __init__(self): + pass + + def scan_message_for_secrets(self, message_content: str): + from detect_secrets import SecretsCollection + from detect_secrets.settings import default_settings + + temp_file = tempfile.NamedTemporaryFile(delete=False) + temp_file.write(message_content.encode("utf-8")) + temp_file.close() + + secrets = SecretsCollection() + with default_settings(): + secrets.scan_file(temp_file.name) + + os.remove(temp_file.name) + + detected_secrets = [] + for file in secrets.files: + for found_secret in secrets[file]: + if found_secret.secret_value is None: + continue + detected_secrets.append( + {"type": found_secret.type, "value": found_secret.secret_value} + ) + + return detected_secrets + + #### CALL HOOKS - proxy only #### + def async_pre_call_hook( + self, + user_api_key_dict: UserAPIKeyAuth, + cache: DualCache, + data: dict, + call_type: str, # "completion", "embeddings", "image_generation", "moderation" + ): + from detect_secrets import SecretsCollection + from detect_secrets.settings import default_settings + + if "messages" in data and isinstance(data["messages"], list): + for message in data["messages"]: + if "content" in message and isinstance(message["content"], str): + detected_secrets = self.scan_message_for_secrets(message["content"]) + + for secret in detected_secrets: + message["content"] = message["content"].replace( + secret["value"], "[REDACTED]" + ) + + if len(detected_secrets) > 0: + secret_types = [secret["type"] for secret in detected_secrets] + verbose_proxy_logger.warning( + f"Detected and redacted secrets in message: {secret_types}" + ) + + if "prompt" in data: + if isinstance(data["prompt"], str): + detected_secrets = self.scan_message_for_secrets(data["prompt"]) + for secret in detected_secrets: + data["prompt"] = data["prompt"].replace( + secret["value"], "[REDACTED]" + ) + if len(detected_secrets) > 0: + secret_types = [secret["type"] for secret in detected_secrets] + verbose_proxy_logger.warning( + f"Detected and redacted secrets in prompt: {secret_types}" + ) + elif isinstance(data["prompt"], list): + for item in data["prompt"]: + if isinstance(item, str): + detected_secrets = self.scan_message_for_secrets(item) + for secret in detected_secrets: + item = item.replace(secret["value"], "[REDACTED]") + if len(detected_secrets) > 0: + secret_types = [ + secret["type"] for secret in detected_secrets + ] + verbose_proxy_logger.warning( + f"Detected and redacted secrets in prompt: {secret_types}" + ) + + if "input" in data: + if isinstance(data["input"], str): + detected_secrets = self.scan_message_for_secrets(data["input"]) + for secret in detected_secrets: + data["input"] = data["input"].replace(secret["value"], "[REDACTED]") + if len(detected_secrets) > 0: + secret_types = [secret["type"] for secret in detected_secrets] + verbose_proxy_logger.warning( + f"Detected and redacted secrets in input: {secret_types}" + ) + elif isinstance(data["input"], list): + for item in data["input"]: + if isinstance(item, str): + detected_secrets = self.scan_message_for_secrets(item) + for secret in detected_secrets: + item = item.replace(secret["value"], "[REDACTED]") + if len(detected_secrets) > 0: + secret_types = [ + secret["type"] for secret in detected_secrets + ] + verbose_proxy_logger.warning( + f"Detected and redacted secrets in input: {secret_types}" + ) + + +# secretDetect = _ENTERPRISE_SecretDetection() + +# from litellm.caching import DualCache +# print("running hook to detect a secret") +# test_data = { +# "messages": [ +# {"role": "user", "content": "Hey, how's it going, API_KEY = 'sk_1234567890abcdef'"}, +# {"role": "assistant", "content": "Hello! I'm doing well. How can I assist you today?"}, +# {"role": "user", "content": "this is my OPENAI_API_KEY = 'sk_1234567890abcdef'"}, +# {"role": "user", "content": "i think it is sk-1234567890abcdef"}, +# ], +# "model": "gpt-3.5-turbo", +# } +# secretDetect.async_pre_call_hook( +# data=test_data, +# user_api_key_dict=UserAPIKeyAuth(token="your_api_key"), +# cache=DualCache(), +# call_type="completion", +# ) + + +# print("finished hook to detect a secret - test data=", test_data)