forked from phoenix/litellm-mirror
Merge pull request #4669 from BerriAI/litellm_logging_only_masking
Flag for PII masking on Logging only
Commit d72bcdbce3
11 changed files with 379 additions and 81 deletions
@@ -179,4 +179,60 @@ chat_completion = client.chat.completions.create(
  },
  "_response_ms": 1753.426
}
```

## Turn on for logging only

Apply PII masking only to the data that gets logged to Langfuse, etc.

The actual LLM API request and response are left untouched.

:::note
This is currently only applied for:

- `/chat/completion` requests
- on 'success' logging
:::

1. Set up config.yaml

```yaml
litellm_settings:
  presidio_logging_only: true

model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: gpt-3.5-turbo
      api_key: os.environ/OPENAI_API_KEY
```

2. Start proxy

```bash
litellm --config /path/to/config.yaml
```

3. Test it!

```bash
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
  "model": "gpt-3.5-turbo",
  "messages": [
    {
      "role": "user",
      "content": "Hi, my name is Jane!"
    }
  ]
}'
```

**Expected Logged Response**

```
Hi, my name is <PERSON>!
```
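For reference, the same test can be sent through the OpenAI Python SDK pointed at the proxy instead of curl. This is a minimal sketch that reuses the proxy address (`http://0.0.0.0:4000`) and the `sk-1234` key from the curl example above:

```python
# Same request as the curl example, sent through the OpenAI SDK via the LiteLLM proxy.
from openai import OpenAI

client = OpenAI(
    api_key="sk-1234",               # proxy key from the example above
    base_url="http://0.0.0.0:4000",  # LiteLLM proxy address
)

response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi, my name is Jane!"}],
)

# The API response keeps the original text; only the logged copy
# (e.g. in Langfuse) should read "Hi, my name is <PERSON>!".
print(response.choices[0].message.content)
```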
@@ -16,7 +16,7 @@ from litellm._logging import (
    log_level,
)

from litellm.types.guardrails import GuardrailItem
from litellm.proxy._types import (
    KeyManagementSystem,
    KeyManagementSettings,
@@ -124,6 +124,7 @@ llamaguard_unsafe_content_categories: Optional[str] = None
blocked_user_list: Optional[Union[str, List]] = None
banned_keywords_list: Optional[Union[str, List]] = None
llm_guard_mode: Literal["all", "key-specific", "request-specific"] = "all"
guardrail_name_config_map: Optional[Dict[str, GuardrailItem]] = None
##################
### PREVIEW FEATURES ###
enable_preview_features: bool = False
@@ -2,7 +2,7 @@
# On success, logs events to Promptlayer
import os
import traceback
from typing import Literal, Optional, Union
from typing import Any, Literal, Optional, Tuple, Union

import dotenv
from pydantic import BaseModel
@@ -117,6 +117,18 @@ class CustomLogger:  # https://docs.litellm.ai/docs/observability/custom_callback
    ):
        pass

    async def async_logging_hook(
        self, kwargs: dict, result: Any, call_type: str
    ) -> Tuple[dict, Any]:
        """For masking logged request/response. Return a modified version of the request/result."""
        return kwargs, result

    def logging_hook(
        self, kwargs: dict, result: Any, call_type: str
    ) -> Tuple[dict, Any]:
        """For masking logged request/response. Return a modified version of the request/result."""
        return kwargs, result

    async def async_moderation_hook(
        self,
        data: dict,
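Both hooks default to returning the request/result unchanged, so existing callbacks are unaffected; a custom callback overrides them to rewrite only the copy that gets logged. A minimal sketch of such an override (the `EmailRedactor` class and its regex are illustrative, not part of this PR):

```python
import re
from typing import Any, Tuple

from litellm.integrations.custom_logger import CustomLogger

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")


class EmailRedactor(CustomLogger):
    """Hypothetical callback: masks e-mail addresses in the logged payload only."""

    def logging_hook(
        self, kwargs: dict, result: Any, call_type: str
    ) -> Tuple[dict, Any]:
        messages = kwargs.get("messages") or []
        for m in messages:
            if isinstance(m.get("content"), str):
                m["content"] = EMAIL_RE.sub("<EMAIL>", m["content"])
        kwargs["messages"] = messages
        return kwargs, result
```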
@@ -655,6 +655,16 @@ class Logging:
                result=result, litellm_logging_obj=self
            )

        ## LOGGING HOOK ##

        for callback in callbacks:
            if isinstance(callback, CustomLogger):
                self.model_call_details["input"], result = callback.logging_hook(
                    kwargs=self.model_call_details,
                    result=result,
                    call_type=self.call_type,
                )

        for callback in callbacks:
            try:
                litellm_params = self.model_call_details.get("litellm_params", {})
@@ -1302,6 +1312,18 @@ class Logging:
                result=result, litellm_logging_obj=self
            )

        ## LOGGING HOOK ##

        for callback in callbacks:
            if isinstance(callback, CustomLogger):
                self.model_call_details["input"], result = (
                    await callback.async_logging_hook(
                        kwargs=self.model_call_details,
                        result=result,
                        call_type=self.call_type,
                    )
                )

        for callback in callbacks:
            # check if callback can run for this request
            litellm_params = self.model_call_details.get("litellm_params", {})
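These two loops (sync and async) are what invoke the new hooks for every registered `CustomLogger` before the per-callback logging runs. To exercise them, a callback instance only needs to be registered on `litellm.callbacks`; a minimal sketch reusing the hypothetical `EmailRedactor` from above (the `mock_response` pattern mirrors the unit test added later in this PR):

```python
import litellm

# EmailRedactor is the hypothetical callback sketched earlier.
litellm.callbacks = [EmailRedactor()]

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Reach me at jane@example.com"}],
    mock_response="Will do!",  # mocked response, no real API call needed
)

# The live response is untouched; the payload handed to loggers has the
# address replaced with <EMAIL> by EmailRedactor.logging_hook.
print(response.choices[0].message.content)
```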
@@ -46,7 +46,17 @@ def initialize_callbacks_on_proxy(
                _OPTIONAL_PresidioPIIMasking,
            )

            pii_masking_object = _OPTIONAL_PresidioPIIMasking()
            presidio_logging_only: Optional[bool] = litellm_settings.get(
                "presidio_logging_only", None
            )
            if presidio_logging_only is not None:
                presidio_logging_only = bool(
                    presidio_logging_only
                )  # validate boolean given

            pii_masking_object = _OPTIONAL_PresidioPIIMasking(
                logging_only=presidio_logging_only
            )
            imported_list.append(pii_masking_object)
        elif isinstance(callback, str) and callback == "llamaguard_moderations":
            from enterprise.enterprise_hooks.llama_guard import (
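This is the code path that reads the `presidio_logging_only` flag from the proxy's `litellm_settings` when the Presidio callback is initialized. A sketch of a config that would reach this branch, assuming the usual `callbacks: ["presidio"]` entry is what enables the Presidio hook:

```yaml
litellm_settings:
  callbacks: ["presidio"]       # initializes _OPTIONAL_PresidioPIIMasking on the proxy
  presidio_logging_only: true   # picked up by initialize_callbacks_on_proxy above
```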
@@ -18,7 +18,7 @@ def initialize_guardrails(
    premium_user: bool,
    config_file_path: str,
    litellm_settings: dict,
):
) -> Dict[str, GuardrailItem]:
    try:
        verbose_proxy_logger.debug(f"validating guardrails passed {guardrails_config}")
        global all_guardrails
@@ -55,7 +55,11 @@ def initialize_guardrails(
                litellm_settings=litellm_settings,
            )

        return guardrail_name_config_map
    except Exception as e:
        verbose_proxy_logger.error(f"error initializing guardrails {str(e)}")
        traceback.print_exc()
        verbose_proxy_logger.error(
            "error initializing guardrails {}\n{}".format(
                str(e), traceback.format_exc()
            )
        )
        raise e
@@ -12,7 +12,7 @@ import asyncio
import json
import traceback
import uuid
from typing import Optional, Union
from typing import Any, List, Optional, Tuple, Union

import aiohttp
from fastapi import HTTPException
@@ -27,6 +27,7 @@ from litellm.utils import (
    ImageResponse,
    ModelResponse,
    StreamingChoices,
    get_formatted_prompt,
)

@@ -36,14 +37,18 @@ class _OPTIONAL_PresidioPIIMasking(CustomLogger):

    # Class variables or attributes
    def __init__(
        self, mock_testing: bool = False, mock_redacted_text: Optional[dict] = None
        self,
        logging_only: Optional[bool] = None,
        mock_testing: bool = False,
        mock_redacted_text: Optional[dict] = None,
    ):
        self.pii_tokens: dict = (
            {}
        )  # mapping of PII token to original text - only used with Presidio `replace` operation

        self.mock_redacted_text = mock_redacted_text
        if mock_testing == True:  # for testing purposes only
        self.logging_only = logging_only
        if mock_testing is True:  # for testing purposes only
            return

        ad_hoc_recognizers = litellm.presidio_ad_hoc_recognizers
@@ -188,6 +193,10 @@ class _OPTIONAL_PresidioPIIMasking(CustomLogger):
        For multiple messages in /chat/completions, we'll need to call them in parallel.
        """
        try:
            if (
                self.logging_only is True
            ):  # only modify the logging obj data (done by async_logging_hook)
                return data
            permissions = user_api_key_dict.permissions
            output_parse_pii = permissions.get(
                "output_parse_pii", litellm.output_parse_pii
@@ -244,7 +253,7 @@ class _OPTIONAL_PresidioPIIMasking(CustomLogger):
                },
            )

            if no_pii == True:  # turn off pii masking
            if no_pii is True:  # turn off pii masking
                return data

            if call_type == "completion":  # /chat/completions requests
@@ -274,6 +283,43 @@ class _OPTIONAL_PresidioPIIMasking(CustomLogger):
            )
            raise e

    async def async_logging_hook(
        self, kwargs: dict, result: Any, call_type: str
    ) -> Tuple[dict, Any]:
        """
        Masks the input before logging to langfuse, datadog, etc.
        """
        if (
            call_type == "completion" or call_type == "acompletion"
        ):  # /chat/completions requests
            messages: Optional[List] = kwargs.get("messages", None)
            tasks = []

            if messages is None:
                return kwargs, result

            for m in messages:
                text_str = ""
                if m["content"] is None:
                    continue
                if isinstance(m["content"], str):
                    text_str = m["content"]
                tasks.append(
                    self.check_pii(text=text_str, output_parse_pii=False)
                )  # need to pass separately b/c presidio has context window limits
            responses = await asyncio.gather(*tasks)
            for index, r in enumerate(responses):
                if isinstance(messages[index]["content"], str):
                    messages[index][
                        "content"
                    ] = r  # replace content with redacted string
            verbose_proxy_logger.info(
                f"Presidio PII Masking: Redacted pii message: {messages}"
            )
            kwargs["messages"] = messages

        return kwargs, responses

    async def async_post_call_success_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
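The hook above fans each message out to Presidio concurrently and writes the redacted strings back in place. The same gather-and-replace pattern is shown in isolation below, with a stand-in `redact()` coroutine instead of the real `check_pii` call (all names here are illustrative):

```python
import asyncio
import re
from typing import List


async def redact(text: str) -> str:
    # Stand-in for the real Presidio round-trip.
    await asyncio.sleep(0)
    return re.sub(r"\bJane\b", "<PERSON>", text)


async def mask_messages(messages: List[dict]) -> List[dict]:
    # One redaction task per message (assumes string content), run concurrently,
    # then written back in place, mirroring async_logging_hook above.
    redacted = await asyncio.gather(*[redact(m["content"]) for m in messages])
    for m, new_content in zip(messages, redacted):
        m["content"] = new_content
    return messages


print(asyncio.run(mask_messages([{"role": "user", "content": "Hi, my name is Jane!"}])))
# [{'role': 'user', 'content': 'Hi, my name is <PERSON>!'}]
```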
@@ -1469,12 +1469,14 @@ class ProxyConfig:
                        + CommonProxyErrors.not_premium_user.value
                    )

                initialize_guardrails(
                guardrail_name_config_map = initialize_guardrails(
                    guardrails_config=value,
                    premium_user=premium_user,
                    config_file_path=config_file_path,
                    litellm_settings=litellm_settings,
                )

                litellm.guardrail_name_config_map = guardrail_name_config_map
            elif key == "callbacks":

                initialize_callbacks_on_proxy(
@@ -220,81 +220,86 @@ async def test_aarun_thread_litellm(sync_mode, provider, is_streaming):
    - Create thread
    - Create run w/ Assistants + Thread
    """
    if sync_mode:
        assistants = litellm.get_assistants(custom_llm_provider=provider)
    else:
        assistants = await litellm.aget_assistants(custom_llm_provider=provider)
    import openai

    ## get the first assistant ###
    assistant_id = assistants.data[0].id

    new_thread = test_create_thread_litellm(sync_mode=sync_mode, provider=provider)

    if asyncio.iscoroutine(new_thread):
        _new_thread = await new_thread
    else:
        _new_thread = new_thread

    thread_id = _new_thread.id

    # add message to thread
    message: MessageData = {"role": "user", "content": "Hey, how's it going?"}  # type: ignore

    data = {"custom_llm_provider": provider, "thread_id": _new_thread.id, **message}

    if sync_mode:
        added_message = litellm.add_message(**data)

        if is_streaming:
            run = litellm.run_thread_stream(assistant_id=assistant_id, **data)
            with run as run:
                assert isinstance(run, AssistantEventHandler)
                print(run)
                run.until_done()
    try:
        if sync_mode:
            assistants = litellm.get_assistants(custom_llm_provider=provider)
        else:
            run = litellm.run_thread(
                assistant_id=assistant_id, stream=is_streaming, **data
            )
            if run.status == "completed":
                messages = litellm.get_messages(
                    thread_id=_new_thread.id, custom_llm_provider=provider
                )
                assert isinstance(messages.data[0], Message)
            else:
                pytest.fail(
                    "An unexpected error occurred when running the thread, {}".format(
                        run
                    )
                )
            assistants = await litellm.aget_assistants(custom_llm_provider=provider)

    else:
        added_message = await litellm.a_add_message(**data)
        ## get the first assistant ###
        assistant_id = assistants.data[0].id

        if is_streaming:
            run = litellm.arun_thread_stream(assistant_id=assistant_id, **data)
            async with run as run:
                print(f"run: {run}")
                assert isinstance(
                    run,
                    AsyncAssistantEventHandler,
                )
                print(run)
                run.until_done()
        new_thread = test_create_thread_litellm(sync_mode=sync_mode, provider=provider)

        if asyncio.iscoroutine(new_thread):
            _new_thread = await new_thread
        else:
            run = await litellm.arun_thread(
                custom_llm_provider=provider,
                thread_id=thread_id,
                assistant_id=assistant_id,
            )
            _new_thread = new_thread

            if run.status == "completed":
                messages = await litellm.aget_messages(
                    thread_id=_new_thread.id, custom_llm_provider=provider
                )
                assert isinstance(messages.data[0], Message)
        thread_id = _new_thread.id

        # add message to thread
        message: MessageData = {"role": "user", "content": "Hey, how's it going?"}  # type: ignore

        data = {"custom_llm_provider": provider, "thread_id": _new_thread.id, **message}

        if sync_mode:
            added_message = litellm.add_message(**data)

            if is_streaming:
                run = litellm.run_thread_stream(assistant_id=assistant_id, **data)
                with run as run:
                    assert isinstance(run, AssistantEventHandler)
                    print(run)
                    run.until_done()
            else:
                pytest.fail(
                    "An unexpected error occurred when running the thread, {}".format(
                        run
                    )
                run = litellm.run_thread(
                    assistant_id=assistant_id, stream=is_streaming, **data
                )
                if run.status == "completed":
                    messages = litellm.get_messages(
                        thread_id=_new_thread.id, custom_llm_provider=provider
                    )
                    assert isinstance(messages.data[0], Message)
                else:
                    pytest.fail(
                        "An unexpected error occurred when running the thread, {}".format(
                            run
                        )
                    )

        else:
            added_message = await litellm.a_add_message(**data)

            if is_streaming:
                run = litellm.arun_thread_stream(assistant_id=assistant_id, **data)
                async with run as run:
                    print(f"run: {run}")
                    assert isinstance(
                        run,
                        AsyncAssistantEventHandler,
                    )
                    print(run)
                    await run.until_done()
            else:
                run = await litellm.arun_thread(
                    custom_llm_provider=provider,
                    thread_id=thread_id,
                    assistant_id=assistant_id,
                )

                if run.status == "completed":
                    messages = await litellm.aget_messages(
                        thread_id=_new_thread.id, custom_llm_provider=provider
                    )
                    assert isinstance(messages.data[0], Message)
                else:
                    pytest.fail(
                        "An unexpected error occurred when running the thread, {}".format(
                            run
                        )
                    )
    except openai.APIError as e:
        pass
litellm/tests/test_guardrails_config.py (new file, 73 lines)
@@ -0,0 +1,73 @@
# What is this?
## Unit Tests for guardrails config
import asyncio
import inspect
import os
import sys
import time
import traceback
import uuid
from datetime import datetime

import pytest
from pydantic import BaseModel

import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging

sys.path.insert(0, os.path.abspath("../.."))
from typing import Any, List, Literal, Optional, Tuple, Union
from unittest.mock import AsyncMock, MagicMock, patch

import litellm
from litellm import Cache, completion, embedding
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.utils import LiteLLMCommonStrings


class CustomLoggingIntegration(CustomLogger):
    def __init__(self) -> None:
        super().__init__()

    def logging_hook(
        self, kwargs: dict, result: Any, call_type: str
    ) -> Tuple[dict, Any]:
        input: Optional[Any] = kwargs.get("input", None)
        messages: Optional[List] = kwargs.get("messages", None)
        if call_type == "completion":
            # assume input is of type messages
            if input is not None and isinstance(input, list):
                input[0]["content"] = "Hey, my name is [NAME]."
            if messages is not None and isinstance(messages, List):
                messages[0]["content"] = "Hey, my name is [NAME]."

        kwargs["input"] = input
        kwargs["messages"] = messages
        return kwargs, result


def test_guardrail_masking_logging_only():
    """
    Assert response is unmasked.

    Assert logged response is masked.
    """
    callback = CustomLoggingIntegration()

    with patch.object(callback, "log_success_event", new=MagicMock()) as mock_call:
        litellm.callbacks = [callback]
        messages = [{"role": "user", "content": "Hey, my name is Peter."}]
        response = completion(
            model="gpt-3.5-turbo", messages=messages, mock_response="Hi Peter!"
        )

        assert response.choices[0].message.content == "Hi Peter!"  # type: ignore

        mock_call.assert_called_once()

        print(mock_call.call_args.kwargs["kwargs"]["messages"][0]["content"])

        assert (
            mock_call.call_args.kwargs["kwargs"]["messages"][0]["content"]
            == "Hey, my name is [NAME]."
        )
@@ -16,6 +16,8 @@ import os
sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

import litellm
@@ -196,3 +198,68 @@ async def test_presidio_pii_masking_input_b():

    assert "<PERSON>" in new_data["messages"][0]["content"]
    assert "<PHONE_NUMBER>" not in new_data["messages"][0]["content"]


@pytest.mark.asyncio
async def test_presidio_pii_masking_logging_output_only_no_pre_api_hook():
    pii_masking = _OPTIONAL_PresidioPIIMasking(
        logging_only=True,
        mock_testing=True,
        mock_redacted_text=input_b_anonymizer_results,
    )

    _api_key = "sk-12345"
    user_api_key_dict = UserAPIKeyAuth(api_key=_api_key)
    local_cache = DualCache()

    test_messages = [
        {
            "role": "user",
            "content": "My name is Jane Doe, who are you? Say my name in your response",
        }
    ]

    new_data = await pii_masking.async_pre_call_hook(
        user_api_key_dict=user_api_key_dict,
        cache=local_cache,
        data={"messages": test_messages},
        call_type="completion",
    )

    assert "Jane Doe" in new_data["messages"][0]["content"]


@pytest.mark.asyncio
async def test_presidio_pii_masking_logging_output_only_logged_response():
    pii_masking = _OPTIONAL_PresidioPIIMasking(
        logging_only=True,
        mock_testing=True,
        mock_redacted_text=input_b_anonymizer_results,
    )

    test_messages = [
        {
            "role": "user",
            "content": "My name is Jane Doe, who are you? Say my name in your response",
        }
    ]
    with patch.object(
        pii_masking, "async_log_success_event", new=AsyncMock()
    ) as mock_call:
        litellm.callbacks = [pii_masking]
        response = await litellm.acompletion(
            model="gpt-3.5-turbo", messages=test_messages, mock_response="Hi Peter!"
        )

        await asyncio.sleep(3)

        assert response.choices[0].message.content == "Hi Peter!"  # type: ignore

        mock_call.assert_called_once()

        print(mock_call.call_args.kwargs["kwargs"]["messages"][0]["content"])

        assert (
            mock_call.call_args.kwargs["kwargs"]["messages"][0]["content"]
            == "My name is <PERSON>, who are you? Say my name in your response"
        )