Compare commits


10 commits

Author SHA1 Message Date
Ishaan Jaff cbaaa1feea fix async_log_proxy_authentication_errors 2024-11-26 19:24:47 -08:00
Ishaan Jaff 2803661ed8 add async_post_call_failure_hook 2024-11-26 19:12:38 -08:00
Ishaan Jaff 90168de961 use correct loc for types 2024-11-26 19:09:16 -08:00
Ishaan Jaff f916597303 add async_post_call_failure_hook 2024-11-26 18:59:25 -08:00
Ishaan Jaff f9a40e5db3 add unit tests for DD logging payload 2024-11-26 18:35:14 -08:00
Ishaan Jaff 72562ddf2b allow opting into _create_v0_logging_payload 2024-11-26 18:17:58 -08:00
Ishaan Jaff 48c4b272f7 fix use SLP status 2024-11-26 18:10:03 -08:00
Ishaan Jaff d2b6054f0c use standard logging payload for DD 2024-11-26 17:58:46 -08:00
Ishaan Jaff 6d0424881a use standard logging payload for DD logging 2024-11-26 17:53:01 -08:00
Ishaan Jaff 7d6e45f78e add async_log_failure_event for dd 2024-11-26 17:45:20 -08:00
7 changed files with 393 additions and 117 deletions

View file

@@ -68,6 +68,7 @@ callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] =
 langfuse_default_tags: Optional[List[str]] = None
 langsmith_batch_size: Optional[int] = None
 argilla_batch_size: Optional[int] = None
+datadog_use_v1: Optional[bool] = False  # if you want to use v1 datadog logged payload
 argilla_transformation_object: Optional[Dict[str, Any]] = None
 _async_input_callback: List[Callable] = (
     []
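The new flag is a plain module-level setting. A minimal sketch of opting into the legacy payload (assuming the flag is set before any requests are logged):

import litellm

# opt back into the legacy v0 Datadog payload; the new default serializes
# the request's StandardLoggingPayload as the log message instead
litellm.datadog_use_v1 = True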

View file

@@ -32,9 +32,11 @@ from litellm.llms.custom_httpx.http_handler import (
     get_async_httpx_client,
     httpxSpecialProvider,
 )
+from litellm.proxy._types import UserAPIKeyAuth
+from litellm.types.integrations.datadog import *
 from litellm.types.services import ServiceLoggerPayload
+from litellm.types.utils import StandardLoggingPayload
-from .types import DD_ERRORS, DatadogPayload, DataDogStatus
 from .utils import make_json_serializable

 DD_MAX_BATCH_SIZE = 1000  # max number of logs DD API can accept
@@ -106,20 +108,20 @@ class DataDogLogger(CustomBatchLogger):
             verbose_logger.debug(
                 "Datadog: Logging - Enters logging function for model %s", kwargs
             )
-            dd_payload = self.create_datadog_logging_payload(
-                kwargs=kwargs,
-                response_obj=response_obj,
-                start_time=start_time,
-                end_time=end_time,
-            )
-            self.log_queue.append(dd_payload)
-            verbose_logger.debug(
-                f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
-            )
-            if len(self.log_queue) >= self.batch_size:
-                await self.async_send_batch()
+            await self._log_async_event(kwargs, response_obj, start_time, end_time)
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
+            )
+            pass
+
+    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        try:
+            verbose_logger.debug(
+                "Datadog: Logging - Enters logging function for model %s", kwargs
+            )
+            await self._log_async_event(kwargs, response_obj, start_time, end_time)

         except Exception as e:
             verbose_logger.exception(
@@ -181,6 +183,14 @@ class DataDogLogger(CustomBatchLogger):
             verbose_logger.debug(
                 "Datadog: Logging - Enters logging function for model %s", kwargs
             )
+            if litellm.datadog_use_v1 is True:
+                dd_payload = self._create_v0_logging_payload(
+                    kwargs=kwargs,
+                    response_obj=response_obj,
+                    start_time=start_time,
+                    end_time=end_time,
+                )
+            else:
                 dd_payload = self.create_datadog_logging_payload(
                     kwargs=kwargs,
                     response_obj=response_obj,
@@ -215,6 +225,22 @@ class DataDogLogger(CustomBatchLogger):
             pass
         pass

+    async def _log_async_event(self, kwargs, response_obj, start_time, end_time):
+        dd_payload = self.create_datadog_logging_payload(
+            kwargs=kwargs,
+            response_obj=response_obj,
+            start_time=start_time,
+            end_time=end_time,
+        )
+
+        self.log_queue.append(dd_payload)
+        verbose_logger.debug(
+            f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
+        )
+
+        if len(self.log_queue) >= self.batch_size:
+            await self.async_send_batch()
+
     def create_datadog_logging_payload(
         self,
         kwargs: Union[dict, Any],
@@ -236,6 +262,153 @@ class DataDogLogger(CustomBatchLogger):
         """
         import json

+        standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
+            "standard_logging_object", None
+        )
+        if standard_logging_object is None:
+            raise ValueError("standard_logging_object not found in kwargs")
+
+        status = DataDogStatus.INFO
+        if standard_logging_object.get("status") == "failure":
+            status = DataDogStatus.ERROR
+
+        # Build the initial payload
+        make_json_serializable(standard_logging_object)
+        json_payload = json.dumps(standard_logging_object)
+
+        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
+
+        dd_payload = DatadogPayload(
+            ddsource=os.getenv("DD_SOURCE", "litellm"),
+            ddtags="",
+            hostname="",
+            message=json_payload,
+            service="litellm-server",
+            status=status,
+        )
+        return dd_payload
+
+    async def async_send_compressed_data(self, data: List) -> Response:
+        """
+        Async helper to send compressed data to datadog self.intake_url
+
+        Datadog recommends using gzip to compress data
+        https://docs.datadoghq.com/api/latest/logs/
+
+        "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
+        """
+        import gzip
+        import json
+
+        compressed_data = gzip.compress(json.dumps(data).encode("utf-8"))
+        response = await self.async_client.post(
+            url=self.intake_url,
+            data=compressed_data,  # type: ignore
+            headers={
+                "DD-API-KEY": self.DD_API_KEY,
+                "Content-Encoding": "gzip",
+                "Content-Type": "application/json",
+            },
+        )
+        return response
+
+    async def async_service_failure_hook(
+        self,
+        payload: ServiceLoggerPayload,
+        error: Optional[str] = "",
+        parent_otel_span: Optional[Any] = None,
+        start_time: Optional[Union[datetimeObj, float]] = None,
+        end_time: Optional[Union[float, datetimeObj]] = None,
+        event_metadata: Optional[dict] = None,
+    ):
+        """
+        Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog
+
+        - example - Redis is failing / erroring, will be logged on DataDog
+        """
+        try:
+            import json
+
+            _payload_dict = payload.model_dump()
+            _dd_message_str = json.dumps(_payload_dict)
+            _dd_payload = DatadogPayload(
+                ddsource="litellm",
+                ddtags="",
+                hostname="",
+                message=_dd_message_str,
+                service="litellm-server",
+                status=DataDogStatus.WARN,
+            )
+            self.log_queue.append(_dd_payload)
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
+            )
+        pass
+
+    async def async_post_call_failure_hook(
+        self,
+        request_data: dict,
+        original_exception: Exception,
+        user_api_key_dict: UserAPIKeyAuth,
+    ):
+        import json
+
+        _exception_payload = DatadogProxyFailureHookJsonMessage(
+            exception=str(original_exception),
+            error_class=str(original_exception.__class__.__name__),
+            status_code=getattr(original_exception, "status_code", None),
+            traceback=traceback.format_exc(),
+            user_api_key_dict=user_api_key_dict.model_dump(),
+        )
+
+        json_payload = json.dumps(_exception_payload)
+        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
+        dd_payload = DatadogPayload(
+            ddsource=os.getenv("DD_SOURCE", "litellm"),
+            ddtags="",
+            hostname="",
+            message=json_payload,
+            service="litellm-server",
+            status=DataDogStatus.ERROR,
+        )
+        self.log_queue.append(dd_payload)
+
+    async def async_service_success_hook(
+        self,
+        payload: ServiceLoggerPayload,
+        error: Optional[str] = "",
+        parent_otel_span: Optional[Any] = None,
+        start_time: Optional[Union[datetimeObj, float]] = None,
+        end_time: Optional[Union[float, datetimeObj]] = None,
+        event_metadata: Optional[dict] = None,
+    ):
+        """
+        Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog
+
+        No user has asked for this so far, this might be spammy on datadog. If need arises we can implement this
+        """
+        return
+
+    def _create_v0_logging_payload(
+        self,
+        kwargs: Union[dict, Any],
+        response_obj: Any,
+        start_time: datetime.datetime,
+        end_time: datetime.datetime,
+    ) -> DatadogPayload:
+        """
+        Note: This is our V1 Version of DataDog Logging Payload
+
+        (Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True`
+        """
+        import json
+
         litellm_params = kwargs.get("litellm_params", {})
         metadata = (
             litellm_params.get("metadata", {}) or {}
@@ -305,80 +478,3 @@ class DataDogLogger(CustomBatchLogger):
             status=DataDogStatus.INFO,
         )
         return dd_payload
-
-    async def async_send_compressed_data(self, data: List) -> Response:
-        """
-        Async helper to send compressed data to datadog self.intake_url
-
-        Datadog recommends using gzip to compress data
-        https://docs.datadoghq.com/api/latest/logs/
-
-        "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
-        """
-        import gzip
-        import json
-
-        compressed_data = gzip.compress(json.dumps(data).encode("utf-8"))
-        response = await self.async_client.post(
-            url=self.intake_url,
-            data=compressed_data,  # type: ignore
-            headers={
-                "DD-API-KEY": self.DD_API_KEY,
-                "Content-Encoding": "gzip",
-                "Content-Type": "application/json",
-            },
-        )
-        return response
-
-    async def async_service_failure_hook(
-        self,
-        payload: ServiceLoggerPayload,
-        error: Optional[str] = "",
-        parent_otel_span: Optional[Any] = None,
-        start_time: Optional[Union[datetimeObj, float]] = None,
-        end_time: Optional[Union[float, datetimeObj]] = None,
-        event_metadata: Optional[dict] = None,
-    ):
-        """
-        Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog
-
-        - example - Redis is failing / erroring, will be logged on DataDog
-        """
-        try:
-            import json
-
-            _payload_dict = payload.model_dump()
-            _dd_message_str = json.dumps(_payload_dict)
-            _dd_payload = DatadogPayload(
-                ddsource="litellm",
-                ddtags="",
-                hostname="",
-                message=_dd_message_str,
-                service="litellm-server",
-                status=DataDogStatus.WARN,
-            )
-            self.log_queue.append(_dd_payload)
-
-        except Exception as e:
-            verbose_logger.exception(
-                f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
-            )
-        pass
-
-    async def async_service_success_hook(
-        self,
-        payload: ServiceLoggerPayload,
-        error: Optional[str] = "",
-        parent_otel_span: Optional[Any] = None,
-        start_time: Optional[Union[datetimeObj, float]] = None,
-        end_time: Optional[Union[float, datetimeObj]] = None,
-        event_metadata: Optional[dict] = None,
-    ):
-        """
-        Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog
-
-        No user has asked for this so far, this might be spammy on datadog. If need arises we can implement this
-        """
-        return
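A rough sketch of what the new logging path produces, written independently of the class above (field values are illustrative stand-ins; in the library the message is built from the request's standard_logging_object and batches are sent by async_send_compressed_data):

import gzip
import json
import os

# illustrative stand-in for kwargs["standard_logging_object"]
standard_logging_object = {"id": "test_id", "status": "failure", "error_str": "Test error"}

dd_payload = {
    "ddsource": os.getenv("DD_SOURCE", "litellm"),
    "ddtags": "",
    "hostname": "",
    "message": json.dumps(standard_logging_object),  # the whole payload becomes the DD log message
    "service": "litellm-server",
    "status": "error" if standard_logging_object.get("status") == "failure" else "info",
}

# batches are gzip-compressed before being POSTed to the Datadog intake URL
compressed = gzip.compress(json.dumps([dd_payload]).encode("utf-8"))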

View file

@@ -1197,12 +1197,14 @@ async def user_api_key_auth(  # noqa: PLR0915
                 extra={"requester_ip": requester_ip},
             )

-        # Log this exception to OTEL
-        if open_telemetry_logger is not None:
-            await open_telemetry_logger.async_post_call_failure_hook(  # type: ignore
-                original_exception=e,
-                request_data={},
-                user_api_key_dict=UserAPIKeyAuth(parent_otel_span=parent_otel_span),
-            )
+        # Log this exception to OTEL, Datadog, All Custom Loggers
+        asyncio.create_task(
+            proxy_logging_obj.async_log_proxy_authentication_errors(
+                original_exception=e,
+                request=request,
+                parent_otel_span=parent_otel_span,
+                api_key=api_key,
+            )
+        )

         if isinstance(e, litellm.BudgetExceededError):
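The auth path now schedules logging as a background task instead of awaiting it, so a failed-auth response is not delayed by callbacks. A minimal standalone sketch of that fire-and-forget pattern (names here are placeholders, not the proxy's own):

import asyncio

async def log_auth_error(exc: Exception) -> None:
    # stand-in for proxy_logging_obj.async_log_proxy_authentication_errors(...)
    print(f"logged auth error: {exc}")

async def authenticate() -> None:
    try:
        raise ValueError("Invalid proxy server token passed")
    except ValueError as e:
        asyncio.create_task(log_auth_error(e))  # fire-and-forget; do not block the error response
        await asyncio.sleep(0)  # yield once so the toy task runs before this coroutine exits

asyncio.run(authenticate())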

View file

@@ -8,18 +8,7 @@ model_list:
       model: anthropic/fake
       api_base: https://exampleanthropicendpoint-production.up.railway.app/

-router_settings:
-  provider_budget_config:
-    openai:
-      budget_limit: 0.3 # float of $ value budget for time period
-      time_period: 1d # can be 1d, 2d, 30d
-    anthropic:
-      budget_limit: 5
-      time_period: 1d
-  redis_host: os.environ/REDIS_HOST
-  redis_port: os.environ/REDIS_PORT
-  redis_password: os.environ/REDIS_PASSWORD
-
 litellm_settings:
-  callbacks: ["prometheus"]
-  success_callback: ["langfuse"]
+  callbacks: ["datadog"] # will log success & failures
+  service_callbacks: ["datadog"] # will log DB fails / exceptions
+  turn_off_message_logging: True # will redact message / response content
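For comparison, roughly the same settings applied from Python rather than the proxy YAML (a sketch assuming these module-level settings; `service_callbacks` is a proxy-level option and has no direct SDK equivalent here):

import litellm

litellm.callbacks = ["datadog"]          # log LLM success & failure events to Datadog
litellm.turn_off_message_logging = True  # redact message / response content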

View file

@@ -854,6 +854,20 @@ class ProxyLogging:
             ),
         ).start()

+        await self._run_post_call_failure_hook_custom_loggers(
+            original_exception=original_exception,
+            request_data=request_data,
+            user_api_key_dict=user_api_key_dict,
+        )
+        return
+
+    async def _run_post_call_failure_hook_custom_loggers(
+        self,
+        original_exception: Exception,
+        request_data: dict,
+        user_api_key_dict: UserAPIKeyAuth,
+    ):
         for callback in litellm.callbacks:
             try:
                 _callback: Optional[CustomLogger] = None
@@ -872,7 +886,35 @@ class ProxyLogging:
             except Exception as e:
                 raise e
-        return
+
+    async def async_log_proxy_authentication_errors(
+        self,
+        original_exception: Exception,
+        request: Request,
+        parent_otel_span: Optional[Any],
+        api_key: str,
+    ):
+        """
+        Handler for Logging Authentication Errors on LiteLLM Proxy
+
+        Why not use post_call_failure_hook?
+        - `post_call_failure_hook` calls `litellm_logging_obj.async_failure_handler`. This led to the Exception being logged twice
+
+        What does this handler do?
+        - Logs Authentication Errors (like invalid API Key passed) to CustomLogger compatible classes
+        - calls CustomLogger.async_post_call_failure_hook
+        """
+        user_api_key_dict = UserAPIKeyAuth(
+            parent_otel_span=parent_otel_span,
+            token=_hash_token_if_needed(token=api_key),
+        )
+        request_data = await request.json()
+        await self._run_post_call_failure_hook_custom_loggers(
+            original_exception=original_exception,
+            request_data=request_data,
+            user_api_key_dict=user_api_key_dict,
+        )
+        pass
+
     async def post_call_success_hook(
         self,
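Any CustomLogger-compatible callback registered in `litellm.callbacks` receives the same hook call from `_run_post_call_failure_hook_custom_loggers`. A minimal sketch of such a callback (class name and body are illustrative, not part of this change):

from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth

class MyProxyFailureLogger(CustomLogger):
    async def async_post_call_failure_hook(
        self,
        request_data: dict,
        original_exception: Exception,
        user_api_key_dict: UserAPIKeyAuth,
    ):
        # invoked for proxy-side failures, including the auth errors routed
        # through async_log_proxy_authentication_errors above
        print(f"proxy failure ({type(original_exception).__name__}): {original_exception}")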

View file

@@ -1,5 +1,5 @@
 from enum import Enum
-from typing import TypedDict
+from typing import Optional, TypedDict


 class DataDogStatus(str, Enum):

@@ -19,3 +19,11 @@ class DatadogPayload(TypedDict, total=False):

 class DD_ERRORS(Enum):
     DATADOG_413_ERROR = "Datadog API Error - Payload too large (batch is above 5MB uncompressed). If you want this logged either disable request/response logging or set `DD_BATCH_SIZE=50`"
+
+
+class DatadogProxyFailureHookJsonMessage(TypedDict, total=False):
+    exception: str
+    error_class: str
+    status_code: Optional[int]
+    traceback: str
+    user_api_key_dict: dict
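Because the TypedDict is declared with `total=False`, every field is optional. An illustrative instance (values are made up for the example):

from litellm.types.integrations.datadog import DatadogProxyFailureHookJsonMessage

msg: DatadogProxyFailureHookJsonMessage = {
    "exception": "Authentication Error, Invalid proxy server token passed",
    "error_class": "ProxyException",
    "status_code": 401,
    "traceback": "",
    "user_api_key_dict": {},
}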

View file

@@ -0,0 +1,138 @@
import io
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import gzip
import json
import logging
import time
from unittest.mock import AsyncMock, patch
import pytest
import litellm
from litellm import completion
from litellm._logging import verbose_logger
from litellm.integrations.datadog.datadog import DataDogLogger, DataDogStatus
from datetime import datetime, timedelta
from litellm.types.integrations.datadog_llm_obs import *
from litellm.types.utils import (
StandardLoggingPayload,
StandardLoggingModelInformation,
StandardLoggingMetadata,
StandardLoggingHiddenParams,
)
verbose_logger.setLevel(logging.DEBUG)
def create_standard_logging_payload() -> StandardLoggingPayload:
    return StandardLoggingPayload(
        id="test_id",
        call_type="completion",
        response_cost=0.1,
        response_cost_failure_debug_info=None,
        status="success",
        total_tokens=30,
        prompt_tokens=20,
        completion_tokens=10,
        startTime=1234567890.0,
        endTime=1234567891.0,
        completionStartTime=1234567890.5,
        model_map_information=StandardLoggingModelInformation(
            model_map_key="gpt-3.5-turbo", model_map_value=None
        ),
        model="gpt-3.5-turbo",
        model_id="model-123",
        model_group="openai-gpt",
        api_base="https://api.openai.com",
        metadata=StandardLoggingMetadata(
            user_api_key_hash="test_hash",
            user_api_key_org_id=None,
            user_api_key_alias="test_alias",
            user_api_key_team_id="test_team",
            user_api_key_user_id="test_user",
            user_api_key_team_alias="test_team_alias",
            spend_logs_metadata=None,
            requester_ip_address="127.0.0.1",
            requester_metadata=None,
        ),
        cache_hit=False,
        cache_key=None,
        saved_cache_cost=0.0,
        request_tags=[],
        end_user=None,
        requester_ip_address="127.0.0.1",
        messages=[{"role": "user", "content": "Hello, world!"}],
        response={"choices": [{"message": {"content": "Hi there!"}}]},
        error_str=None,
        model_parameters={"stream": True},
        hidden_params=StandardLoggingHiddenParams(
            model_id="model-123",
            cache_key=None,
            api_base="https://api.openai.com",
            response_cost="0.1",
            additional_headers=None,
        ),
    )
@pytest.mark.asyncio
async def test_create_datadog_logging_payload():
    """Test creating a DataDog logging payload from a standard logging object"""
    dd_logger = DataDogLogger()
    standard_payload = create_standard_logging_payload()

    # Create mock kwargs with the standard logging object
    kwargs = {"standard_logging_object": standard_payload}

    # Test payload creation
    dd_payload = dd_logger.create_datadog_logging_payload(
        kwargs=kwargs,
        response_obj=None,
        start_time=datetime.now(),
        end_time=datetime.now(),
    )

    # Verify payload structure
    assert dd_payload["ddsource"] == os.getenv("DD_SOURCE", "litellm")
    assert dd_payload["service"] == "litellm-server"
    assert dd_payload["status"] == DataDogStatus.INFO

    # verify the message field == standard_payload
    dict_payload = json.loads(dd_payload["message"])
    assert dict_payload == standard_payload
@pytest.mark.asyncio
async def test_datadog_failure_logging():
    """Test logging a failure event to DataDog"""
    dd_logger = DataDogLogger()
    standard_payload = create_standard_logging_payload()
    standard_payload["status"] = "failure"  # Set status to failure
    standard_payload["error_str"] = "Test error"

    kwargs = {"standard_logging_object": standard_payload}

    dd_payload = dd_logger.create_datadog_logging_payload(
        kwargs=kwargs,
        response_obj=None,
        start_time=datetime.now(),
        end_time=datetime.now(),
    )

    assert (
        dd_payload["status"] == DataDogStatus.ERROR
    )  # Verify failure maps to error status

    # verify the message field == standard_payload
    dict_payload = json.loads(dd_payload["message"])
    assert dict_payload == standard_payload

    # verify error_str is in the message field
    assert "error_str" in dict_payload
    assert dict_payload["error_str"] == "Test error"