forked from phoenix/litellm-mirror
(feat) DataDog Logger - Add Failure logging + use Standard Logging payload (#6929)
* add async_log_failure_event for dd * use standard logging payload for DD logging * use standard logging payload for DD * fix use SLP status * allow opting into _create_v0_logging_payload * add unit tests for DD logging payload * fix dd logging tests
This commit is contained in:
parent
d84e355eab
commit
aea68cbeb6
3 changed files with 257 additions and 90 deletions
|
@ -68,6 +68,7 @@ callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] =
|
||||||
langfuse_default_tags: Optional[List[str]] = None
|
langfuse_default_tags: Optional[List[str]] = None
|
||||||
langsmith_batch_size: Optional[int] = None
|
langsmith_batch_size: Optional[int] = None
|
||||||
argilla_batch_size: Optional[int] = None
|
argilla_batch_size: Optional[int] = None
|
||||||
|
datadog_use_v1: Optional[bool] = False # if you want to use v1 datadog logged payload
|
||||||
argilla_transformation_object: Optional[Dict[str, Any]] = None
|
argilla_transformation_object: Optional[Dict[str, Any]] = None
|
||||||
_async_input_callback: List[Callable] = (
|
_async_input_callback: List[Callable] = (
|
||||||
[]
|
[]
|
||||||
|
|
|
@ -33,6 +33,7 @@ from litellm.llms.custom_httpx.http_handler import (
|
||||||
httpxSpecialProvider,
|
httpxSpecialProvider,
|
||||||
)
|
)
|
||||||
from litellm.types.services import ServiceLoggerPayload
|
from litellm.types.services import ServiceLoggerPayload
|
||||||
|
from litellm.types.utils import StandardLoggingPayload
|
||||||
|
|
||||||
from .types import DD_ERRORS, DatadogPayload, DataDogStatus
|
from .types import DD_ERRORS, DatadogPayload, DataDogStatus
|
||||||
from .utils import make_json_serializable
|
from .utils import make_json_serializable
|
||||||
|
@ -106,20 +107,20 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
"Datadog: Logging - Enters logging function for model %s", kwargs
|
"Datadog: Logging - Enters logging function for model %s", kwargs
|
||||||
)
|
)
|
||||||
dd_payload = self.create_datadog_logging_payload(
|
await self._log_async_event(kwargs, response_obj, start_time, end_time)
|
||||||
kwargs=kwargs,
|
|
||||||
response_obj=response_obj,
|
|
||||||
start_time=start_time,
|
|
||||||
end_time=end_time,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.log_queue.append(dd_payload)
|
except Exception as e:
|
||||||
|
verbose_logger.exception(
|
||||||
|
f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
|
||||||
|
)
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
|
||||||
|
try:
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
|
"Datadog: Logging - Enters logging function for model %s", kwargs
|
||||||
)
|
)
|
||||||
|
await self._log_async_event(kwargs, response_obj, start_time, end_time)
|
||||||
if len(self.log_queue) >= self.batch_size:
|
|
||||||
await self.async_send_batch()
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verbose_logger.exception(
|
verbose_logger.exception(
|
||||||
|
@ -181,6 +182,14 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
"Datadog: Logging - Enters logging function for model %s", kwargs
|
"Datadog: Logging - Enters logging function for model %s", kwargs
|
||||||
)
|
)
|
||||||
|
if litellm.datadog_use_v1 is True:
|
||||||
|
dd_payload = self._create_v0_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=response_obj,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
)
|
||||||
|
else:
|
||||||
dd_payload = self.create_datadog_logging_payload(
|
dd_payload = self.create_datadog_logging_payload(
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
response_obj=response_obj,
|
response_obj=response_obj,
|
||||||
|
@ -215,6 +224,22 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
pass
|
pass
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def _log_async_event(self, kwargs, response_obj, start_time, end_time):
|
||||||
|
dd_payload = self.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=response_obj,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.log_queue.append(dd_payload)
|
||||||
|
verbose_logger.debug(
|
||||||
|
f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(self.log_queue) >= self.batch_size:
|
||||||
|
await self.async_send_batch()
|
||||||
|
|
||||||
def create_datadog_logging_payload(
|
def create_datadog_logging_payload(
|
||||||
self,
|
self,
|
||||||
kwargs: Union[dict, Any],
|
kwargs: Union[dict, Any],
|
||||||
|
@ -236,63 +261,19 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
|
|
||||||
litellm_params = kwargs.get("litellm_params", {})
|
standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
|
||||||
metadata = (
|
"standard_logging_object", None
|
||||||
litellm_params.get("metadata", {}) or {}
|
)
|
||||||
) # if litellm_params['metadata'] == None
|
if standard_logging_object is None:
|
||||||
messages = kwargs.get("messages")
|
raise ValueError("standard_logging_object not found in kwargs")
|
||||||
optional_params = kwargs.get("optional_params", {})
|
|
||||||
call_type = kwargs.get("call_type", "litellm.completion")
|
|
||||||
cache_hit = kwargs.get("cache_hit", False)
|
|
||||||
usage = response_obj["usage"]
|
|
||||||
id = response_obj.get("id", str(uuid.uuid4()))
|
|
||||||
usage = dict(usage)
|
|
||||||
try:
|
|
||||||
response_time = (end_time - start_time).total_seconds() * 1000
|
|
||||||
except Exception:
|
|
||||||
response_time = None
|
|
||||||
|
|
||||||
try:
|
status = DataDogStatus.INFO
|
||||||
response_obj = dict(response_obj)
|
if standard_logging_object.get("status") == "failure":
|
||||||
except Exception:
|
status = DataDogStatus.ERROR
|
||||||
response_obj = response_obj
|
|
||||||
|
|
||||||
# Clean Metadata before logging - never log raw metadata
|
|
||||||
# the raw metadata can contain circular references which leads to infinite recursion
|
|
||||||
# we clean out all extra litellm metadata params before logging
|
|
||||||
clean_metadata = {}
|
|
||||||
if isinstance(metadata, dict):
|
|
||||||
for key, value in metadata.items():
|
|
||||||
# clean litellm metadata before logging
|
|
||||||
if key in [
|
|
||||||
"endpoint",
|
|
||||||
"caching_groups",
|
|
||||||
"previous_models",
|
|
||||||
]:
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
clean_metadata[key] = value
|
|
||||||
|
|
||||||
# Build the initial payload
|
# Build the initial payload
|
||||||
payload = {
|
make_json_serializable(standard_logging_object)
|
||||||
"id": id,
|
json_payload = json.dumps(standard_logging_object)
|
||||||
"call_type": call_type,
|
|
||||||
"cache_hit": cache_hit,
|
|
||||||
"start_time": start_time,
|
|
||||||
"end_time": end_time,
|
|
||||||
"response_time": response_time,
|
|
||||||
"model": kwargs.get("model", ""),
|
|
||||||
"user": kwargs.get("user", ""),
|
|
||||||
"model_parameters": optional_params,
|
|
||||||
"spend": kwargs.get("response_cost", 0),
|
|
||||||
"messages": messages,
|
|
||||||
"response": response_obj,
|
|
||||||
"usage": usage,
|
|
||||||
"metadata": clean_metadata,
|
|
||||||
}
|
|
||||||
|
|
||||||
make_json_serializable(payload)
|
|
||||||
json_payload = json.dumps(payload)
|
|
||||||
|
|
||||||
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
||||||
|
|
||||||
|
@ -302,7 +283,7 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
hostname="",
|
hostname="",
|
||||||
message=json_payload,
|
message=json_payload,
|
||||||
service="litellm-server",
|
service="litellm-server",
|
||||||
status=DataDogStatus.INFO,
|
status=status,
|
||||||
)
|
)
|
||||||
return dd_payload
|
return dd_payload
|
||||||
|
|
||||||
|
@ -382,3 +363,88 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this
|
No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this
|
||||||
"""
|
"""
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def _create_v0_logging_payload(
|
||||||
|
self,
|
||||||
|
kwargs: Union[dict, Any],
|
||||||
|
response_obj: Any,
|
||||||
|
start_time: datetime.datetime,
|
||||||
|
end_time: datetime.datetime,
|
||||||
|
) -> DatadogPayload:
|
||||||
|
"""
|
||||||
|
Note: This is our V1 Version of DataDog Logging Payload
|
||||||
|
|
||||||
|
|
||||||
|
(Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True`
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
litellm_params = kwargs.get("litellm_params", {})
|
||||||
|
metadata = (
|
||||||
|
litellm_params.get("metadata", {}) or {}
|
||||||
|
) # if litellm_params['metadata'] == None
|
||||||
|
messages = kwargs.get("messages")
|
||||||
|
optional_params = kwargs.get("optional_params", {})
|
||||||
|
call_type = kwargs.get("call_type", "litellm.completion")
|
||||||
|
cache_hit = kwargs.get("cache_hit", False)
|
||||||
|
usage = response_obj["usage"]
|
||||||
|
id = response_obj.get("id", str(uuid.uuid4()))
|
||||||
|
usage = dict(usage)
|
||||||
|
try:
|
||||||
|
response_time = (end_time - start_time).total_seconds() * 1000
|
||||||
|
except Exception:
|
||||||
|
response_time = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
response_obj = dict(response_obj)
|
||||||
|
except Exception:
|
||||||
|
response_obj = response_obj
|
||||||
|
|
||||||
|
# Clean Metadata before logging - never log raw metadata
|
||||||
|
# the raw metadata can contain circular references which leads to infinite recursion
|
||||||
|
# we clean out all extra litellm metadata params before logging
|
||||||
|
clean_metadata = {}
|
||||||
|
if isinstance(metadata, dict):
|
||||||
|
for key, value in metadata.items():
|
||||||
|
# clean litellm metadata before logging
|
||||||
|
if key in [
|
||||||
|
"endpoint",
|
||||||
|
"caching_groups",
|
||||||
|
"previous_models",
|
||||||
|
]:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
clean_metadata[key] = value
|
||||||
|
|
||||||
|
# Build the initial payload
|
||||||
|
payload = {
|
||||||
|
"id": id,
|
||||||
|
"call_type": call_type,
|
||||||
|
"cache_hit": cache_hit,
|
||||||
|
"start_time": start_time,
|
||||||
|
"end_time": end_time,
|
||||||
|
"response_time": response_time,
|
||||||
|
"model": kwargs.get("model", ""),
|
||||||
|
"user": kwargs.get("user", ""),
|
||||||
|
"model_parameters": optional_params,
|
||||||
|
"spend": kwargs.get("response_cost", 0),
|
||||||
|
"messages": messages,
|
||||||
|
"response": response_obj,
|
||||||
|
"usage": usage,
|
||||||
|
"metadata": clean_metadata,
|
||||||
|
}
|
||||||
|
|
||||||
|
make_json_serializable(payload)
|
||||||
|
json_payload = json.dumps(payload)
|
||||||
|
|
||||||
|
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
||||||
|
|
||||||
|
dd_payload = DatadogPayload(
|
||||||
|
ddsource=os.getenv("DD_SOURCE", "litellm"),
|
||||||
|
ddtags="",
|
||||||
|
hostname="",
|
||||||
|
message=json_payload,
|
||||||
|
service="litellm-server",
|
||||||
|
status=DataDogStatus.INFO,
|
||||||
|
)
|
||||||
|
return dd_payload
|
||||||
|
|
|
@ -2,6 +2,7 @@ import io
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
|
||||||
sys.path.insert(0, os.path.abspath("../.."))
|
sys.path.insert(0, os.path.abspath("../.."))
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
@ -16,11 +17,126 @@ import pytest
|
||||||
import litellm
|
import litellm
|
||||||
from litellm import completion
|
from litellm import completion
|
||||||
from litellm._logging import verbose_logger
|
from litellm._logging import verbose_logger
|
||||||
from litellm.integrations.datadog.types import DatadogPayload
|
from litellm.integrations.datadog.datadog import *
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from litellm.types.utils import (
|
||||||
|
StandardLoggingPayload,
|
||||||
|
StandardLoggingModelInformation,
|
||||||
|
StandardLoggingMetadata,
|
||||||
|
StandardLoggingHiddenParams,
|
||||||
|
)
|
||||||
|
|
||||||
verbose_logger.setLevel(logging.DEBUG)
|
verbose_logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def create_standard_logging_payload() -> StandardLoggingPayload:
|
||||||
|
return StandardLoggingPayload(
|
||||||
|
id="test_id",
|
||||||
|
call_type="completion",
|
||||||
|
response_cost=0.1,
|
||||||
|
response_cost_failure_debug_info=None,
|
||||||
|
status="success",
|
||||||
|
total_tokens=30,
|
||||||
|
prompt_tokens=20,
|
||||||
|
completion_tokens=10,
|
||||||
|
startTime=1234567890.0,
|
||||||
|
endTime=1234567891.0,
|
||||||
|
completionStartTime=1234567890.5,
|
||||||
|
model_map_information=StandardLoggingModelInformation(
|
||||||
|
model_map_key="gpt-3.5-turbo", model_map_value=None
|
||||||
|
),
|
||||||
|
model="gpt-3.5-turbo",
|
||||||
|
model_id="model-123",
|
||||||
|
model_group="openai-gpt",
|
||||||
|
api_base="https://api.openai.com",
|
||||||
|
metadata=StandardLoggingMetadata(
|
||||||
|
user_api_key_hash="test_hash",
|
||||||
|
user_api_key_org_id=None,
|
||||||
|
user_api_key_alias="test_alias",
|
||||||
|
user_api_key_team_id="test_team",
|
||||||
|
user_api_key_user_id="test_user",
|
||||||
|
user_api_key_team_alias="test_team_alias",
|
||||||
|
spend_logs_metadata=None,
|
||||||
|
requester_ip_address="127.0.0.1",
|
||||||
|
requester_metadata=None,
|
||||||
|
),
|
||||||
|
cache_hit=False,
|
||||||
|
cache_key=None,
|
||||||
|
saved_cache_cost=0.0,
|
||||||
|
request_tags=[],
|
||||||
|
end_user=None,
|
||||||
|
requester_ip_address="127.0.0.1",
|
||||||
|
messages=[{"role": "user", "content": "Hello, world!"}],
|
||||||
|
response={"choices": [{"message": {"content": "Hi there!"}}]},
|
||||||
|
error_str=None,
|
||||||
|
model_parameters={"stream": True},
|
||||||
|
hidden_params=StandardLoggingHiddenParams(
|
||||||
|
model_id="model-123",
|
||||||
|
cache_key=None,
|
||||||
|
api_base="https://api.openai.com",
|
||||||
|
response_cost="0.1",
|
||||||
|
additional_headers=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_create_datadog_logging_payload():
|
||||||
|
"""Test creating a DataDog logging payload from a standard logging object"""
|
||||||
|
dd_logger = DataDogLogger()
|
||||||
|
standard_payload = create_standard_logging_payload()
|
||||||
|
|
||||||
|
# Create mock kwargs with the standard logging object
|
||||||
|
kwargs = {"standard_logging_object": standard_payload}
|
||||||
|
|
||||||
|
# Test payload creation
|
||||||
|
dd_payload = dd_logger.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=None,
|
||||||
|
start_time=datetime.now(),
|
||||||
|
end_time=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify payload structure
|
||||||
|
assert dd_payload["ddsource"] == os.getenv("DD_SOURCE", "litellm")
|
||||||
|
assert dd_payload["service"] == "litellm-server"
|
||||||
|
assert dd_payload["status"] == DataDogStatus.INFO
|
||||||
|
|
||||||
|
# verify the message field == standard_payload
|
||||||
|
dict_payload = json.loads(dd_payload["message"])
|
||||||
|
assert dict_payload == standard_payload
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_datadog_failure_logging():
|
||||||
|
"""Test logging a failure event to DataDog"""
|
||||||
|
dd_logger = DataDogLogger()
|
||||||
|
standard_payload = create_standard_logging_payload()
|
||||||
|
standard_payload["status"] = "failure" # Set status to failure
|
||||||
|
standard_payload["error_str"] = "Test error"
|
||||||
|
|
||||||
|
kwargs = {"standard_logging_object": standard_payload}
|
||||||
|
|
||||||
|
dd_payload = dd_logger.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=None,
|
||||||
|
start_time=datetime.now(),
|
||||||
|
end_time=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
dd_payload["status"] == DataDogStatus.ERROR
|
||||||
|
) # Verify failure maps to warning status
|
||||||
|
|
||||||
|
# verify the message field == standard_payload
|
||||||
|
dict_payload = json.loads(dd_payload["message"])
|
||||||
|
assert dict_payload == standard_payload
|
||||||
|
|
||||||
|
# verify error_str is in the message field
|
||||||
|
assert "error_str" in dict_payload
|
||||||
|
assert dict_payload["error_str"] == "Test error"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_datadog_logging_http_request():
|
async def test_datadog_logging_http_request():
|
||||||
"""
|
"""
|
||||||
|
@ -111,22 +227,7 @@ async def test_datadog_logging_http_request():
|
||||||
# Parse the 'message' field as JSON and check its structure
|
# Parse the 'message' field as JSON and check its structure
|
||||||
message = json.loads(body[0]["message"])
|
message = json.loads(body[0]["message"])
|
||||||
|
|
||||||
expected_message_fields = [
|
expected_message_fields = StandardLoggingPayload.__annotations__.keys()
|
||||||
"id",
|
|
||||||
"call_type",
|
|
||||||
"cache_hit",
|
|
||||||
"start_time",
|
|
||||||
"end_time",
|
|
||||||
"response_time",
|
|
||||||
"model",
|
|
||||||
"user",
|
|
||||||
"model_parameters",
|
|
||||||
"spend",
|
|
||||||
"messages",
|
|
||||||
"response",
|
|
||||||
"usage",
|
|
||||||
"metadata",
|
|
||||||
]
|
|
||||||
|
|
||||||
for field in expected_message_fields:
|
for field in expected_message_fields:
|
||||||
assert field in message, f"Field '{field}' is missing from the message"
|
assert field in message, f"Field '{field}' is missing from the message"
|
||||||
|
@ -138,7 +239,6 @@ async def test_datadog_logging_http_request():
|
||||||
assert "temperature" in message["model_parameters"]
|
assert "temperature" in message["model_parameters"]
|
||||||
assert "max_tokens" in message["model_parameters"]
|
assert "max_tokens" in message["model_parameters"]
|
||||||
assert isinstance(message["response"], dict)
|
assert isinstance(message["response"], dict)
|
||||||
assert isinstance(message["usage"], dict)
|
|
||||||
assert isinstance(message["metadata"], dict)
|
assert isinstance(message["metadata"], dict)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
Loading…
Add table
Add a link
Reference in a new issue