From 030ece8c3fc0216dc39fd915e551e04ecdc1e2b0 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Mon, 28 Oct 2024 22:01:32 +0530 Subject: [PATCH] (Feat) New Logging integration - add Datadog LLM Observability support (#6449) * add type for dd llm obs request ob * working dd llm obs * datadog use well defined type * clean up * unit test test_create_llm_obs_payload * fix linting * add datadog_llm_observability * add datadog_llm_observability * docs DD LLM obs * run testing again * document DD_ENV * test_create_llm_obs_payload --- docs/my-website/docs/proxy/configs.md | 1 + docs/my-website/docs/proxy/logging.md | 22 +++ litellm/__init__.py | 1 + .../integrations/datadog/datadog_llm_obs.py | 169 ++++++++++++++++++ litellm/litellm_core_utils/litellm_logging.py | 9 + litellm/types/integrations/datadog_llm_obs.py | 52 ++++++ .../test_datadog_llm_obs.py | 141 +++++++++++++++ .../test_unit_tests_init_callbacks.py | 2 + 8 files changed, 397 insertions(+) create mode 100644 litellm/integrations/datadog/datadog_llm_obs.py create mode 100644 litellm/types/integrations/datadog_llm_obs.py create mode 100644 tests/logging_callback_tests/test_datadog_llm_obs.py diff --git a/docs/my-website/docs/proxy/configs.md b/docs/my-website/docs/proxy/configs.md index ee9a9096f..28b0b67e3 100644 --- a/docs/my-website/docs/proxy/configs.md +++ b/docs/my-website/docs/proxy/configs.md @@ -918,6 +918,7 @@ router_settings: | DD_API_KEY | API key for Datadog integration | DD_SITE | Site URL for Datadog (e.g., datadoghq.com) | DD_SOURCE | Source identifier for Datadog logs +| DD_ENV | Environment identifier for Datadog logs. 
Only supported for `datadog_llm_observability` callback | DEBUG_OTEL | Enable debug mode for OpenTelemetry | DIRECT_URL | Direct URL for service endpoint | DISABLE_ADMIN_UI | Toggle to disable the admin UI diff --git a/docs/my-website/docs/proxy/logging.md b/docs/my-website/docs/proxy/logging.md index 72c2e3773..94faa7734 100644 --- a/docs/my-website/docs/proxy/logging.md +++ b/docs/my-website/docs/proxy/logging.md @@ -1468,6 +1468,13 @@ curl --location 'http://0.0.0.0:4000/chat/completions' \ ## Logging Proxy Input/Output - DataDog +LiteLLM supports logging to the following Datadog Integrations: +- `datadog` [Datadog Logs](https://docs.datadoghq.com/logs/) +- `datadog_llm_observability` [Datadog LLM Observability](https://www.datadoghq.com/product/llm-observability/) + + + + We will use the `--config` to set `litellm.success_callback = ["datadog"]` this will log all successful LLM calls to DataDog **Step 1**: Create a `config.yaml` file and set `litellm_settings`: `success_callback` @@ -1482,6 +1489,21 @@ litellm_settings: service_callback: ["datadog"] # logs redis, postgres failures on datadog ``` + + + +```yaml +model_list: + - model_name: gpt-3.5-turbo + litellm_params: + model: gpt-3.5-turbo +litellm_settings: + callbacks: ["datadog_llm_observability"] # logs llm success logs on datadog +``` + + + + **Step 2**: Set Required env variables for datadog ```shell diff --git a/litellm/__init__.py b/litellm/__init__.py index b1033e7a4..a42a8f90d 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -49,6 +49,7 @@ _custom_logger_compatible_callbacks_literal = Literal[ "langsmith", "prometheus", "datadog", + "datadog_llm_observability", "galileo", "braintrust", "arize", diff --git a/litellm/integrations/datadog/datadog_llm_obs.py b/litellm/integrations/datadog/datadog_llm_obs.py new file mode 100644 index 000000000..9666c4581 --- /dev/null +++ b/litellm/integrations/datadog/datadog_llm_obs.py @@ -0,0 +1,169 @@ +""" +Implements logging integration with Datadog's 
LLM Observability Service + + +API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards + +""" + +import asyncio +import os +import traceback +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +from httpx import Response + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.llms.custom_httpx.http_handler import ( + get_async_httpx_client, + httpxSpecialProvider, +) +from litellm.types.integrations.datadog_llm_obs import * +from litellm.types.utils import StandardLoggingPayload + + +class DataDogLLMObsLogger(CustomBatchLogger): + def __init__(self, **kwargs): + try: + verbose_logger.debug("DataDogLLMObs: Initializing logger") + if os.getenv("DD_API_KEY", None) is None: + raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'") + if os.getenv("DD_SITE", None) is None: + raise Exception( + "DD_SITE is not set, set 'DD_SITE=<>', example sit = `us5.datadoghq.com`" + ) + + self.async_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback + ) + self.DD_API_KEY = os.getenv("DD_API_KEY") + self.DD_SITE = os.getenv("DD_SITE") + self.intake_url = ( + f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans" + ) + + # testing base url + dd_base_url = os.getenv("DD_BASE_URL") + if dd_base_url: + self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans" + + asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() + self.log_queue: List[LLMObsPayload] = [] + super().__init__(**kwargs, flush_lock=self.flush_lock) + except Exception as e: + verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}") + raise e + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + verbose_logger.debug( + f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}" + ) + 
payload = self.create_llm_obs_payload( + kwargs, response_obj, start_time, end_time + ) + verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}") + self.log_queue.append(payload) + + if len(self.log_queue) >= self.batch_size: + await self.async_send_batch() + except Exception as e: + verbose_logger.exception( + f"DataDogLLMObs: Error logging success event - {str(e)}" + ) + + async def async_send_batch(self): + try: + if not self.log_queue: + return + + verbose_logger.debug( + f"DataDogLLMObs: Flushing {len(self.log_queue)} events" + ) + + # Prepare the payload + payload = { + "data": DDIntakePayload( + type="span", + attributes=DDSpanAttributes( + ml_app="litellm", + tags=[ + "service:litellm", + f"env:{os.getenv('DD_ENV', 'production')}", + ], + spans=self.log_queue, + ), + ), + } + + response = await self.async_client.post( + url=self.intake_url, + json=payload, + headers={ + "DD-API-KEY": self.DD_API_KEY, + "Content-Type": "application/json", + }, + ) + + response.raise_for_status() + if response.status_code != 202: + raise Exception( + f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}" + ) + + verbose_logger.debug( + f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" + ) + self.log_queue.clear() + except Exception as e: + verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}") + + def create_llm_obs_payload( + self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime + ) -> LLMObsPayload: + standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object" + ) + if standard_logging_payload is None: + raise Exception("DataDogLLMObs: standard_logging_object is not set") + + messages = standard_logging_payload["messages"] + metadata = kwargs.get("litellm_params", {}).get("metadata", {}) + + input_meta = InputMeta(messages=messages) # type: ignore + output_meta = 
OutputMeta(messages=self._get_response_messages(response_obj)) + + meta = Meta(kind="llm", input=input_meta, output=output_meta) + + # Calculate metrics (you may need to adjust these based on available data) + metrics = LLMMetrics( + input_tokens=float(standard_logging_payload.get("prompt_tokens", 0)), + output_tokens=float(standard_logging_payload.get("completion_tokens", 0)), + total_tokens=float(standard_logging_payload.get("total_tokens", 0)), + ) + + return LLMObsPayload( + parent_id=metadata.get("parent_id", "undefined"), + trace_id=metadata.get("trace_id", str(uuid.uuid4())), + span_id=metadata.get("span_id", str(uuid.uuid4())), + name=metadata.get("name", "litellm_llm_call"), + meta=meta, + start_ns=int(start_time.timestamp() * 1e9), + duration=int((end_time - start_time).total_seconds() * 1e9), + metrics=metrics, + ) + + def _get_response_messages(self, response_obj: Any) -> List[Any]: + """ + Get the messages from the response object + + for now this handles logging /chat/completions responses + """ + if isinstance(response_obj, litellm.ModelResponse): + return [response_obj["choices"][0]["message"].json()] + return [] diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py index 206cb235e..7f403e422 100644 --- a/litellm/litellm_core_utils/litellm_logging.py +++ b/litellm/litellm_core_utils/litellm_logging.py @@ -64,6 +64,7 @@ from ..integrations.arize_ai import ArizeLogger from ..integrations.athina import AthinaLogger from ..integrations.braintrust_logging import BraintrustLogger from ..integrations.datadog.datadog import DataDogLogger +from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger from ..integrations.dynamodb import DyanmoDBLogger from ..integrations.galileo import GalileoObserve from ..integrations.gcs_bucket.gcs_bucket import GCSBucketLogger @@ -2205,6 +2206,10 @@ def _init_custom_logger_compatible_class( # noqa: PLR0915 _datadog_logger = DataDogLogger() 
_in_memory_loggers.append(_datadog_logger) return _datadog_logger # type: ignore + elif logging_integration == "datadog_llm_observability": + _datadog_llm_obs_logger = DataDogLLMObsLogger() + _in_memory_loggers.append(_datadog_llm_obs_logger) + return _datadog_llm_obs_logger # type: ignore elif logging_integration == "gcs_bucket": for callback in _in_memory_loggers: if isinstance(callback, GCSBucketLogger): @@ -2372,6 +2377,10 @@ def get_custom_logger_compatible_class( for callback in _in_memory_loggers: if isinstance(callback, DataDogLogger): return callback + elif logging_integration == "datadog_llm_observability": + for callback in _in_memory_loggers: + if isinstance(callback, DataDogLLMObsLogger): + return callback elif logging_integration == "gcs_bucket": for callback in _in_memory_loggers: if isinstance(callback, GCSBucketLogger): diff --git a/litellm/types/integrations/datadog_llm_obs.py b/litellm/types/integrations/datadog_llm_obs.py new file mode 100644 index 000000000..119d8ecc7 --- /dev/null +++ b/litellm/types/integrations/datadog_llm_obs.py @@ -0,0 +1,52 @@ +""" +Payloads for Datadog LLM Observability Service (LLMObs) + +API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards +""" + +from typing import Any, List, Literal, Optional, TypedDict + + +class InputMeta(TypedDict): + messages: List[Any] + + +class OutputMeta(TypedDict): + messages: List[Any] + + +class Meta(TypedDict): + # The span kind: "agent", "workflow", "llm", "tool", "task", "embedding", or "retrieval". + kind: Literal["llm", "tool", "task", "embedding", "retrieval"] + input: InputMeta # The span’s input information. + output: OutputMeta # The span’s output information. 
+ + +class LLMMetrics(TypedDict, total=False): + input_tokens: float + output_tokens: float + total_tokens: float + time_to_first_token: float + time_per_output_token: float + + +class LLMObsPayload(TypedDict): + parent_id: str + trace_id: str + span_id: str + name: str + meta: Meta + start_ns: int + duration: int + metrics: LLMMetrics + + +class DDSpanAttributes(TypedDict): + ml_app: str + tags: List[str] + spans: List[LLMObsPayload] + + +class DDIntakePayload(TypedDict): + type: str + attributes: DDSpanAttributes diff --git a/tests/logging_callback_tests/test_datadog_llm_obs.py b/tests/logging_callback_tests/test_datadog_llm_obs.py new file mode 100644 index 000000000..84ec3b2d9 --- /dev/null +++ b/tests/logging_callback_tests/test_datadog_llm_obs.py @@ -0,0 +1,141 @@ +""" +Test the DataDogLLMObsLogger +""" + +import io +import os +import sys + + +sys.path.insert(0, os.path.abspath("../..")) + +import asyncio +import gzip +import json +import logging +import time +from unittest.mock import AsyncMock, patch + +import pytest + +import litellm +from litellm import completion +from litellm._logging import verbose_logger +from litellm.integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger +from datetime import datetime, timedelta +from litellm.types.integrations.datadog_llm_obs import * +from litellm.types.utils import ( + StandardLoggingPayload, + StandardLoggingModelInformation, + StandardLoggingMetadata, + StandardLoggingHiddenParams, +) + +verbose_logger.setLevel(logging.DEBUG) + + +def create_standard_logging_payload() -> StandardLoggingPayload: + return StandardLoggingPayload( + id="test_id", + call_type="completion", + response_cost=0.1, + response_cost_failure_debug_info=None, + status="success", + total_tokens=30, + prompt_tokens=20, + completion_tokens=10, + startTime=1234567890.0, + endTime=1234567891.0, + completionStartTime=1234567890.5, + model_map_information=StandardLoggingModelInformation( + model_map_key="gpt-3.5-turbo", model_map_value=None 
+ ), + model="gpt-3.5-turbo", + model_id="model-123", + model_group="openai-gpt", + api_base="https://api.openai.com", + metadata=StandardLoggingMetadata( + user_api_key_hash="test_hash", + user_api_key_org_id=None, + user_api_key_alias="test_alias", + user_api_key_team_id="test_team", + user_api_key_user_id="test_user", + user_api_key_team_alias="test_team_alias", + spend_logs_metadata=None, + requester_ip_address="127.0.0.1", + requester_metadata=None, + ), + cache_hit=False, + cache_key=None, + saved_cache_cost=0.0, + request_tags=[], + end_user=None, + requester_ip_address="127.0.0.1", + messages=[{"role": "user", "content": "Hello, world!"}], + response={"choices": [{"message": {"content": "Hi there!"}}]}, + error_str=None, + model_parameters={"stream": True}, + hidden_params=StandardLoggingHiddenParams( + model_id="model-123", + cache_key=None, + api_base="https://api.openai.com", + response_cost="0.1", + additional_headers=None, + ), + ) + + +@pytest.mark.asyncio +async def test_datadog_llm_obs_logging(): + datadog_llm_obs_logger = DataDogLLMObsLogger() + litellm.callbacks = [datadog_llm_obs_logger] + litellm.set_verbose = True + + for _ in range(2): + response = await litellm.acompletion( + model="gpt-4o", messages=["Hello testing dd llm obs!"], mock_response="hi" + ) + + print(response) + + await asyncio.sleep(6) + + +@pytest.mark.asyncio +async def test_create_llm_obs_payload(): + datadog_llm_obs_logger = DataDogLLMObsLogger() + standard_logging_payload = create_standard_logging_payload() + payload = datadog_llm_obs_logger.create_llm_obs_payload( + kwargs={ + "model": "gpt-4", + "messages": [{"role": "user", "content": "Hello"}], + "standard_logging_object": standard_logging_payload, + }, + response_obj=litellm.ModelResponse( + id="test_id", + choices=[{"message": {"content": "Hi there!"}}], + created=12, + model="gpt-4", + ), + start_time=datetime.now(), + end_time=datetime.now() + timedelta(seconds=1), + ) + + print("dd created payload", payload) + + 
assert payload["name"] == "litellm_llm_call" + assert payload["meta"]["kind"] == "llm" + assert payload["meta"]["input"]["messages"] == [ + {"role": "user", "content": "Hello, world!"} + ] + assert payload["meta"]["output"]["messages"] == [ + { + "content": "Hi there!", + "role": "assistant", + "tool_calls": None, + "function_call": None, + } + ] + assert payload["metrics"]["input_tokens"] == 20 + assert payload["metrics"]["output_tokens"] == 10 + assert payload["metrics"]["total_tokens"] == 30 diff --git a/tests/logging_callback_tests/test_unit_tests_init_callbacks.py b/tests/logging_callback_tests/test_unit_tests_init_callbacks.py index 5b14717dc..ebc7dd33c 100644 --- a/tests/logging_callback_tests/test_unit_tests_init_callbacks.py +++ b/tests/logging_callback_tests/test_unit_tests_init_callbacks.py @@ -27,6 +27,7 @@ from litellm.integrations.langsmith import LangsmithLogger from litellm.integrations.literal_ai import LiteralAILogger from litellm.integrations.prometheus import PrometheusLogger from litellm.integrations.datadog.datadog import DataDogLogger +from litellm.integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger from litellm.integrations.gcs_bucket.gcs_bucket import GCSBucketLogger from litellm.integrations.opik.opik import OpikLogger from litellm.integrations.opentelemetry import OpenTelemetry @@ -49,6 +50,7 @@ callback_class_str_to_classType = { "literalai": LiteralAILogger, "prometheus": PrometheusLogger, "datadog": DataDogLogger, + "datadog_llm_observability": DataDogLLMObsLogger, "gcs_bucket": GCSBucketLogger, "opik": OpikLogger, "argilla": ArgillaLogger,