(Feat) New Logging integration - add Datadog LLM Observability support (#6449)

* add type for dd llm obs request ob

* working dd llm obs

* datadog use well defined type

* clean up

* unit test test_create_llm_obs_payload

* fix linting

* add datadog_llm_observability

* add datadog_llm_observability

* docs DD LLM obs

* run testing again

* document DD_ENV

* test_create_llm_obs_payload
This commit is contained in:
Ishaan Jaff 2024-10-28 22:01:32 +05:30 committed by GitHub
parent 151991c66d
commit 030ece8c3f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 397 additions and 0 deletions

View file

@ -918,6 +918,7 @@ router_settings:
| DD_API_KEY | API key for Datadog integration
| DD_SITE | Site URL for Datadog (e.g., datadoghq.com)
| DD_SOURCE | Source identifier for Datadog logs
| DD_ENV | Environment identifier for Datadog logs. Only supported for `datadog_llm_observability` callback
| DEBUG_OTEL | Enable debug mode for OpenTelemetry
| DIRECT_URL | Direct URL for service endpoint
| DISABLE_ADMIN_UI | Toggle to disable the admin UI

View file

@ -1468,6 +1468,13 @@ curl --location 'http://0.0.0.0:4000/chat/completions' \
## Logging Proxy Input/Output - DataDog
LiteLLM supports logging to the following Datadog integrations:
- `datadog` [Datadog Logs](https://docs.datadoghq.com/logs/)
- `datadog_llm_observability` [Datadog LLM Observability](https://www.datadoghq.com/product/llm-observability/)
<Tabs>
<TabItem value="datadog" label="Datadog Logs">
We will use the `--config` flag to set `litellm.success_callback = ["datadog"]`. This will log all successful LLM calls to Datadog.
**Step 1**: Create a `config.yaml` file and set `litellm_settings`: `success_callback`
@ -1482,6 +1489,21 @@ litellm_settings:
service_callback: ["datadog"] # logs redis, postgres failures on datadog
```
</TabItem>
<TabItem value="datadog_llm_observability" label="Datadog LLM Observability">
```yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: gpt-3.5-turbo
litellm_settings:
callbacks: ["datadog_llm_observability"] # logs llm success logs on datadog
```
</TabItem>
</Tabs>
**Step 2**: Set the required environment variables for Datadog
```shell

View file

@ -49,6 +49,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
"langsmith",
"prometheus",
"datadog",
"datadog_llm_observability",
"galileo",
"braintrust",
"arize",

View file

@ -0,0 +1,169 @@
"""
Implements logging integration with Datadog's LLM Observability Service
API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards
"""
import asyncio
import os
import traceback
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from httpx import Response
import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.types.integrations.datadog_llm_obs import *
from litellm.types.utils import StandardLoggingPayload
class DataDogLLMObsLogger(CustomBatchLogger):
    """
    Logs LLM success events to Datadog's LLM Observability service.

    API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards

    Required env vars:
        DD_API_KEY: Datadog API key.
        DD_SITE: Datadog site, e.g. `us5.datadoghq.com`.
    Optional env vars:
        DD_BASE_URL: override the intake base URL (intended for testing).
        DD_ENV: environment tag attached to every span (defaults to "production").
    """

    def __init__(self, **kwargs):
        try:
            verbose_logger.debug("DataDogLLMObs: Initializing logger")
            if os.getenv("DD_API_KEY", None) is None:
                raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'")
            if os.getenv("DD_SITE", None) is None:
                raise Exception(
                    "DD_SITE is not set, set 'DD_SITE=<>', example site = `us5.datadoghq.com`"
                )

            self.async_client = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback
            )
            self.DD_API_KEY = os.getenv("DD_API_KEY")
            self.DD_SITE = os.getenv("DD_SITE")
            self.intake_url = (
                f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans"
            )

            # DD_BASE_URL is only intended for pointing at a mock intake
            # server during testing.
            dd_base_url = os.getenv("DD_BASE_URL")
            if dd_base_url:
                self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans"

            # Initialize all batching state *before* starting the periodic
            # flush task, so the flusher never observes a partially
            # constructed logger (flush_lock / log_queue / batch settings).
            self.flush_lock = asyncio.Lock()
            self.log_queue: List[LLMObsPayload] = []
            super().__init__(**kwargs, flush_lock=self.flush_lock)
            asyncio.create_task(self.periodic_flush())
        except Exception as e:
            verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}")
            raise e

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Queue one successful LLM call for batched submission to Datadog.

        Flushes immediately once the queue reaches `batch_size`; otherwise
        the periodic flush task picks the entries up. Errors are logged and
        swallowed so logging never breaks the LLM call path.
        """
        try:
            verbose_logger.debug(
                f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}"
            )
            payload = self.create_llm_obs_payload(
                kwargs, response_obj, start_time, end_time
            )
            verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}")
            self.log_queue.append(payload)

            if len(self.log_queue) >= self.batch_size:
                await self.async_send_batch()
        except Exception as e:
            verbose_logger.exception(
                f"DataDogLLMObs: Error logging success event - {str(e)}"
            )

    async def async_send_batch(self):
        """
        POST all queued spans to the Datadog LLM Obs intake endpoint.

        On success (202 Accepted) the queue is cleared; on failure the queue
        is left intact so the spans are retried on the next flush.
        """
        try:
            if not self.log_queue:
                return

            verbose_logger.debug(
                f"DataDogLLMObs: Flushing {len(self.log_queue)} events"
            )

            # Intake envelope: {"data": {"type": "span", "attributes": {...}}}
            payload = {
                "data": DDIntakePayload(
                    type="span",
                    attributes=DDSpanAttributes(
                        ml_app="litellm",
                        tags=[
                            "service:litellm",
                            f"env:{os.getenv('DD_ENV', 'production')}",
                        ],
                        spans=self.log_queue,
                    ),
                ),
            }

            response = await self.async_client.post(
                url=self.intake_url,
                json=payload,
                headers={
                    "DD-API-KEY": self.DD_API_KEY,
                    "Content-Type": "application/json",
                },
            )

            response.raise_for_status()
            # Datadog's spans intake endpoint returns 202 Accepted on success;
            # any other 2xx is unexpected and treated as a failure.
            if response.status_code != 202:
                raise Exception(
                    f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}"
                )

            verbose_logger.debug(
                f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}"
            )
            self.log_queue.clear()
        except Exception as e:
            verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}")

    def create_llm_obs_payload(
        self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime
    ) -> LLMObsPayload:
        """
        Build a single Datadog LLM Obs span from a completed LLM call.

        Args:
            kwargs: litellm logging kwargs; must contain `standard_logging_object`.
            response_obj: the LLM response (ModelResponse for /chat/completions).
            start_time / end_time: wall-clock bounds of the call.

        Raises:
            Exception: if `standard_logging_object` is missing from kwargs.
        """
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object"
        )
        if standard_logging_payload is None:
            raise Exception("DataDogLLMObs: standard_logging_object is not set")

        messages = standard_logging_payload["messages"]
        # litellm_params and metadata may each be present but None — guard
        # both levels instead of chaining .get() with dict defaults.
        metadata = (kwargs.get("litellm_params") or {}).get("metadata") or {}

        input_meta = InputMeta(messages=messages)  # type: ignore
        output_meta = OutputMeta(messages=self._get_response_messages(response_obj))
        meta = Meta(kind="llm", input=input_meta, output=output_meta)

        # Token metrics come from the standard logging payload; DD expects floats.
        metrics = LLMMetrics(
            input_tokens=float(standard_logging_payload.get("prompt_tokens", 0)),
            output_tokens=float(standard_logging_payload.get("completion_tokens", 0)),
            total_tokens=float(standard_logging_payload.get("total_tokens", 0)),
        )

        return LLMObsPayload(
            parent_id=metadata.get("parent_id", "undefined"),
            trace_id=metadata.get("trace_id", str(uuid.uuid4())),
            span_id=metadata.get("span_id", str(uuid.uuid4())),
            name=metadata.get("name", "litellm_llm_call"),
            meta=meta,
            start_ns=int(start_time.timestamp() * 1e9),  # epoch nanoseconds
            duration=int((end_time - start_time).total_seconds() * 1e9),
            metrics=metrics,
        )

    def _get_response_messages(self, response_obj: Any) -> List[Any]:
        """
        Get the messages from the response object.

        For now this only handles /chat/completions responses; any other
        response type yields an empty list.
        """
        if isinstance(response_obj, litellm.ModelResponse):
            return [response_obj["choices"][0]["message"].json()]
        return []

View file

@ -64,6 +64,7 @@ from ..integrations.arize_ai import ArizeLogger
from ..integrations.athina import AthinaLogger
from ..integrations.braintrust_logging import BraintrustLogger
from ..integrations.datadog.datadog import DataDogLogger
from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger
from ..integrations.dynamodb import DyanmoDBLogger
from ..integrations.galileo import GalileoObserve
from ..integrations.gcs_bucket.gcs_bucket import GCSBucketLogger
@ -2205,6 +2206,10 @@ def _init_custom_logger_compatible_class( # noqa: PLR0915
_datadog_logger = DataDogLogger()
_in_memory_loggers.append(_datadog_logger)
return _datadog_logger # type: ignore
elif logging_integration == "datadog_llm_observability":
_datadog_llm_obs_logger = DataDogLLMObsLogger()
_in_memory_loggers.append(_datadog_llm_obs_logger)
return _datadog_llm_obs_logger # type: ignore
elif logging_integration == "gcs_bucket":
for callback in _in_memory_loggers:
if isinstance(callback, GCSBucketLogger):
@ -2372,6 +2377,10 @@ def get_custom_logger_compatible_class(
for callback in _in_memory_loggers:
if isinstance(callback, DataDogLogger):
return callback
elif logging_integration == "datadog_llm_observability":
for callback in _in_memory_loggers:
if isinstance(callback, DataDogLLMObsLogger):
return callback
elif logging_integration == "gcs_bucket":
for callback in _in_memory_loggers:
if isinstance(callback, GCSBucketLogger):

View file

@ -0,0 +1,52 @@
"""
Payloads for Datadog LLM Observability Service (LLMObs)
API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards
"""
from typing import Any, List, Literal, Optional, TypedDict
class InputMeta(TypedDict):
    """Input section of a span's `meta`: the messages sent to the LLM."""

    # Chat messages, e.g. [{"role": "user", "content": ...}] — presumably
    # OpenAI chat format; verify against the caller's standard logging payload.
    messages: List[Any]
class OutputMeta(TypedDict):
    """Output section of a span's `meta`: the messages returned by the LLM."""

    # Response messages extracted from the model response (may be empty for
    # response types other than /chat/completions).
    messages: List[Any]
class Meta(TypedDict):
    """Span metadata: the span kind plus its input and output payloads."""

    # The span kind: "agent", "workflow", "llm", "tool", "task", "embedding",
    # or "retrieval". (Literal widened to match the full documented set —
    # the comment and the type previously disagreed.)
    kind: Literal["agent", "workflow", "llm", "tool", "task", "embedding", "retrieval"]
    input: InputMeta  # The span's input information.
    output: OutputMeta  # The span's output information.
class LLMMetrics(TypedDict, total=False):
    """Per-span LLM metrics; all fields optional (total=False)."""

    input_tokens: float
    output_tokens: float
    total_tokens: float
    time_to_first_token: float
    time_per_output_token: float
class LLMObsPayload(TypedDict):
    """One span submitted to the Datadog LLM Obs spans intake endpoint."""

    parent_id: str
    trace_id: str
    span_id: str
    name: str
    meta: Meta
    # Span start time as epoch nanoseconds.
    start_ns: int
    # Span duration in nanoseconds.
    duration: int
    metrics: LLMMetrics
class DDSpanAttributes(TypedDict):
    """`attributes` section of the intake payload: app name, tags, and spans."""

    ml_app: str
    # Datadog tags, e.g. ["service:litellm", "env:production"].
    tags: List[str]
    spans: List[LLMObsPayload]
class DDIntakePayload(TypedDict):
    """Top-level intake object; sent wrapped as {"data": DDIntakePayload}."""

    # Always "span" for the spans intake endpoint.
    type: str
    attributes: DDSpanAttributes

View file

@ -0,0 +1,141 @@
"""
Test the DataDogLLMObsLogger
"""
import io
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import gzip
import json
import logging
import time
from unittest.mock import AsyncMock, patch
import pytest
import litellm
from litellm import completion
from litellm._logging import verbose_logger
from litellm.integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger
from datetime import datetime, timedelta
from litellm.types.integrations.datadog_llm_obs import *
from litellm.types.utils import (
StandardLoggingPayload,
StandardLoggingModelInformation,
StandardLoggingMetadata,
StandardLoggingHiddenParams,
)
verbose_logger.setLevel(logging.DEBUG)
def create_standard_logging_payload() -> StandardLoggingPayload:
    """
    Build a fully-populated StandardLoggingPayload fixture for tests.

    Represents one successful gpt-3.5-turbo completion: 20 prompt tokens,
    10 completion tokens, cost 0.1, with metadata/hidden-params filled in.
    """
    return StandardLoggingPayload(
        id="test_id",
        call_type="completion",
        response_cost=0.1,
        response_cost_failure_debug_info=None,
        status="success",
        total_tokens=30,
        prompt_tokens=20,
        completion_tokens=10,
        startTime=1234567890.0,
        endTime=1234567891.0,
        completionStartTime=1234567890.5,
        model_map_information=StandardLoggingModelInformation(
            model_map_key="gpt-3.5-turbo", model_map_value=None
        ),
        model="gpt-3.5-turbo",
        model_id="model-123",
        model_group="openai-gpt",
        api_base="https://api.openai.com",
        metadata=StandardLoggingMetadata(
            user_api_key_hash="test_hash",
            user_api_key_org_id=None,
            user_api_key_alias="test_alias",
            user_api_key_team_id="test_team",
            user_api_key_user_id="test_user",
            user_api_key_team_alias="test_team_alias",
            spend_logs_metadata=None,
            requester_ip_address="127.0.0.1",
            requester_metadata=None,
        ),
        cache_hit=False,
        cache_key=None,
        saved_cache_cost=0.0,
        request_tags=[],
        end_user=None,
        requester_ip_address="127.0.0.1",
        messages=[{"role": "user", "content": "Hello, world!"}],
        response={"choices": [{"message": {"content": "Hi there!"}}]},
        error_str=None,
        model_parameters={"stream": True},
        hidden_params=StandardLoggingHiddenParams(
            model_id="model-123",
            cache_key=None,
            api_base="https://api.openai.com",
            response_cost="0.1",
            additional_headers=None,
        ),
    )
@pytest.mark.asyncio
async def test_datadog_llm_obs_logging():
    """
    Smoke test: run two mocked completions through the
    `datadog_llm_observability` callback and give the periodic flusher time
    to send the batch. Requires DD_API_KEY / DD_SITE to be set.
    """
    datadog_llm_obs_logger = DataDogLLMObsLogger()
    litellm.callbacks = [datadog_llm_obs_logger]
    litellm.set_verbose = True

    for _ in range(2):
        response = await litellm.acompletion(
            model="gpt-4o",
            # messages must be a list of chat dicts, not bare strings
            messages=[{"role": "user", "content": "Hello testing dd llm obs!"}],
            mock_response="hi",
        )
        print(response)

    # allow the periodic flush task to fire at least once
    await asyncio.sleep(6)
@pytest.mark.asyncio
async def test_create_llm_obs_payload():
    """Verify create_llm_obs_payload maps a standard logging object into a DD LLM Obs span."""
    logger = DataDogLLMObsLogger()
    standard_logging_payload = create_standard_logging_payload()

    request_kwargs = {
        "model": "gpt-4",
        "messages": [{"role": "user", "content": "Hello"}],
        "standard_logging_object": standard_logging_payload,
    }
    mock_response = litellm.ModelResponse(
        id="test_id",
        choices=[{"message": {"content": "Hi there!"}}],
        created=12,
        model="gpt-4",
    )

    payload = logger.create_llm_obs_payload(
        kwargs=request_kwargs,
        response_obj=mock_response,
        start_time=datetime.now(),
        end_time=datetime.now() + timedelta(seconds=1),
    )
    print("dd created payload", payload)

    expected_input_messages = [{"role": "user", "content": "Hello, world!"}]
    expected_output_messages = [
        {
            "content": "Hi there!",
            "role": "assistant",
            "tool_calls": None,
            "function_call": None,
        }
    ]

    assert payload["name"] == "litellm_llm_call"
    assert payload["meta"]["kind"] == "llm"
    assert payload["meta"]["input"]["messages"] == expected_input_messages
    assert payload["meta"]["output"]["messages"] == expected_output_messages
    assert payload["metrics"]["input_tokens"] == 20
    assert payload["metrics"]["output_tokens"] == 10
    assert payload["metrics"]["total_tokens"] == 30

View file

@ -27,6 +27,7 @@ from litellm.integrations.langsmith import LangsmithLogger
from litellm.integrations.literal_ai import LiteralAILogger
from litellm.integrations.prometheus import PrometheusLogger
from litellm.integrations.datadog.datadog import DataDogLogger
from litellm.integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger
from litellm.integrations.gcs_bucket.gcs_bucket import GCSBucketLogger
from litellm.integrations.opik.opik import OpikLogger
from litellm.integrations.opentelemetry import OpenTelemetry
@ -49,6 +50,7 @@ callback_class_str_to_classType = {
"literalai": LiteralAILogger,
"prometheus": PrometheusLogger,
"datadog": DataDogLogger,
"datadog_llm_observability": DataDogLLMObsLogger,
"gcs_bucket": GCSBucketLogger,
"opik": OpikLogger,
"argilla": ArgillaLogger,