async def init_datadog_logger_if_none(self):
    """Lazily create the DataDog service logger.

    Creates ``self.dd_logger`` on first use so the DataDog client is only
    constructed when the "datadog" service callback is actually configured.
    Idempotent: subsequent calls are no-ops once the attribute exists.
    """
    from litellm.integrations.datadog.datadog import DataDogLogger

    if not hasattr(self, "dd_logger"):
        self.dd_logger: DataDogLogger = DataDogLogger()

async def init_otel_logger_if_none(self):
    """Lazily bind the proxy-level OpenTelemetry logger.

    Binds ``self.otel_logger`` to the proxy's global ``open_telemetry_logger``
    on first use. If OTEL is not configured (or the global is not an
    ``OpenTelemetry`` instance), a warning is logged and ``self.otel_logger``
    is set to ``None``.

    Fix over the original: the original left ``self.otel_logger`` unset on
    the unconfigured path, so any later ``self.otel_logger.<...>`` call
    (e.g. in ``async_service_failure_hook``) raised ``AttributeError``.
    Callers should guard with ``if self.otel_logger is not None``.
    """
    from litellm.integrations.opentelemetry import OpenTelemetry
    from litellm.proxy.proxy_server import open_telemetry_logger

    # Re-check when the attribute is missing OR still None, so an OTEL
    # logger configured after the first call is still picked up.
    if getattr(self, "otel_logger", None) is None:
        if open_telemetry_logger is not None and isinstance(
            open_telemetry_logger, OpenTelemetry
        ):
            self.otel_logger: Optional[OpenTelemetry] = open_telemetry_logger
        else:
            self.otel_logger = None
            verbose_logger.warning(
                "ServiceLogger: open_telemetry_logger is None or not an instance of OpenTelemetry"
            )
async def async_service_failure_hook(
    self,
    payload: ServiceLoggerPayload,
    error: Optional[str] = "",
    parent_otel_span: Optional[Any] = None,
    start_time: Optional[Union[datetimeObj, float]] = None,
    end_time: Optional[Union[float, datetimeObj]] = None,
    event_metadata: Optional[dict] = None,
):
    """
    Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog

    - example - Redis is failing / erroring, will be logged on DataDog

    The payload is serialized to JSON and appended to ``self.log_queue``;
    the batch logger flushes the queue to the DataDog logs intake API.

    Args:
        payload: service-level log record (service name, duration, error, ...)
        error: optional error string; currently unused here — the error is
            carried inside ``payload`` (TODO confirm it should not also be
            merged into the message)
        parent_otel_span, start_time, end_time, event_metadata: accepted for
            interface parity with other service loggers; not used by DataDog.
    """
    try:
        # `json` is imported at module level; serialize the full payload so
        # downstream DataDog queries can filter on any field.
        _payload_dict = payload.model_dump()
        _dd_message_str = json.dumps(_payload_dict)
        _dd_payload = DatadogPayload(
            ddsource="litellm",
            ddtags="",
            hostname="",
            message=_dd_message_str,
            service="litellm-server",
            status=DataDogStatus.WARN,
        )
        self.log_queue.append(_dd_payload)
    except Exception as e:
        # Best-effort telemetry: never let a logging failure break the caller.
        verbose_logger.exception(
            f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
        )
@pytest.mark.asyncio
async def test_datadog_log_redis_failures():
    """
    Test that poorly configured Redis is logged as Warning on DataDog.

    A misconfigured Redis cache (bad host/password) must surface on DataDog
    as a 'warning' service event with service == 'redis' and a non-empty
    error field.

    NOTE(review): the original wrapped the whole test in a broad
    ``except Exception: pytest.fail(str(e))``, which hides the real
    traceback — exceptions are now allowed to propagate so pytest reports
    the actual failure point.
    """
    from litellm.caching import Cache
    from litellm.integrations.datadog.datadog import DataDogLogger

    litellm.cache = Cache(
        type="redis", host="badhost", port="6379", password="badpassword"
    )

    os.environ["DD_SITE"] = "https://fake.datadoghq.com"
    os.environ["DD_API_KEY"] = "anything"
    dd_logger = DataDogLogger()

    litellm.callbacks = [dd_logger]
    litellm.service_callback = ["datadog"]
    litellm.set_verbose = True

    # Stub the DataDog intake call so no real HTTP request is made.
    mock_post = AsyncMock()
    mock_post.return_value.status_code = 202
    mock_post.return_value.text = "Accepted"
    dd_logger.async_client.post = mock_post

    # Each completion triggers a (failing) Redis cache lookup.
    for _ in range(3):
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "what llm are u"}],
            max_tokens=10,
            temperature=0.2,
            mock_response="Accepted",
        )
        print(response)

    # Allow the batch logger's periodic flush (5s interval) to fire.
    await asyncio.sleep(6)

    assert mock_post.called, "HTTP request was not made"

    args, kwargs = mock_post.call_args
    print("Call args and kwargs", args, kwargs)

    assert kwargs["url"].endswith("/api/v2/logs"), "Incorrect DataDog endpoint"

    # The request body is gzip-compressed JSON — decompress and parse it.
    body = kwargs["data"]
    with gzip.open(io.BytesIO(body), "rb") as f:
        body = f.read().decode("utf-8")
    print(body)
    body = json.loads(body)
    print(body)

    failure_events = [log for log in body if log["status"] == "warning"]
    assert len(failure_events) > 0, "No failure events logged"

    print("ALL FAILURE/WARN EVENTS", failure_events)

    for event in failure_events:
        message = json.loads(event["message"])
        assert (
            event["status"] == "warning"
        ), f"Event status is not 'warning': {event['status']}"
        assert (
            message["service"] == "redis"
        ), f"Service is not 'redis': {message['service']}"
        assert "error" in message, "No 'error' field in the message"
        assert message["error"], "Error field is empty"