diff --git a/litellm/__init__.py b/litellm/__init__.py index 65b1b3465..43f91fe58 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -68,6 +68,7 @@ callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] = langfuse_default_tags: Optional[List[str]] = None langsmith_batch_size: Optional[int] = None argilla_batch_size: Optional[int] = None +datadog_use_v1: Optional[bool] = False # if you want to use v1 datadog logged payload argilla_transformation_object: Optional[Dict[str, Any]] = None _async_input_callback: List[Callable] = ( [] diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py index 40044ce9f..6ee1bc5e7 100644 --- a/litellm/integrations/datadog/datadog.py +++ b/litellm/integrations/datadog/datadog.py @@ -32,9 +32,11 @@ from litellm.llms.custom_httpx.http_handler import ( get_async_httpx_client, httpxSpecialProvider, ) +from litellm.proxy._types import UserAPIKeyAuth +from litellm.types.integrations.datadog import * from litellm.types.services import ServiceLoggerPayload +from litellm.types.utils import StandardLoggingPayload -from .types import DD_ERRORS, DatadogPayload, DataDogStatus from .utils import make_json_serializable DD_MAX_BATCH_SIZE = 1000 # max number of logs DD API can accept @@ -106,20 +108,20 @@ class DataDogLogger(CustomBatchLogger): verbose_logger.debug( "Datadog: Logging - Enters logging function for model %s", kwargs ) - dd_payload = self.create_datadog_logging_payload( - kwargs=kwargs, - response_obj=response_obj, - start_time=start_time, - end_time=end_time, - ) + await self._log_async_event(kwargs, response_obj, start_time, end_time) - self.log_queue.append(dd_payload) + except Exception as e: + verbose_logger.exception( + f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}" + ) + pass + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + try: verbose_logger.debug( - f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..." + "Datadog: Logging - Enters logging function for model %s", kwargs ) - - if len(self.log_queue) >= self.batch_size: - await self.async_send_batch() + await self._log_async_event(kwargs, response_obj, start_time, end_time) except Exception as e: verbose_logger.exception( @@ -181,12 +183,20 @@ class DataDogLogger(CustomBatchLogger): verbose_logger.debug( "Datadog: Logging - Enters logging function for model %s", kwargs ) - dd_payload = self.create_datadog_logging_payload( - kwargs=kwargs, - response_obj=response_obj, - start_time=start_time, - end_time=end_time, - ) + if litellm.datadog_use_v1 is True: + dd_payload = self._create_v0_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + else: + dd_payload = self.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) response = self.sync_client.post( url=self.intake_url, @@ -215,6 +225,22 @@ class DataDogLogger(CustomBatchLogger): pass pass + async def _log_async_event(self, kwargs, response_obj, start_time, end_time): + dd_payload = self.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + + self.log_queue.append(dd_payload) + verbose_logger.debug( + f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..." 
+ ) + + if len(self.log_queue) >= self.batch_size: + await self.async_send_batch() + def create_datadog_logging_payload( self, kwargs: Union[dict, Any], @@ -236,6 +262,153 @@ class DataDogLogger(CustomBatchLogger): """ import json + standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object", None + ) + if standard_logging_object is None: + raise ValueError("standard_logging_object not found in kwargs") + + status = DataDogStatus.INFO + if standard_logging_object.get("status") == "failure": + status = DataDogStatus.ERROR + + # Build the initial payload + make_json_serializable(standard_logging_object) + json_payload = json.dumps(standard_logging_object) + + verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) + + dd_payload = DatadogPayload( + ddsource=os.getenv("DD_SOURCE", "litellm"), + ddtags="", + hostname="", + message=json_payload, + service="litellm-server", + status=status, + ) + return dd_payload + + async def async_send_compressed_data(self, data: List) -> Response: + """ + Async helper to send compressed data to datadog self.intake_url + + Datadog recommends using gzip to compress data + https://docs.datadoghq.com/api/latest/logs/ + + "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending" + """ + import gzip + import json + + compressed_data = gzip.compress(json.dumps(data).encode("utf-8")) + response = await self.async_client.post( + url=self.intake_url, + data=compressed_data, # type: ignore + headers={ + "DD-API-KEY": self.DD_API_KEY, + "Content-Encoding": "gzip", + "Content-Type": "application/json", + }, + ) + return response + + async def async_service_failure_hook( + self, + payload: ServiceLoggerPayload, + error: Optional[str] = "", + parent_otel_span: Optional[Any] = None, + start_time: Optional[Union[datetimeObj, float]] = None, + end_time: Optional[Union[float, datetimeObj]] = None, + event_metadata: Optional[dict] = None, + ): + """ + Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog + + - example - Redis is failing / erroring, will be logged on DataDog + """ + + try: + import json + + _payload_dict = payload.model_dump() + _dd_message_str = json.dumps(_payload_dict) + _dd_payload = DatadogPayload( + ddsource="litellm", + ddtags="", + hostname="", + message=_dd_message_str, + service="litellm-server", + status=DataDogStatus.WARN, + ) + + self.log_queue.append(_dd_payload) + + except Exception as e: + verbose_logger.exception( + f"Datadog: Logger - Exception in async_service_failure_hook: {e}" + ) + pass + + async def async_post_call_failure_hook( + self, + request_data: dict, + original_exception: Exception, + user_api_key_dict: UserAPIKeyAuth, + ): + import json + + _exception_payload = DatadogProxyFailureHookJsonMessage( + exception=str(original_exception), + error_class=str(original_exception.__class__.__name__), + status_code=getattr(original_exception, "status_code", None), + traceback=traceback.format_exc(), + user_api_key_dict=user_api_key_dict.model_dump(), + ) + + json_payload = json.dumps(_exception_payload) + verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) + dd_payload = DatadogPayload( + ddsource=os.getenv("DD_SOURCE", "litellm"), + ddtags="", + hostname="", + message=json_payload, + service="litellm-server", + status=DataDogStatus.ERROR, + ) + + self.log_queue.append(dd_payload) + + async def async_service_success_hook( + self, + payload: ServiceLoggerPayload, + error: 
Optional[str] = "", + parent_otel_span: Optional[Any] = None, + start_time: Optional[Union[datetimeObj, float]] = None, + end_time: Optional[Union[float, datetimeObj]] = None, + event_metadata: Optional[dict] = None, + ): + """ + Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog + + No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this + """ + return + + def _create_v0_logging_payload( + self, + kwargs: Union[dict, Any], + response_obj: Any, + start_time: datetime.datetime, + end_time: datetime.datetime, + ) -> DatadogPayload: + """ + Note: This is our V1 Version of DataDog Logging Payload + + + (Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True` + """ + import json + litellm_params = kwargs.get("litellm_params", {}) metadata = ( litellm_params.get("metadata", {}) or {} @@ -305,80 +478,3 @@ class DataDogLogger(CustomBatchLogger): status=DataDogStatus.INFO, ) return dd_payload - - async def async_send_compressed_data(self, data: List) -> Response: - """ - Async helper to send compressed data to datadog self.intake_url - - Datadog recommends using gzip to compress data - https://docs.datadoghq.com/api/latest/logs/ - - "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending" - """ - import gzip - import json - - compressed_data = gzip.compress(json.dumps(data).encode("utf-8")) - response = await self.async_client.post( - url=self.intake_url, - data=compressed_data, # type: ignore - headers={ - "DD-API-KEY": self.DD_API_KEY, - "Content-Encoding": "gzip", - "Content-Type": "application/json", - }, - ) - return response - - async def async_service_failure_hook( - self, - payload: ServiceLoggerPayload, - error: Optional[str] = "", - parent_otel_span: Optional[Any] = None, - start_time: Optional[Union[datetimeObj, float]] = None, - end_time: Optional[Union[float, datetimeObj]] = None, - event_metadata: Optional[dict] = None, - ): - """ - Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog - - - example - Redis is failing / erroring, will be logged on DataDog - """ - - try: - import json - - _payload_dict = payload.model_dump() - _dd_message_str = json.dumps(_payload_dict) - _dd_payload = DatadogPayload( - ddsource="litellm", - ddtags="", - hostname="", - message=_dd_message_str, - service="litellm-server", - status=DataDogStatus.WARN, - ) - - self.log_queue.append(_dd_payload) - - except Exception as e: - verbose_logger.exception( - f"Datadog: Logger - Exception in async_service_failure_hook: {e}" - ) - pass - - async def async_service_success_hook( - self, - payload: ServiceLoggerPayload, - error: Optional[str] = "", - parent_otel_span: Optional[Any] = None, - start_time: Optional[Union[datetimeObj, float]] = None, - end_time: Optional[Union[float, datetimeObj]] = None, - event_metadata: Optional[dict] = None, - ): - """ - Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog - - No user has asked for this so far, this might be spammy on datatdog. 
If need arises we can implement this - """ - return diff --git a/litellm/proxy/auth/user_api_key_auth.py b/litellm/proxy/auth/user_api_key_auth.py index d19215245..d95016cd9 100644 --- a/litellm/proxy/auth/user_api_key_auth.py +++ b/litellm/proxy/auth/user_api_key_auth.py @@ -1197,13 +1197,15 @@ async def user_api_key_auth( # noqa: PLR0915 extra={"requester_ip": requester_ip}, ) - # Log this exception to OTEL - if open_telemetry_logger is not None: - await open_telemetry_logger.async_post_call_failure_hook( # type: ignore + # Log this exception to OTEL, Datadog, All Custom Loggers + asyncio.create_task( + proxy_logging_obj.async_log_proxy_authentication_errors( original_exception=e, - request_data={}, - user_api_key_dict=UserAPIKeyAuth(parent_otel_span=parent_otel_span), + request=request, + parent_otel_span=parent_otel_span, + api_key=api_key, ) + ) if isinstance(e, litellm.BudgetExceededError): raise ProxyException( diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index 40cd86c5c..86697c186 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -8,18 +8,7 @@ model_list: model: anthropic/fake api_base: https://exampleanthropicendpoint-production.up.railway.app/ -router_settings: - provider_budget_config: - openai: - budget_limit: 0.3 # float of $ value budget for time period - time_period: 1d # can be 1d, 2d, 30d - anthropic: - budget_limit: 5 - time_period: 1d - redis_host: os.environ/REDIS_HOST - redis_port: os.environ/REDIS_PORT - redis_password: os.environ/REDIS_PASSWORD - litellm_settings: - callbacks: ["prometheus"] - success_callback: ["langfuse"] \ No newline at end of file + callbacks: ["datadog"] # will log success & failures + service_callbacks: ["datadog"] # will log DB fails / exceptions + turn_off_message_logging: True # will redact message / response content \ No newline at end of file diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 2a298af21..8e8cb5446 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -854,6 +854,20 @@ class ProxyLogging: ), ).start() + await self._run_post_call_failure_hook_custom_loggers( + original_exception=original_exception, + request_data=request_data, + user_api_key_dict=user_api_key_dict, + ) + + return + + async def _run_post_call_failure_hook_custom_loggers( + self, + original_exception: Exception, + request_data: dict, + user_api_key_dict: UserAPIKeyAuth, + ): for callback in litellm.callbacks: try: _callback: Optional[CustomLogger] = None @@ -872,7 +886,35 @@ class ProxyLogging: except Exception as e: raise e - return + async def async_log_proxy_authentication_errors( + self, + original_exception: Exception, + request: Request, + parent_otel_span: Optional[Any], + api_key: str, + ): + """ + Handler for Logging Authentication Errors on LiteLLM Proxy + + Why not use post_call_failure_hook? + - `post_call_failure_hook` calls `litellm_logging_obj.async_failure_handler`. This led to the Exception being logged twice + + What does this handler do? 
+ - Logs Authentication Errors (like invalid API Key passed) to CustomLogger compatible classes + - calls CustomLogger.async_post_call_failure_hook + """ + + user_api_key_dict = UserAPIKeyAuth( + parent_otel_span=parent_otel_span, + token=_hash_token_if_needed(token=api_key), + ) + request_data = await request.json() + await self._run_post_call_failure_hook_custom_loggers( + original_exception=original_exception, + request_data=request_data, + user_api_key_dict=user_api_key_dict, + ) + pass async def post_call_success_hook( self, @@ -986,7 +1028,7 @@ class ProxyLogging: ### DB CONNECTOR ### # Define the retry decorator with backoff strategy -# Function to be called whenever a retry is about to happen +# Function to be called whenever a retry is about to happen def on_backoff(details): # The 'tries' key in the details dictionary contains the number of completed tries print_verbose(f"Backing off... this was attempt #{details['tries']}") diff --git a/litellm/integrations/datadog/types.py b/litellm/types/integrations/datadog.py similarity index 68% rename from litellm/integrations/datadog/types.py rename to litellm/types/integrations/datadog.py index 87aa3ce17..79d4eded4 100644 --- a/litellm/integrations/datadog/types.py +++ b/litellm/types/integrations/datadog.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import TypedDict +from typing import Optional, TypedDict class DataDogStatus(str, Enum): @@ -19,3 +19,11 @@ class DatadogPayload(TypedDict, total=False): class DD_ERRORS(Enum): DATADOG_413_ERROR = "Datadog API Error - Payload too large (batch is above 5MB uncompressed). If you want this logged either disable request/response logging or set `DD_BATCH_SIZE=50`" + + +class DatadogProxyFailureHookJsonMessage(TypedDict, total=False): + exception: str + error_class: str + status_code: Optional[int] + traceback: str + user_api_key_dict: dict diff --git a/tests/logging_callback_tests/test_datadog.py b/tests/logging_callback_tests/test_datadog.py new file mode 100644 index 000000000..8e3ff0de6 --- /dev/null +++ b/tests/logging_callback_tests/test_datadog.py @@ -0,0 +1,138 @@ +import io +import os +import sys + + +sys.path.insert(0, os.path.abspath("../..")) + +import asyncio +import gzip +import json +import logging +import time +from unittest.mock import AsyncMock, patch + +import pytest + +import litellm +from litellm import completion +from litellm._logging import verbose_logger +from litellm.integrations.datadog.datadog import DataDogLogger, DataDogStatus +from datetime import datetime, timedelta +from litellm.types.integrations.datadog_llm_obs import * +from litellm.types.utils import ( + StandardLoggingPayload, + StandardLoggingModelInformation, + StandardLoggingMetadata, + StandardLoggingHiddenParams, +) + +verbose_logger.setLevel(logging.DEBUG) + + +def create_standard_logging_payload() -> StandardLoggingPayload: + return StandardLoggingPayload( + id="test_id", + call_type="completion", + response_cost=0.1, + response_cost_failure_debug_info=None, + status="success", + total_tokens=30, + prompt_tokens=20, + completion_tokens=10, + startTime=1234567890.0, + endTime=1234567891.0, + completionStartTime=1234567890.5, + model_map_information=StandardLoggingModelInformation( + model_map_key="gpt-3.5-turbo", model_map_value=None + ), + model="gpt-3.5-turbo", + model_id="model-123", + model_group="openai-gpt", + api_base="https://api.openai.com", + metadata=StandardLoggingMetadata( + user_api_key_hash="test_hash", + user_api_key_org_id=None, + user_api_key_alias="test_alias", + 
+            user_api_key_team_id="test_team",
+            user_api_key_user_id="test_user",
+            user_api_key_team_alias="test_team_alias",
+            spend_logs_metadata=None,
+            requester_ip_address="127.0.0.1",
+            requester_metadata=None,
+        ),
+        cache_hit=False,
+        cache_key=None,
+        saved_cache_cost=0.0,
+        request_tags=[],
+        end_user=None,
+        requester_ip_address="127.0.0.1",
+        messages=[{"role": "user", "content": "Hello, world!"}],
+        response={"choices": [{"message": {"content": "Hi there!"}}]},
+        error_str=None,
+        model_parameters={"stream": True},
+        hidden_params=StandardLoggingHiddenParams(
+            model_id="model-123",
+            cache_key=None,
+            api_base="https://api.openai.com",
+            response_cost="0.1",
+            additional_headers=None,
+        ),
+    )
+
+
+@pytest.mark.asyncio
+async def test_create_datadog_logging_payload():
+    """Test creating a DataDog logging payload from a standard logging object"""
+    dd_logger = DataDogLogger()
+    standard_payload = create_standard_logging_payload()
+
+    # Create mock kwargs with the standard logging object
+    kwargs = {"standard_logging_object": standard_payload}
+
+    # Test payload creation
+    dd_payload = dd_logger.create_datadog_logging_payload(
+        kwargs=kwargs,
+        response_obj=None,
+        start_time=datetime.now(),
+        end_time=datetime.now(),
+    )
+
+    # Verify payload structure
+    assert dd_payload["ddsource"] == os.getenv("DD_SOURCE", "litellm")
+    assert dd_payload["service"] == "litellm-server"
+    assert dd_payload["status"] == DataDogStatus.INFO
+
+    # verify the message field == standard_payload
+    dict_payload = json.loads(dd_payload["message"])
+    assert dict_payload == standard_payload
+
+
+@pytest.mark.asyncio
+async def test_datadog_failure_logging():
+    """Test logging a failure event to DataDog"""
+    dd_logger = DataDogLogger()
+    standard_payload = create_standard_logging_payload()
+    standard_payload["status"] = "failure"  # Set status to failure
+    standard_payload["error_str"] = "Test error"
+
+    kwargs = {"standard_logging_object": standard_payload}
+
+    dd_payload = dd_logger.create_datadog_logging_payload(
+        kwargs=kwargs,
+        response_obj=None,
+        start_time=datetime.now(),
+        end_time=datetime.now(),
+    )
+
+    assert (
+        dd_payload["status"] == DataDogStatus.ERROR
+    )  # Verify a "failure" status maps to DataDog ERROR status
+
+    # verify the message field == standard_payload
+    dict_payload = json.loads(dd_payload["message"])
+    assert dict_payload == standard_payload
+
+    # verify error_str is in the message field
+    assert "error_str" in dict_payload
+    assert dict_payload["error_str"] == "Test error"
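
Reviewer note (not part of the diff): a minimal SDK-side sketch of how the new logging path and the `litellm.datadog_use_v1` flag added above would be exercised. The `DD_SITE`, `OPENAI_API_KEY`, and the model name are assumptions for illustration; only `DD_API_KEY`, `DD_SOURCE`, the string callback `"datadog"`, and `datadog_use_v1` come from this diff.

```python
import os
import litellm

# Assumed credentials/site for the sketch - DD_API_KEY is read by DataDogLogger per this diff;
# DD_SITE and OPENAI_API_KEY are assumed to be configured in the environment.
os.environ.setdefault("DD_API_KEY", "<your-datadog-api-key>")
os.environ.setdefault("DD_SITE", "us5.datadoghq.com")      # assumed env var for the intake URL
os.environ.setdefault("DD_SOURCE", "litellm")              # optional; defaults to "litellm"

litellm.callbacks = ["datadog"]   # string callbacks are allowed per the type in litellm/__init__.py
litellm.datadog_use_v1 = False    # default: StandardLoggingPayload-based payload
                                  # set True to fall back to _create_v0_logging_payload

# Hypothetical request; success and failure events are now batched to DataDog.
response = litellm.completion(
    model="gpt-4o-mini",  # assumed model; requires OPENAI_API_KEY
    messages=[{"role": "user", "content": "Hello from the DataDog logging sketch"}],
)
print(response.choices[0].message.content)
```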