From aea68cbeb652e52b09e3da9f195728144d6db664 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Tue, 26 Nov 2024 19:27:06 -0800 Subject: [PATCH] (feat) DataDog Logger - Add Failure logging + use Standard Logging payload (#6929) * add async_log_failure_event for dd * use standard logging payload for DD logging * use standard logging payload for DD * fix use SLP status * allow opting into _create_v0_logging_payload * add unit tests for DD logging payload * fix dd logging tests --- litellm/__init__.py | 1 + litellm/integrations/datadog/datadog.py | 210 ++++++++++++------ .../test_datadog.py | 136 ++++++++++-- 3 files changed, 257 insertions(+), 90 deletions(-) rename tests/{local_testing => logging_callback_tests}/test_datadog.py (65%) diff --git a/litellm/__init__.py b/litellm/__init__.py index 65b1b3465..43f91fe58 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -68,6 +68,7 @@ callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] = langfuse_default_tags: Optional[List[str]] = None langsmith_batch_size: Optional[int] = None argilla_batch_size: Optional[int] = None +datadog_use_v1: Optional[bool] = False # if you want to use v1 datadog logged payload argilla_transformation_object: Optional[Dict[str, Any]] = None _async_input_callback: List[Callable] = ( [] diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py index 40044ce9f..527b6f87d 100644 --- a/litellm/integrations/datadog/datadog.py +++ b/litellm/integrations/datadog/datadog.py @@ -33,6 +33,7 @@ from litellm.llms.custom_httpx.http_handler import ( httpxSpecialProvider, ) from litellm.types.services import ServiceLoggerPayload +from litellm.types.utils import StandardLoggingPayload from .types import DD_ERRORS, DatadogPayload, DataDogStatus from .utils import make_json_serializable @@ -106,20 +107,20 @@ class DataDogLogger(CustomBatchLogger): verbose_logger.debug( "Datadog: Logging - Enters logging function for model %s", kwargs ) - 
dd_payload = self.create_datadog_logging_payload( - kwargs=kwargs, - response_obj=response_obj, - start_time=start_time, - end_time=end_time, - ) + await self._log_async_event(kwargs, response_obj, start_time, end_time) - self.log_queue.append(dd_payload) + except Exception as e: + verbose_logger.exception( + f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}" + ) + pass + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + try: verbose_logger.debug( - f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..." + "Datadog: Logging - Enters logging function for model %s", kwargs ) - - if len(self.log_queue) >= self.batch_size: - await self.async_send_batch() + await self._log_async_event(kwargs, response_obj, start_time, end_time) except Exception as e: verbose_logger.exception( @@ -181,12 +182,20 @@ class DataDogLogger(CustomBatchLogger): verbose_logger.debug( "Datadog: Logging - Enters logging function for model %s", kwargs ) - dd_payload = self.create_datadog_logging_payload( - kwargs=kwargs, - response_obj=response_obj, - start_time=start_time, - end_time=end_time, - ) + if litellm.datadog_use_v1 is True: + dd_payload = self._create_v0_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + else: + dd_payload = self.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) response = self.sync_client.post( url=self.intake_url, @@ -215,6 +224,22 @@ class DataDogLogger(CustomBatchLogger): pass pass + async def _log_async_event(self, kwargs, response_obj, start_time, end_time): + dd_payload = self.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + + self.log_queue.append(dd_payload) + verbose_logger.debug( + f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..." 
+ ) + + if len(self.log_queue) >= self.batch_size: + await self.async_send_batch() + def create_datadog_logging_payload( self, kwargs: Union[dict, Any], @@ -236,63 +261,19 @@ class DataDogLogger(CustomBatchLogger): """ import json - litellm_params = kwargs.get("litellm_params", {}) - metadata = ( - litellm_params.get("metadata", {}) or {} - ) # if litellm_params['metadata'] == None - messages = kwargs.get("messages") - optional_params = kwargs.get("optional_params", {}) - call_type = kwargs.get("call_type", "litellm.completion") - cache_hit = kwargs.get("cache_hit", False) - usage = response_obj["usage"] - id = response_obj.get("id", str(uuid.uuid4())) - usage = dict(usage) - try: - response_time = (end_time - start_time).total_seconds() * 1000 - except Exception: - response_time = None + standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object", None + ) + if standard_logging_object is None: + raise ValueError("standard_logging_object not found in kwargs") - try: - response_obj = dict(response_obj) - except Exception: - response_obj = response_obj - - # Clean Metadata before logging - never log raw metadata - # the raw metadata can contain circular references which leads to infinite recursion - # we clean out all extra litellm metadata params before logging - clean_metadata = {} - if isinstance(metadata, dict): - for key, value in metadata.items(): - # clean litellm metadata before logging - if key in [ - "endpoint", - "caching_groups", - "previous_models", - ]: - continue - else: - clean_metadata[key] = value + status = DataDogStatus.INFO + if standard_logging_object.get("status") == "failure": + status = DataDogStatus.ERROR # Build the initial payload - payload = { - "id": id, - "call_type": call_type, - "cache_hit": cache_hit, - "start_time": start_time, - "end_time": end_time, - "response_time": response_time, - "model": kwargs.get("model", ""), - "user": kwargs.get("user", ""), - "model_parameters": optional_params, - 
"spend": kwargs.get("response_cost", 0), - "messages": messages, - "response": response_obj, - "usage": usage, - "metadata": clean_metadata, - } - - make_json_serializable(payload) - json_payload = json.dumps(payload) + make_json_serializable(standard_logging_object) + json_payload = json.dumps(standard_logging_object) verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) @@ -302,7 +283,7 @@ class DataDogLogger(CustomBatchLogger): hostname="", message=json_payload, service="litellm-server", - status=DataDogStatus.INFO, + status=status, ) return dd_payload @@ -382,3 +363,88 @@ class DataDogLogger(CustomBatchLogger): No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this """ return + + def _create_v0_logging_payload( + self, + kwargs: Union[dict, Any], + response_obj: Any, + start_time: datetime.datetime, + end_time: datetime.datetime, + ) -> DatadogPayload: + """ + Note: This is our V1 Version of DataDog Logging Payload + + + (Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True` + """ + import json + + litellm_params = kwargs.get("litellm_params", {}) + metadata = ( + litellm_params.get("metadata", {}) or {} + ) # if litellm_params['metadata'] == None + messages = kwargs.get("messages") + optional_params = kwargs.get("optional_params", {}) + call_type = kwargs.get("call_type", "litellm.completion") + cache_hit = kwargs.get("cache_hit", False) + usage = response_obj["usage"] + id = response_obj.get("id", str(uuid.uuid4())) + usage = dict(usage) + try: + response_time = (end_time - start_time).total_seconds() * 1000 + except Exception: + response_time = None + + try: + response_obj = dict(response_obj) + except Exception: + response_obj = response_obj + + # Clean Metadata before logging - never log raw metadata + # the raw metadata can contain circular references which leads to infinite recursion + # we clean out all extra litellm metadata params before logging + 
clean_metadata = {} + if isinstance(metadata, dict): + for key, value in metadata.items(): + # clean litellm metadata before logging + if key in [ + "endpoint", + "caching_groups", + "previous_models", + ]: + continue + else: + clean_metadata[key] = value + + # Build the initial payload + payload = { + "id": id, + "call_type": call_type, + "cache_hit": cache_hit, + "start_time": start_time, + "end_time": end_time, + "response_time": response_time, + "model": kwargs.get("model", ""), + "user": kwargs.get("user", ""), + "model_parameters": optional_params, + "spend": kwargs.get("response_cost", 0), + "messages": messages, + "response": response_obj, + "usage": usage, + "metadata": clean_metadata, + } + + make_json_serializable(payload) + json_payload = json.dumps(payload) + + verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) + + dd_payload = DatadogPayload( + ddsource=os.getenv("DD_SOURCE", "litellm"), + ddtags="", + hostname="", + message=json_payload, + service="litellm-server", + status=DataDogStatus.INFO, + ) + return dd_payload diff --git a/tests/local_testing/test_datadog.py b/tests/logging_callback_tests/test_datadog.py similarity index 65% rename from tests/local_testing/test_datadog.py rename to tests/logging_callback_tests/test_datadog.py index 990a5b76c..a93156226 100644 --- a/tests/local_testing/test_datadog.py +++ b/tests/logging_callback_tests/test_datadog.py @@ -2,6 +2,7 @@ import io import os import sys + sys.path.insert(0, os.path.abspath("../..")) import asyncio @@ -16,11 +17,126 @@ import pytest import litellm from litellm import completion from litellm._logging import verbose_logger -from litellm.integrations.datadog.types import DatadogPayload +from litellm.integrations.datadog.datadog import * +from datetime import datetime, timedelta +from litellm.types.utils import ( + StandardLoggingPayload, + StandardLoggingModelInformation, + StandardLoggingMetadata, + StandardLoggingHiddenParams, +) 
verbose_logger.setLevel(logging.DEBUG) +def create_standard_logging_payload() -> StandardLoggingPayload: + return StandardLoggingPayload( + id="test_id", + call_type="completion", + response_cost=0.1, + response_cost_failure_debug_info=None, + status="success", + total_tokens=30, + prompt_tokens=20, + completion_tokens=10, + startTime=1234567890.0, + endTime=1234567891.0, + completionStartTime=1234567890.5, + model_map_information=StandardLoggingModelInformation( + model_map_key="gpt-3.5-turbo", model_map_value=None + ), + model="gpt-3.5-turbo", + model_id="model-123", + model_group="openai-gpt", + api_base="https://api.openai.com", + metadata=StandardLoggingMetadata( + user_api_key_hash="test_hash", + user_api_key_org_id=None, + user_api_key_alias="test_alias", + user_api_key_team_id="test_team", + user_api_key_user_id="test_user", + user_api_key_team_alias="test_team_alias", + spend_logs_metadata=None, + requester_ip_address="127.0.0.1", + requester_metadata=None, + ), + cache_hit=False, + cache_key=None, + saved_cache_cost=0.0, + request_tags=[], + end_user=None, + requester_ip_address="127.0.0.1", + messages=[{"role": "user", "content": "Hello, world!"}], + response={"choices": [{"message": {"content": "Hi there!"}}]}, + error_str=None, + model_parameters={"stream": True}, + hidden_params=StandardLoggingHiddenParams( + model_id="model-123", + cache_key=None, + api_base="https://api.openai.com", + response_cost="0.1", + additional_headers=None, + ), + ) + + +@pytest.mark.asyncio +async def test_create_datadog_logging_payload(): + """Test creating a DataDog logging payload from a standard logging object""" + dd_logger = DataDogLogger() + standard_payload = create_standard_logging_payload() + + # Create mock kwargs with the standard logging object + kwargs = {"standard_logging_object": standard_payload} + + # Test payload creation + dd_payload = dd_logger.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=None, + start_time=datetime.now(), + 
+        end_time=datetime.now(),
+    )
+
+    # Verify payload structure
+    assert dd_payload["ddsource"] == os.getenv("DD_SOURCE", "litellm")
+    assert dd_payload["service"] == "litellm-server"
+    assert dd_payload["status"] == DataDogStatus.INFO
+
+    # verify the message field == standard_payload
+    dict_payload = json.loads(dd_payload["message"])
+    assert dict_payload == standard_payload
+
+
+@pytest.mark.asyncio
+async def test_datadog_failure_logging():
+    """Test logging a failure event to DataDog"""
+    dd_logger = DataDogLogger()
+    standard_payload = create_standard_logging_payload()
+    standard_payload["status"] = "failure"  # Set status to failure
+    standard_payload["error_str"] = "Test error"
+
+    kwargs = {"standard_logging_object": standard_payload}
+
+    dd_payload = dd_logger.create_datadog_logging_payload(
+        kwargs=kwargs,
+        response_obj=None,
+        start_time=datetime.now(),
+        end_time=datetime.now(),
+    )
+
+    assert (
+        dd_payload["status"] == DataDogStatus.ERROR
+    )  # Verify failure maps to error status
+
+    # verify the message field == standard_payload
+    dict_payload = json.loads(dd_payload["message"])
+    assert dict_payload == standard_payload
+
+    # verify error_str is in the message field
+    assert "error_str" in dict_payload
+    assert dict_payload["error_str"] == "Test error"
+
+
 @pytest.mark.asyncio
 async def test_datadog_logging_http_request():
     """
@@ -111,22 +227,7 @@ async def test_datadog_logging_http_request():
         # Parse the 'message' field as JSON and check its structure
         message = json.loads(body[0]["message"])
 
-        expected_message_fields = [
-            "id",
-            "call_type",
-            "cache_hit",
-            "start_time",
-            "end_time",
-            "response_time",
-            "model",
-            "user",
-            "model_parameters",
-            "spend",
-            "messages",
-            "response",
-            "usage",
-            "metadata",
-        ]
+        expected_message_fields = StandardLoggingPayload.__annotations__.keys()
 
         for field in expected_message_fields:
             assert field in message, f"Field '{field}' is missing from the message"
@@ -138,7 +239,6 @@ async def test_datadog_logging_http_request():
         assert "temperature" in message["model_parameters"]
         assert "max_tokens" in message["model_parameters"]
         assert isinstance(message["response"], dict)
-        assert isinstance(message["usage"], dict)
         assert isinstance(message["metadata"], dict)
 
     except Exception as e: