Merge pull request #5603 from BerriAI/litellm_allow_turning_off_message_logging_for_callbacks
[Feat-Proxy] allow turning off message logging for OTEL (callback specific)
Commit 43cd657ac5
9 changed files with 138 additions and 42 deletions
@@ -744,6 +744,20 @@ curl --location 'http://0.0.0.0:4000/chat/completions' \

**🎉 Expect to see this trace logged in your OTEL collector**

### Redacting Messages, Response Content from OTEL Logging

Set `message_logging=False` for `otel`; no messages or responses will be logged.

```yaml
litellm_settings:
  callbacks: ["otel"]

# 👇 Key Change
callback_settings:
  otel:
    message_logging: False
```
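
The same flag can be set when constructing the OTEL callback directly in the Python SDK. A minimal sketch, adapted from the test added in this PR (the console exporter is only for local verification):

```python
import litellm
from litellm.integrations.opentelemetry import OpenTelemetry, OpenTelemetryConfig

# OTEL callback with message logging turned off; message/response content is
# replaced with "redacted-by-litellm" before it reaches the exporter.
otel_logger = OpenTelemetry(
    message_logging=False,
    config=OpenTelemetryConfig(exporter="console"),
)
litellm.callbacks = [otel_logger]

response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hi"}],
    mock_response="hi",  # mocked response, no real API call is made
)
```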

### Context propagation across Services `Traceparent HTTP Header`

❓ Use this when you want to **pass information about the incoming request in a distributed tracing system**
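
For illustration, a client can forward a W3C `traceparent` header (`version-traceid-parentid-flags`) when calling the proxy; the API key and trace/span IDs below are placeholder values:

```python
import requests

resp = requests.post(
    "http://0.0.0.0:4000/chat/completions",
    headers={
        "Authorization": "Bearer sk-1234",  # placeholder proxy key
        # W3C Trace Context header: version-traceid-parentid-flags
        "traceparent": "00-80e1afed08e019fc1110464cfa66635c-7a085853722dc6d2-01",
    },
    json={"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "hi"}]},
)
print(resp.json())
```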
@@ -15,7 +15,8 @@ from litellm.types.utils import AdapterCompletionStreamWrapper, ModelResponse

class CustomLogger:  # https://docs.litellm.ai/docs/observability/custom_callback#callback-class
    # Class variables or attributes
    def __init__(self) -> None:
    def __init__(self, message_logging: bool = True) -> None:
        self.message_logging = message_logging
        pass

    def log_pre_api_call(self, model, messages, kwargs):

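Because the flag lives on the `CustomLogger` base class, any custom callback can opt out of message logging the same way. A minimal sketch (the subclass name is hypothetical):

```python
import litellm
from litellm.integrations.custom_logger import CustomLogger


class MyLogger(CustomLogger):  # hypothetical subclass
    def __init__(self):
        # Disable message / response logging for this callback only
        super().__init__(message_logging=False)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # "messages" arrive already redacted by
        # redact_message_input_output_from_custom_logger
        print(kwargs["messages"])


litellm.callbacks = [MyLogger()]
```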
@@ -71,7 +71,10 @@ class OpenTelemetryConfig:

class OpenTelemetry(CustomLogger):
    def __init__(
        self, config=OpenTelemetryConfig.from_env(), callback_name: Optional[str] = None
        self,
        config=OpenTelemetryConfig.from_env(),
        callback_name: Optional[str] = None,
        **kwargs,
    ):
        from opentelemetry import trace
        from opentelemetry.sdk.resources import Resource

@@ -101,6 +104,9 @@ class OpenTelemetry(CustomLogger):
        otel_exporter_logger = logging.getLogger("opentelemetry.sdk.trace.export")
        otel_exporter_logger.setLevel(logging.DEBUG)

        # init CustomLogger params
        super().__init__(**kwargs)

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        self._handle_sucess(kwargs, response_obj, start_time, end_time)

@@ -261,6 +267,8 @@ class OpenTelemetry(CustomLogger):

        if litellm.turn_off_message_logging is True:
            pass
        elif self.message_logging is not True:
            pass
        else:
            # Span 2: Raw Request / Response to LLM
            raw_request_span = self.tracer.start_span(

@@ -28,6 +28,7 @@ from litellm.cost_calculator import _select_model_name_for_cost_calc
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.integrations.custom_logger import CustomLogger
from litellm.litellm_core_utils.redact_messages import (
    redact_message_input_output_from_custom_logger,
    redact_message_input_output_from_logging,
)
from litellm.rerank_api.types import RerankResponse

@@ -1395,6 +1396,9 @@
                    call_type=self.call_type,
                )
            elif isinstance(callback, CustomLogger):
                result = redact_message_input_output_from_custom_logger(
                    result=result, litellm_logging_obj=self, custom_logger=callback
                )
            self.model_call_details, result = await callback.async_logging_hook(
                kwargs=self.model_call_details,
                result=result,

@@ -11,6 +11,7 @@ import copy
from typing import TYPE_CHECKING, Any

import litellm
from litellm.integrations.custom_logger import CustomLogger

if TYPE_CHECKING:
    from litellm.litellm_core_utils.litellm_logging import (

@@ -22,6 +23,56 @@ else:
    LiteLLMLoggingObject = Any


def redact_message_input_output_from_custom_logger(
    litellm_logging_obj: LiteLLMLoggingObject, result, custom_logger: CustomLogger
):
    if (
        hasattr(custom_logger, "message_logging")
        and custom_logger.message_logging is not True
    ):
        return perform_redaction(litellm_logging_obj, result)
    return result


def perform_redaction(litellm_logging_obj: LiteLLMLoggingObject, result):
    """
    Performs the actual redaction on the logging object and result.
    """
    # Redact model_call_details
    litellm_logging_obj.model_call_details["messages"] = [
        {"role": "user", "content": "redacted-by-litellm"}
    ]
    litellm_logging_obj.model_call_details["prompt"] = ""
    litellm_logging_obj.model_call_details["input"] = ""

    # Redact streaming response
    if (
        litellm_logging_obj.stream is True
        and "complete_streaming_response" in litellm_logging_obj.model_call_details
    ):
        _streaming_response = litellm_logging_obj.model_call_details[
            "complete_streaming_response"
        ]
        for choice in _streaming_response.choices:
            if isinstance(choice, litellm.Choices):
                choice.message.content = "redacted-by-litellm"
            elif isinstance(choice, litellm.utils.StreamingChoices):
                choice.delta.content = "redacted-by-litellm"

    # Redact result
    if result is not None and isinstance(result, litellm.ModelResponse):
        _result = copy.deepcopy(result)
        if hasattr(_result, "choices") and _result.choices is not None:
            for choice in _result.choices:
                if isinstance(choice, litellm.Choices):
                    choice.message.content = "redacted-by-litellm"
                elif isinstance(choice, litellm.utils.StreamingChoices):
                    choice.delta.content = "redacted-by-litellm"
        return _result

    return result


def redact_message_input_output_from_logging(
    litellm_logging_obj: LiteLLMLoggingObject, result
):

@@ -50,43 +101,7 @@ def redact_message_input_output_from_logging(
    ):
        return result

    # remove messages, prompts, input, response from logging
    litellm_logging_obj.model_call_details["messages"] = [
        {"role": "user", "content": "redacted-by-litellm"}
    ]
    litellm_logging_obj.model_call_details["prompt"] = ""
    litellm_logging_obj.model_call_details["input"] = ""

    # response cleaning
    # ChatCompletion Responses
    if (
        litellm_logging_obj.stream is True
        and "complete_streaming_response" in litellm_logging_obj.model_call_details
    ):
        _streaming_response = litellm_logging_obj.model_call_details[
            "complete_streaming_response"
        ]
        for choice in _streaming_response.choices:
            if isinstance(choice, litellm.Choices):
                choice.message.content = "redacted-by-litellm"
            elif isinstance(choice, litellm.utils.StreamingChoices):
                choice.delta.content = "redacted-by-litellm"
    else:
        if result is not None:
            if isinstance(result, litellm.ModelResponse):
                # only deep copy litellm.ModelResponse
                _result = copy.deepcopy(result)
                if hasattr(_result, "choices") and _result.choices is not None:
                    for choice in _result.choices:
                        if isinstance(choice, litellm.Choices):
                            choice.message.content = "redacted-by-litellm"
                        elif isinstance(choice, litellm.utils.StreamingChoices):
                            choice.delta.content = "redacted-by-litellm"

                return _result

    # by default return result
    return result
    return perform_redaction(litellm_logging_obj, result)


def redact_user_api_key_info(metadata: dict) -> dict:

@@ -17,7 +17,7 @@ def initialize_callbacks_on_proxy(
    litellm_settings: dict,
    callback_specific_params: dict = {},
):
    from litellm.proxy.proxy_server import prisma_client
    from litellm.proxy.proxy_server import callback_settings, prisma_client

    verbose_proxy_logger.debug(
        f"{blue_color_code}initializing callbacks={value} on proxy{reset_color_code}"

@@ -34,7 +34,11 @@ def initialize_callbacks_on_proxy(
            from litellm.integrations.opentelemetry import OpenTelemetry
            from litellm.proxy import proxy_server

            open_telemetry_logger = OpenTelemetry()
            _otel_settings = {}
            if isinstance(callback_settings, dict) and "otel" in callback_settings:
                _otel_settings = callback_settings["otel"]

            open_telemetry_logger = OpenTelemetry(**_otel_settings)

            # Add Otel as a service callback
            if "otel" not in litellm.service_callback:

@@ -15,6 +15,13 @@ model_list:


litellm_settings:
  callbacks: ["otel"]

callback_settings:
  otel:
    message_logging: False

router_settings:
  enable_tag_filtering: True # 👈 Key Change

@@ -478,6 +478,7 @@ experimental = False
llm_router: Optional[litellm.Router] = None
llm_model_list: Optional[list] = None
general_settings: dict = {}
callback_settings: dict = {}
log_file = "api_log.json"
worker_config = None
master_key = None

@@ -1491,7 +1492,7 @@ class ProxyConfig:
        """
        Load config values into proxy global state
        """
        global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, user_custom_key_generate, use_background_health_checks, health_check_interval, use_queue, custom_db_client, proxy_budget_rescheduler_max_time, proxy_budget_rescheduler_min_time, ui_access_mode, litellm_master_key_hash, proxy_batch_write_at, disable_spend_logs, prompt_injection_detection_obj, redis_usage_cache, store_model_in_db, premium_user, open_telemetry_logger, health_check_details
        global master_key, user_config_file_path, otel_logging, user_custom_auth, user_custom_auth_path, user_custom_key_generate, use_background_health_checks, health_check_interval, use_queue, custom_db_client, proxy_budget_rescheduler_max_time, proxy_budget_rescheduler_min_time, ui_access_mode, litellm_master_key_hash, proxy_batch_write_at, disable_spend_logs, prompt_injection_detection_obj, redis_usage_cache, store_model_in_db, premium_user, open_telemetry_logger, health_check_details, callback_settings

        # Load existing config
        if os.environ.get("LITELLM_CONFIG_BUCKET_NAME") is not None:

@@ -1533,6 +1534,9 @@ class ProxyConfig:
        _license_check.license_str = os.getenv("LITELLM_LICENSE", None)
        premium_user = _license_check.is_premium()

        ## Callback settings
        callback_settings = config.get("callback_settings", None)

        ## LITELLM MODULE SETTINGS (e.g. litellm.drop_params=True,..)
        litellm_settings = config.get("litellm_settings", None)
        if litellm_settings is None:

@@ -12,6 +12,45 @@ from litellm.integrations.opentelemetry import OpenTelemetry, OpenTelemetryConfig
verbose_logger.setLevel(logging.DEBUG)


class TestOpenTelemetry(OpenTelemetry):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.kwargs = None

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("in async_log_success_event for TestOpenTelemetry kwargs=", kwargs)
        self.kwargs = kwargs
        await super().async_log_success_event(
            kwargs, response_obj, start_time, end_time
        )


@pytest.mark.asyncio
async def test_awesome_otel_with_message_logging_off():
    litellm.set_verbose = True

    otel_logger = TestOpenTelemetry(
        message_logging=False, config=OpenTelemetryConfig(exporter="console")
    )

    litellm.callbacks = [otel_logger]
    litellm.success_callback = []
    litellm.failure_callback = []

    response = await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "hi"}],
        mock_response="hi",
    )
    print("response", response)

    await asyncio.sleep(5)

    assert otel_logger.kwargs["messages"] == [
        {"role": "user", "content": "redacted-by-litellm"}
    ]


@pytest.mark.asyncio
@pytest.mark.skip(reason="Local only test. WIP.")
async def test_async_otel_callback():