forked from phoenix/litellm-mirror
[Feat-Proxy-DataDog] Log Redis, Postgres Failure events on DataDog (#5750)
* dd - start tracking redis status on dd
* add async_service_success_hook / failure hook in custom logger
* add async_service_failure_hook
* log service failures on dd
* fix import error
* add test for redis errors / warning
parent 7f4dfe434a
commit 911230c434

6 changed files with 216 additions and 18 deletions
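For context on wiring this up: service-health logging is driven by `litellm.service_callback`, the same switch the new test flips. A minimal sketch, assuming `DD_API_KEY` and `DD_SITE` are set in the environment (the values below are placeholders):

```python
import os

import litellm

# Placeholders only - point these at a real DataDog site/key in practice.
os.environ["DD_SITE"] = "https://us5.datadoghq.com"
os.environ["DD_API_KEY"] = "dd-api-key"

# Route Redis/Postgres service-health events to DataDog.
litellm.service_callback = ["datadog"]
```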
@@ -2,6 +2,7 @@ from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any, Optional, Union

 import litellm
+from litellm._logging import verbose_logger
 from litellm.proxy._types import UserAPIKeyAuth

 from .integrations.custom_logger import CustomLogger
@@ -77,11 +78,29 @@ class ServiceLogging(CustomLogger):
                 await self.prometheusServicesLogger.async_service_success_hook(
                     payload=payload
                 )
+            elif callback == "datadog":
+                from litellm.integrations.datadog.datadog import DataDogLogger
+
+                await self.init_datadog_logger_if_none()
+                await self.dd_logger.async_service_success_hook(
+                    payload=payload,
+                    parent_otel_span=parent_otel_span,
+                    start_time=start_time,
+                    end_time=end_time,
+                    event_metadata=event_metadata,
+                )
             elif callback == "otel":
+                from litellm.integrations.opentelemetry import OpenTelemetry
                 from litellm.proxy.proxy_server import open_telemetry_logger

-                if parent_otel_span is not None and open_telemetry_logger is not None:
-                    await open_telemetry_logger.async_service_success_hook(
+                await self.init_otel_logger_if_none()
+
+                if (
+                    parent_otel_span is not None
+                    and open_telemetry_logger is not None
+                    and isinstance(open_telemetry_logger, OpenTelemetry)
+                ):
+                    await self.otel_logger.async_service_success_hook(
                         payload=payload,
                         parent_otel_span=parent_otel_span,
                         start_time=start_time,
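The success hook above fans out to whichever service callbacks are configured. A standalone sketch of that dispatch pattern (the `dispatch`/`handlers` names are illustrative, not litellm's API):

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[Any]]


async def dispatch(callbacks: List[str], payload: dict,
                   handlers: Dict[str, Handler]) -> None:
    # Mirror the elif-chain above: route the payload to each configured sink.
    for callback in callbacks:
        handler = handlers.get(callback)
        if handler is not None:
            await handler(payload)


async def main() -> None:
    async def to_datadog(p: dict) -> None:
        print("would queue to DataDog:", p)

    await dispatch(["datadog"], {"service": "redis", "duration": 0.02},
                   {"datadog": to_datadog})


asyncio.run(main())
```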
@@ -100,6 +119,37 @@ class ServiceLogging(CustomLogger):
             self.prometheusServicesLogger = self.prometheusServicesLogger()
         return

+    async def init_datadog_logger_if_none(self):
+        """
+        initializes dd_logger if it is None or no attribute exists on ServiceLogging Object
+
+        """
+        from litellm.integrations.datadog.datadog import DataDogLogger
+
+        if not hasattr(self, "dd_logger"):
+            self.dd_logger: DataDogLogger = DataDogLogger()
+
+        return
+
+    async def init_otel_logger_if_none(self):
+        """
+        initializes otel_logger if it is None or no attribute exists on ServiceLogging Object
+
+        """
+        from litellm.integrations.opentelemetry import OpenTelemetry
+        from litellm.proxy.proxy_server import open_telemetry_logger
+
+        if not hasattr(self, "otel_logger"):
+            if open_telemetry_logger is not None and isinstance(
+                open_telemetry_logger, OpenTelemetry
+            ):
+                self.otel_logger: OpenTelemetry = open_telemetry_logger
+            else:
+                verbose_logger.warning(
+                    "ServiceLogger: open_telemetry_logger is None or not an instance of OpenTelemetry"
+                )
+        return
+
     async def async_service_failure_hook(
         self,
         service: ServiceTypes,
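Both `init_*_logger_if_none` helpers lean on `hasattr` as a lazy, idempotent initializer: the logger is constructed on first use and cached on the instance. The same pattern in isolation (`ExpensiveLogger` is a stand-in name):

```python
class ExpensiveLogger:
    def __init__(self) -> None:
        print("initialized once")


class Service:
    def init_logger_if_none(self) -> ExpensiveLogger:
        # Only construct the logger if no attribute exists yet on the instance.
        if not hasattr(self, "logger"):
            self.logger: ExpensiveLogger = ExpensiveLogger()
        return self.logger


svc = Service()
svc.init_logger_if_none()  # prints "initialized once"
svc.init_logger_if_none()  # cached; nothing printed
```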
@@ -136,13 +186,23 @@ class ServiceLogging(CustomLogger):
                 await self.prometheusServicesLogger.async_service_failure_hook(
                     payload=payload
                 )
+            elif callback == "datadog":
+                await self.init_datadog_logger_if_none()
+                await self.dd_logger.async_service_failure_hook(
+                    payload=payload,
+                    error=error_message,
+                    parent_otel_span=parent_otel_span,
+                    start_time=start_time,
+                    end_time=end_time,
+                    event_metadata=event_metadata,
+                )

         from litellm.proxy.proxy_server import open_telemetry_logger

         if not isinstance(error, str):
             error = str(error)
         if open_telemetry_logger is not None:
-            await open_telemetry_logger.async_service_failure_hook(
+            await self.otel_logger.async_service_failure_hook(
                 payload=payload,
                 parent_otel_span=parent_otel_span,
                 start_time=start_time,
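Note the `if not isinstance(error, str)` guard that survives in the failure path: exception objects are not JSON-serializable, so they are coerced to `str` before being attached to the payload. Self-contained illustration:

```python
import json

error: object = ConnectionError("Redis timed out")
if not isinstance(error, str):
    error = str(error)  # exceptions don't survive json.dumps; coerce first

print(json.dumps({"service": "redis", "error": error}))
```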
@@ -2,6 +2,7 @@
 # On success, logs events to Promptlayer
 import os
 import traceback
+from datetime import datetime as datetimeObj
 from typing import Any, Literal, Optional, Tuple, Union

 import dotenv
@@ -10,6 +11,7 @@ from pydantic import BaseModel
 from litellm.caching import DualCache
 from litellm.proxy._types import UserAPIKeyAuth
 from litellm.types.llms.openai import ChatCompletionRequest
+from litellm.types.services import ServiceLoggerPayload
 from litellm.types.utils import AdapterCompletionStreamWrapper, ModelResponse

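These imports (`datetimeObj`, `ServiceLoggerPayload`) support the service hooks the commit message says were added to the `CustomLogger` base class. A hedged sketch of what a user-defined subclass might override — the exact base-class signature is assumed here to match the DataDog logger's hooks shown further down:

```python
from typing import Optional

from litellm.integrations.custom_logger import CustomLogger
from litellm.types.services import ServiceLoggerPayload


class MyServiceLogger(CustomLogger):
    async def async_service_failure_hook(
        self,
        payload: ServiceLoggerPayload,
        error: Optional[str] = "",
        **kwargs,  # parent_otel_span, start_time, end_time, event_metadata
    ):
        # React to Redis/Postgres failures however you like.
        print(f"service failure: {payload.model_dump()} error={error}")
```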
@@ -6,7 +6,9 @@ DD Reference API: https://docs.datadoghq.com/api/latest/logs
 `async_log_success_event` - used by litellm proxy to send logs to datadog
 `log_success_event` - sync version of logging to DataDog, only used on litellm Python SDK, if user opts in to using sync functions

-async_log_success_event will store batch of DD_MAX_BATCH_SIZE in memory and flush to Datadog once it reaches DD_MAX_BATCH_SIZE or every 5 seconds
+async_log_success_event: will store batch of DD_MAX_BATCH_SIZE in memory and flush to Datadog once it reaches DD_MAX_BATCH_SIZE or every 5 seconds

+async_service_failure_hook: Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog
+
 For batching specific details see CustomBatchLogger class
 """
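The batching contract described in that docstring (buffer up to DD_MAX_BATCH_SIZE, flush on size or every 5 seconds) lives in `CustomBatchLogger`; a condensed sketch of the behavior, with illustrative names:

```python
import asyncio
from typing import List


class BatchBuffer:
    def __init__(self, max_size: int = 1000, flush_every_s: float = 5.0) -> None:
        self.log_queue: List[dict] = []
        self.max_size = max_size
        self.flush_every_s = flush_every_s

    def add(self, event: dict) -> None:
        self.log_queue.append(event)
        if len(self.log_queue) >= self.max_size:
            self.flush()  # size-triggered flush

    def flush(self) -> None:
        if self.log_queue:
            print(f"flushing {len(self.log_queue)} events")
            self.log_queue.clear()

    async def periodic_flush(self) -> None:
        # time-triggered flush, run as a background task
        while True:
            await asyncio.sleep(self.flush_every_s)
            self.flush()
```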
@@ -17,6 +19,7 @@ import os
 import sys
 import traceback
 import uuid
+from datetime import datetime as datetimeObj
 from typing import Any, Dict, List, Optional, Union

 from httpx import Response
@@ -29,8 +32,9 @@ from litellm.llms.custom_httpx.http_handler import (
     get_async_httpx_client,
     httpxSpecialProvider,
 )
+from litellm.types.services import ServiceLoggerPayload

-from .types import DD_ERRORS, DatadogPayload
+from .types import DD_ERRORS, DatadogPayload, DataDogStatus
 from .utils import make_json_serializable

 DD_MAX_BATCH_SIZE = 1000 # max number of logs DD API can accept
@@ -182,7 +186,7 @@ class DataDogLogger(CustomBatchLogger):

             response = self.sync_client.post(
                 url=self.intake_url,
-                json=dd_payload,
+                json=dd_payload, # type: ignore
                 headers={
                     "DD-API-KEY": self.DD_API_KEY,
                 },
@@ -294,6 +298,7 @@ class DataDogLogger(CustomBatchLogger):
                 hostname="",
                 message=json_payload,
                 service="litellm-server",
+                status=DataDogStatus.INFO,
             )
             return dd_payload

@@ -312,7 +317,7 @@ class DataDogLogger(CustomBatchLogger):
         compressed_data = gzip.compress(json.dumps(data).encode("utf-8"))
         response = await self.async_client.post(
             url=self.intake_url,
-            data=compressed_data,
+            data=compressed_data, # type: ignore
             headers={
                 "DD-API-KEY": self.DD_API_KEY,
                 "Content-Encoding": "gzip",
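The async path gzips the JSON body before POSTing (hence the `Content-Encoding: gzip` header), and the new test below reverses this to inspect the batch. The round trip in miniature:

```python
import gzip
import json

batch = [{"service": "redis", "status": "warning"}]
compressed = gzip.compress(json.dumps(batch).encode("utf-8"))

# What DataDog's intake (or the test's mock) would decompress:
decoded = json.loads(gzip.decompress(compressed).decode("utf-8"))
assert decoded == batch
```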
@@ -320,3 +325,56 @@ class DataDogLogger(CustomBatchLogger):
             },
         )
         return response
+
+    async def async_service_failure_hook(
+        self,
+        payload: ServiceLoggerPayload,
+        error: Optional[str] = "",
+        parent_otel_span: Optional[Any] = None,
+        start_time: Optional[Union[datetimeObj, float]] = None,
+        end_time: Optional[Union[float, datetimeObj]] = None,
+        event_metadata: Optional[dict] = None,
+    ):
+        """
+        Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog
+
+        - example - Redis is failing / erroring, will be logged on DataDog
+        """
+
+        try:
+            import json
+
+            _payload_dict = payload.model_dump()
+            _dd_message_str = json.dumps(_payload_dict)
+            _dd_payload = DatadogPayload(
+                ddsource="litellm",
+                ddtags="",
+                hostname="",
+                message=_dd_message_str,
+                service="litellm-server",
+                status=DataDogStatus.WARN,
+            )
+
+            self.log_queue.append(_dd_payload)
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
+            )
+            pass
+
+    async def async_service_success_hook(
+        self,
+        payload: ServiceLoggerPayload,
+        error: Optional[str] = "",
+        parent_otel_span: Optional[Any] = None,
+        start_time: Optional[Union[datetimeObj, float]] = None,
+        end_time: Optional[Union[float, datetimeObj]] = None,
+        event_metadata: Optional[dict] = None,
+    ):
+        """
+        Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog
+
+        No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this
+        """
+        return
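The failure hook's queueing step in one self-contained snippet: the pydantic payload is dumped to a dict and JSON-encoded into the `message` field (here `FakeServicePayload` stands in for `ServiceLoggerPayload`):

```python
import json

from pydantic import BaseModel


class FakeServicePayload(BaseModel):  # stand-in for ServiceLoggerPayload
    service: str
    error: str


payload = FakeServicePayload(service="redis", error="connection refused")
message = json.dumps(payload.model_dump())
print(message)  # {"service": "redis", "error": "connection refused"}
```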
@@ -2,12 +2,19 @@ from enum import Enum
 from typing import TypedDict


+class DataDogStatus(str, Enum):
+    INFO = "info"
+    WARN = "warning"
+    ERROR = "error"
+
+
 class DatadogPayload(TypedDict, total=False):
     ddsource: str
     ddtags: str
     hostname: str
     message: str
     service: str
+    status: str


 class DD_ERRORS(Enum):
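Since `DatadogPayload` is a `TypedDict` with `total=False`, building one is plain dict-literal syntax with every key optional, and `DataDogStatus` members (a `str` subclass via `str, Enum`) slot straight into the `status: str` field:

```python
from litellm.integrations.datadog.types import DataDogStatus, DatadogPayload

dd_payload = DatadogPayload(
    ddsource="litellm",
    service="litellm-server",
    status=DataDogStatus.WARN,  # str-valued enum member, satisfies `status: str`
    message='{"service": "redis", "error": "timeout"}',
)
print(dd_payload["status"] == "warning")  # True - compares by string value
```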
@@ -23,16 +23,5 @@ model_list:

 general_settings:
   master_key: sk-1234
-  alerting: ["slack"]
-  alerting_threshold: 0.0001 # (Seconds) set an artifically low threshold for testing alerting
-  alert_to_webhook_url: {
-    "llm_too_slow": [
-      "os.environ/SLACK_WEBHOOK_URL",
-      "os.environ/SLACK_WEBHOOK_URL_2",
-    ],
-  }
-

 litellm_settings:
-  callbacks: ["gcs_bucket"]
-
@@ -145,6 +145,88 @@ async def test_datadog_logging_http_request():
         pytest.fail(f"Test failed with exception: {str(e)}")


+@pytest.mark.asyncio
+async def test_datadog_log_redis_failures():
+    """
+    Test that poorly configured Redis is logged as Warning on DataDog
+    """
+    try:
+        from litellm.caching import Cache
+        from litellm.integrations.datadog.datadog import DataDogLogger
+
+        litellm.cache = Cache(
+            type="redis", host="badhost", port="6379", password="badpassword"
+        )
+
+        os.environ["DD_SITE"] = "https://fake.datadoghq.com"
+        os.environ["DD_API_KEY"] = "anything"
+        dd_logger = DataDogLogger()
+
+        litellm.callbacks = [dd_logger]
+        litellm.service_callback = ["datadog"]
+
+        litellm.set_verbose = True
+
+        # Create a mock for the async_client's post method
+        mock_post = AsyncMock()
+        mock_post.return_value.status_code = 202
+        mock_post.return_value.text = "Accepted"
+        dd_logger.async_client.post = mock_post
+
+        # Make the completion call
+        for _ in range(3):
+            response = await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "what llm are u"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="Accepted",
+            )
+            print(response)
+
+        # Wait for 5 seconds
+        await asyncio.sleep(6)
+
+        # Assert that the mock was called
+        assert mock_post.called, "HTTP request was not made"
+
+        # Get the arguments of the last call
+        args, kwargs = mock_post.call_args
+        print("CAll args and kwargs", args, kwargs)
+
+        # For example, checking if the URL is correct
+        assert kwargs["url"].endswith("/api/v2/logs"), "Incorrect DataDog endpoint"
+
+        body = kwargs["data"]
+
+        # use gzip to unzip the body
+        with gzip.open(io.BytesIO(body), "rb") as f:
+            body = f.read().decode("utf-8")
+        print(body)
+
+        # body is string parse it to dict
+        body = json.loads(body)
+        print(body)
+
+        failure_events = [log for log in body if log["status"] == "warning"]
+        assert len(failure_events) > 0, "No failure events logged"
+
+        print("ALL FAILURE/WARN EVENTS", failure_events)
+
+        for event in failure_events:
+            message = json.loads(event["message"])
+            assert (
+                event["status"] == "warning"
+            ), f"Event status is not 'warning': {event['status']}"
+            assert (
+                message["service"] == "redis"
+            ), f"Service is not 'redis': {message['service']}"
+            assert "error" in message, "No 'error' field in the message"
+            assert message["error"], "Error field is empty"
+    except Exception as e:
+        pytest.fail(f"Test failed with exception: {str(e)}")
+
+
 @pytest.mark.asyncio
 @pytest.mark.skip(reason="local-only test, to test if everything works fine.")
 async def test_datadog_logging():
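The test's core trick, isolated: replace the logger's async HTTP method with `unittest.mock.AsyncMock`, then assert on `call_args` after the flush window has passed. A condensed, runnable version (names here are illustrative):

```python
import asyncio
from unittest.mock import AsyncMock


class FakeLogger:
    async def post(self, url: str, data: bytes) -> None: ...


async def main() -> None:
    logger = FakeLogger()
    logger.post = AsyncMock(return_value=None)  # swap in the mock

    await logger.post(url="https://example.datadoghq.com/api/v2/logs", data=b"x")

    assert logger.post.called, "HTTP request was not made"
    _, kwargs = logger.post.call_args
    assert kwargs["url"].endswith("/api/v2/logs")


asyncio.run(main())
```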