(feat) DataDog Logger - Add Failure logging + use Standard Logging payload (#6929)

* add async_log_failure_event for dd

* use standard logging payload for DD logging

* use standard logging payload for DD

* fix use SLP status

* allow opting into _create_v0_logging_payload

* add unit tests for DD logging payload

* fix dd logging tests
This commit is contained in:
Ishaan Jaff 2024-11-26 19:27:06 -08:00 committed by GitHub
parent ece149c30b
commit b08b37b11d
3 changed files with 257 additions and 90 deletions

View file

@ -33,6 +33,7 @@ from litellm.llms.custom_httpx.http_handler import (
httpxSpecialProvider,
)
from litellm.types.services import ServiceLoggerPayload
from litellm.types.utils import StandardLoggingPayload
from .types import DD_ERRORS, DatadogPayload, DataDogStatus
from .utils import make_json_serializable
@ -106,20 +107,20 @@ class DataDogLogger(CustomBatchLogger):
verbose_logger.debug(
"Datadog: Logging - Enters logging function for model %s", kwargs
)
dd_payload = self.create_datadog_logging_payload(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
)
await self._log_async_event(kwargs, response_obj, start_time, end_time)
self.log_queue.append(dd_payload)
except Exception as e:
verbose_logger.exception(
f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
)
pass
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
try:
verbose_logger.debug(
f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
"Datadog: Logging - Enters logging function for model %s", kwargs
)
if len(self.log_queue) >= self.batch_size:
await self.async_send_batch()
await self._log_async_event(kwargs, response_obj, start_time, end_time)
except Exception as e:
verbose_logger.exception(
@ -181,12 +182,20 @@ class DataDogLogger(CustomBatchLogger):
verbose_logger.debug(
"Datadog: Logging - Enters logging function for model %s", kwargs
)
dd_payload = self.create_datadog_logging_payload(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
)
if litellm.datadog_use_v1 is True:
dd_payload = self._create_v0_logging_payload(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
)
else:
dd_payload = self.create_datadog_logging_payload(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
)
response = self.sync_client.post(
url=self.intake_url,
@ -215,6 +224,22 @@ class DataDogLogger(CustomBatchLogger):
pass
pass
async def _log_async_event(self, kwargs, response_obj, start_time, end_time):
dd_payload = self.create_datadog_logging_payload(
kwargs=kwargs,
response_obj=response_obj,
start_time=start_time,
end_time=end_time,
)
self.log_queue.append(dd_payload)
verbose_logger.debug(
f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
)
if len(self.log_queue) >= self.batch_size:
await self.async_send_batch()
def create_datadog_logging_payload(
self,
kwargs: Union[dict, Any],
@ -236,63 +261,19 @@ class DataDogLogger(CustomBatchLogger):
"""
import json
litellm_params = kwargs.get("litellm_params", {})
metadata = (
litellm_params.get("metadata", {}) or {}
) # if litellm_params['metadata'] == None
messages = kwargs.get("messages")
optional_params = kwargs.get("optional_params", {})
call_type = kwargs.get("call_type", "litellm.completion")
cache_hit = kwargs.get("cache_hit", False)
usage = response_obj["usage"]
id = response_obj.get("id", str(uuid.uuid4()))
usage = dict(usage)
try:
response_time = (end_time - start_time).total_seconds() * 1000
except Exception:
response_time = None
standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
"standard_logging_object", None
)
if standard_logging_object is None:
raise ValueError("standard_logging_object not found in kwargs")
try:
response_obj = dict(response_obj)
except Exception:
response_obj = response_obj
# Clean Metadata before logging - never log raw metadata
# the raw metadata can contain circular references which leads to infinite recursion
# we clean out all extra litellm metadata params before logging
clean_metadata = {}
if isinstance(metadata, dict):
for key, value in metadata.items():
# clean litellm metadata before logging
if key in [
"endpoint",
"caching_groups",
"previous_models",
]:
continue
else:
clean_metadata[key] = value
status = DataDogStatus.INFO
if standard_logging_object.get("status") == "failure":
status = DataDogStatus.ERROR
# Build the initial payload
payload = {
"id": id,
"call_type": call_type,
"cache_hit": cache_hit,
"start_time": start_time,
"end_time": end_time,
"response_time": response_time,
"model": kwargs.get("model", ""),
"user": kwargs.get("user", ""),
"model_parameters": optional_params,
"spend": kwargs.get("response_cost", 0),
"messages": messages,
"response": response_obj,
"usage": usage,
"metadata": clean_metadata,
}
make_json_serializable(payload)
json_payload = json.dumps(payload)
make_json_serializable(standard_logging_object)
json_payload = json.dumps(standard_logging_object)
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
@ -302,7 +283,7 @@ class DataDogLogger(CustomBatchLogger):
hostname="",
message=json_payload,
service="litellm-server",
status=DataDogStatus.INFO,
status=status,
)
return dd_payload
@ -382,3 +363,88 @@ class DataDogLogger(CustomBatchLogger):
No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this
"""
return
def _create_v0_logging_payload(
self,
kwargs: Union[dict, Any],
response_obj: Any,
start_time: datetime.datetime,
end_time: datetime.datetime,
) -> DatadogPayload:
"""
Note: This is our V1 Version of DataDog Logging Payload
(Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True`
"""
import json
litellm_params = kwargs.get("litellm_params", {})
metadata = (
litellm_params.get("metadata", {}) or {}
) # if litellm_params['metadata'] == None
messages = kwargs.get("messages")
optional_params = kwargs.get("optional_params", {})
call_type = kwargs.get("call_type", "litellm.completion")
cache_hit = kwargs.get("cache_hit", False)
usage = response_obj["usage"]
id = response_obj.get("id", str(uuid.uuid4()))
usage = dict(usage)
try:
response_time = (end_time - start_time).total_seconds() * 1000
except Exception:
response_time = None
try:
response_obj = dict(response_obj)
except Exception:
response_obj = response_obj
# Clean Metadata before logging - never log raw metadata
# the raw metadata can contain circular references which leads to infinite recursion
# we clean out all extra litellm metadata params before logging
clean_metadata = {}
if isinstance(metadata, dict):
for key, value in metadata.items():
# clean litellm metadata before logging
if key in [
"endpoint",
"caching_groups",
"previous_models",
]:
continue
else:
clean_metadata[key] = value
# Build the initial payload
payload = {
"id": id,
"call_type": call_type,
"cache_hit": cache_hit,
"start_time": start_time,
"end_time": end_time,
"response_time": response_time,
"model": kwargs.get("model", ""),
"user": kwargs.get("user", ""),
"model_parameters": optional_params,
"spend": kwargs.get("response_cost", 0),
"messages": messages,
"response": response_obj,
"usage": usage,
"metadata": clean_metadata,
}
make_json_serializable(payload)
json_payload = json.dumps(payload)
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
dd_payload = DatadogPayload(
ddsource=os.getenv("DD_SOURCE", "litellm"),
ddtags="",
hostname="",
message=json_payload,
service="litellm-server",
status=DataDogStatus.INFO,
)
return dd_payload