forked from phoenix/litellm-mirror
Compare commits
10 commits
main
...
litellm_lo
Author | SHA1 | Date | |
---|---|---|---|
|
cbaaa1feea | ||
|
2803661ed8 | ||
|
90168de961 | ||
|
f916597303 | ||
|
f9a40e5db3 | ||
|
72562ddf2b | ||
|
48c4b272f7 | ||
|
d2b6054f0c | ||
|
6d0424881a | ||
|
7d6e45f78e |
7 changed files with 393 additions and 117 deletions
|
@ -68,6 +68,7 @@ callbacks: List[Union[Callable, _custom_logger_compatible_callbacks_literal]] =
|
||||||
langfuse_default_tags: Optional[List[str]] = None
|
langfuse_default_tags: Optional[List[str]] = None
|
||||||
langsmith_batch_size: Optional[int] = None
|
langsmith_batch_size: Optional[int] = None
|
||||||
argilla_batch_size: Optional[int] = None
|
argilla_batch_size: Optional[int] = None
|
||||||
|
datadog_use_v1: Optional[bool] = False # if you want to use v1 datadog logged payload
|
||||||
argilla_transformation_object: Optional[Dict[str, Any]] = None
|
argilla_transformation_object: Optional[Dict[str, Any]] = None
|
||||||
_async_input_callback: List[Callable] = (
|
_async_input_callback: List[Callable] = (
|
||||||
[]
|
[]
|
||||||
|
|
|
@ -32,9 +32,11 @@ from litellm.llms.custom_httpx.http_handler import (
|
||||||
get_async_httpx_client,
|
get_async_httpx_client,
|
||||||
httpxSpecialProvider,
|
httpxSpecialProvider,
|
||||||
)
|
)
|
||||||
|
from litellm.proxy._types import UserAPIKeyAuth
|
||||||
|
from litellm.types.integrations.datadog import *
|
||||||
from litellm.types.services import ServiceLoggerPayload
|
from litellm.types.services import ServiceLoggerPayload
|
||||||
|
from litellm.types.utils import StandardLoggingPayload
|
||||||
|
|
||||||
from .types import DD_ERRORS, DatadogPayload, DataDogStatus
|
|
||||||
from .utils import make_json_serializable
|
from .utils import make_json_serializable
|
||||||
|
|
||||||
DD_MAX_BATCH_SIZE = 1000 # max number of logs DD API can accept
|
DD_MAX_BATCH_SIZE = 1000 # max number of logs DD API can accept
|
||||||
|
@ -106,20 +108,20 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
"Datadog: Logging - Enters logging function for model %s", kwargs
|
"Datadog: Logging - Enters logging function for model %s", kwargs
|
||||||
)
|
)
|
||||||
dd_payload = self.create_datadog_logging_payload(
|
await self._log_async_event(kwargs, response_obj, start_time, end_time)
|
||||||
kwargs=kwargs,
|
|
||||||
response_obj=response_obj,
|
|
||||||
start_time=start_time,
|
|
||||||
end_time=end_time,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.log_queue.append(dd_payload)
|
except Exception as e:
|
||||||
|
verbose_logger.exception(
|
||||||
|
f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
|
||||||
|
)
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
|
||||||
|
try:
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
|
"Datadog: Logging - Enters logging function for model %s", kwargs
|
||||||
)
|
)
|
||||||
|
await self._log_async_event(kwargs, response_obj, start_time, end_time)
|
||||||
if len(self.log_queue) >= self.batch_size:
|
|
||||||
await self.async_send_batch()
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
verbose_logger.exception(
|
verbose_logger.exception(
|
||||||
|
@ -181,12 +183,20 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
verbose_logger.debug(
|
verbose_logger.debug(
|
||||||
"Datadog: Logging - Enters logging function for model %s", kwargs
|
"Datadog: Logging - Enters logging function for model %s", kwargs
|
||||||
)
|
)
|
||||||
dd_payload = self.create_datadog_logging_payload(
|
if litellm.datadog_use_v1 is True:
|
||||||
kwargs=kwargs,
|
dd_payload = self._create_v0_logging_payload(
|
||||||
response_obj=response_obj,
|
kwargs=kwargs,
|
||||||
start_time=start_time,
|
response_obj=response_obj,
|
||||||
end_time=end_time,
|
start_time=start_time,
|
||||||
)
|
end_time=end_time,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
dd_payload = self.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=response_obj,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
)
|
||||||
|
|
||||||
response = self.sync_client.post(
|
response = self.sync_client.post(
|
||||||
url=self.intake_url,
|
url=self.intake_url,
|
||||||
|
@ -215,6 +225,22 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
pass
|
pass
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def _log_async_event(self, kwargs, response_obj, start_time, end_time):
|
||||||
|
dd_payload = self.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=response_obj,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.log_queue.append(dd_payload)
|
||||||
|
verbose_logger.debug(
|
||||||
|
f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(self.log_queue) >= self.batch_size:
|
||||||
|
await self.async_send_batch()
|
||||||
|
|
||||||
def create_datadog_logging_payload(
|
def create_datadog_logging_payload(
|
||||||
self,
|
self,
|
||||||
kwargs: Union[dict, Any],
|
kwargs: Union[dict, Any],
|
||||||
|
@ -236,6 +262,153 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
|
||||||
|
"standard_logging_object", None
|
||||||
|
)
|
||||||
|
if standard_logging_object is None:
|
||||||
|
raise ValueError("standard_logging_object not found in kwargs")
|
||||||
|
|
||||||
|
status = DataDogStatus.INFO
|
||||||
|
if standard_logging_object.get("status") == "failure":
|
||||||
|
status = DataDogStatus.ERROR
|
||||||
|
|
||||||
|
# Build the initial payload
|
||||||
|
make_json_serializable(standard_logging_object)
|
||||||
|
json_payload = json.dumps(standard_logging_object)
|
||||||
|
|
||||||
|
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
||||||
|
|
||||||
|
dd_payload = DatadogPayload(
|
||||||
|
ddsource=os.getenv("DD_SOURCE", "litellm"),
|
||||||
|
ddtags="",
|
||||||
|
hostname="",
|
||||||
|
message=json_payload,
|
||||||
|
service="litellm-server",
|
||||||
|
status=status,
|
||||||
|
)
|
||||||
|
return dd_payload
|
||||||
|
|
||||||
|
async def async_send_compressed_data(self, data: List) -> Response:
|
||||||
|
"""
|
||||||
|
Async helper to send compressed data to datadog self.intake_url
|
||||||
|
|
||||||
|
Datadog recommends using gzip to compress data
|
||||||
|
https://docs.datadoghq.com/api/latest/logs/
|
||||||
|
|
||||||
|
"Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
|
||||||
|
"""
|
||||||
|
import gzip
|
||||||
|
import json
|
||||||
|
|
||||||
|
compressed_data = gzip.compress(json.dumps(data).encode("utf-8"))
|
||||||
|
response = await self.async_client.post(
|
||||||
|
url=self.intake_url,
|
||||||
|
data=compressed_data, # type: ignore
|
||||||
|
headers={
|
||||||
|
"DD-API-KEY": self.DD_API_KEY,
|
||||||
|
"Content-Encoding": "gzip",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
async def async_service_failure_hook(
|
||||||
|
self,
|
||||||
|
payload: ServiceLoggerPayload,
|
||||||
|
error: Optional[str] = "",
|
||||||
|
parent_otel_span: Optional[Any] = None,
|
||||||
|
start_time: Optional[Union[datetimeObj, float]] = None,
|
||||||
|
end_time: Optional[Union[float, datetimeObj]] = None,
|
||||||
|
event_metadata: Optional[dict] = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog
|
||||||
|
|
||||||
|
- example - Redis is failing / erroring, will be logged on DataDog
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
import json
|
||||||
|
|
||||||
|
_payload_dict = payload.model_dump()
|
||||||
|
_dd_message_str = json.dumps(_payload_dict)
|
||||||
|
_dd_payload = DatadogPayload(
|
||||||
|
ddsource="litellm",
|
||||||
|
ddtags="",
|
||||||
|
hostname="",
|
||||||
|
message=_dd_message_str,
|
||||||
|
service="litellm-server",
|
||||||
|
status=DataDogStatus.WARN,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.log_queue.append(_dd_payload)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
verbose_logger.exception(
|
||||||
|
f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
|
||||||
|
)
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def async_post_call_failure_hook(
|
||||||
|
self,
|
||||||
|
request_data: dict,
|
||||||
|
original_exception: Exception,
|
||||||
|
user_api_key_dict: UserAPIKeyAuth,
|
||||||
|
):
|
||||||
|
import json
|
||||||
|
|
||||||
|
_exception_payload = DatadogProxyFailureHookJsonMessage(
|
||||||
|
exception=str(original_exception),
|
||||||
|
error_class=str(original_exception.__class__.__name__),
|
||||||
|
status_code=getattr(original_exception, "status_code", None),
|
||||||
|
traceback=traceback.format_exc(),
|
||||||
|
user_api_key_dict=user_api_key_dict.model_dump(),
|
||||||
|
)
|
||||||
|
|
||||||
|
json_payload = json.dumps(_exception_payload)
|
||||||
|
verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
|
||||||
|
dd_payload = DatadogPayload(
|
||||||
|
ddsource=os.getenv("DD_SOURCE", "litellm"),
|
||||||
|
ddtags="",
|
||||||
|
hostname="",
|
||||||
|
message=json_payload,
|
||||||
|
service="litellm-server",
|
||||||
|
status=DataDogStatus.ERROR,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.log_queue.append(dd_payload)
|
||||||
|
|
||||||
|
async def async_service_success_hook(
|
||||||
|
self,
|
||||||
|
payload: ServiceLoggerPayload,
|
||||||
|
error: Optional[str] = "",
|
||||||
|
parent_otel_span: Optional[Any] = None,
|
||||||
|
start_time: Optional[Union[datetimeObj, float]] = None,
|
||||||
|
end_time: Optional[Union[float, datetimeObj]] = None,
|
||||||
|
event_metadata: Optional[dict] = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog
|
||||||
|
|
||||||
|
No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this
|
||||||
|
"""
|
||||||
|
return
|
||||||
|
|
||||||
|
def _create_v0_logging_payload(
|
||||||
|
self,
|
||||||
|
kwargs: Union[dict, Any],
|
||||||
|
response_obj: Any,
|
||||||
|
start_time: datetime.datetime,
|
||||||
|
end_time: datetime.datetime,
|
||||||
|
) -> DatadogPayload:
|
||||||
|
"""
|
||||||
|
Note: This is our V1 Version of DataDog Logging Payload
|
||||||
|
|
||||||
|
|
||||||
|
(Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True`
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
litellm_params = kwargs.get("litellm_params", {})
|
litellm_params = kwargs.get("litellm_params", {})
|
||||||
metadata = (
|
metadata = (
|
||||||
litellm_params.get("metadata", {}) or {}
|
litellm_params.get("metadata", {}) or {}
|
||||||
|
@ -305,80 +478,3 @@ class DataDogLogger(CustomBatchLogger):
|
||||||
status=DataDogStatus.INFO,
|
status=DataDogStatus.INFO,
|
||||||
)
|
)
|
||||||
return dd_payload
|
return dd_payload
|
||||||
|
|
||||||
async def async_send_compressed_data(self, data: List) -> Response:
|
|
||||||
"""
|
|
||||||
Async helper to send compressed data to datadog self.intake_url
|
|
||||||
|
|
||||||
Datadog recommends using gzip to compress data
|
|
||||||
https://docs.datadoghq.com/api/latest/logs/
|
|
||||||
|
|
||||||
"Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
|
|
||||||
"""
|
|
||||||
import gzip
|
|
||||||
import json
|
|
||||||
|
|
||||||
compressed_data = gzip.compress(json.dumps(data).encode("utf-8"))
|
|
||||||
response = await self.async_client.post(
|
|
||||||
url=self.intake_url,
|
|
||||||
data=compressed_data, # type: ignore
|
|
||||||
headers={
|
|
||||||
"DD-API-KEY": self.DD_API_KEY,
|
|
||||||
"Content-Encoding": "gzip",
|
|
||||||
"Content-Type": "application/json",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return response
|
|
||||||
|
|
||||||
async def async_service_failure_hook(
|
|
||||||
self,
|
|
||||||
payload: ServiceLoggerPayload,
|
|
||||||
error: Optional[str] = "",
|
|
||||||
parent_otel_span: Optional[Any] = None,
|
|
||||||
start_time: Optional[Union[datetimeObj, float]] = None,
|
|
||||||
end_time: Optional[Union[float, datetimeObj]] = None,
|
|
||||||
event_metadata: Optional[dict] = None,
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog
|
|
||||||
|
|
||||||
- example - Redis is failing / erroring, will be logged on DataDog
|
|
||||||
"""
|
|
||||||
|
|
||||||
try:
|
|
||||||
import json
|
|
||||||
|
|
||||||
_payload_dict = payload.model_dump()
|
|
||||||
_dd_message_str = json.dumps(_payload_dict)
|
|
||||||
_dd_payload = DatadogPayload(
|
|
||||||
ddsource="litellm",
|
|
||||||
ddtags="",
|
|
||||||
hostname="",
|
|
||||||
message=_dd_message_str,
|
|
||||||
service="litellm-server",
|
|
||||||
status=DataDogStatus.WARN,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.log_queue.append(_dd_payload)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
verbose_logger.exception(
|
|
||||||
f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
|
|
||||||
)
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def async_service_success_hook(
|
|
||||||
self,
|
|
||||||
payload: ServiceLoggerPayload,
|
|
||||||
error: Optional[str] = "",
|
|
||||||
parent_otel_span: Optional[Any] = None,
|
|
||||||
start_time: Optional[Union[datetimeObj, float]] = None,
|
|
||||||
end_time: Optional[Union[float, datetimeObj]] = None,
|
|
||||||
event_metadata: Optional[dict] = None,
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog
|
|
||||||
|
|
||||||
No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this
|
|
||||||
"""
|
|
||||||
return
|
|
||||||
|
|
|
@ -1197,13 +1197,15 @@ async def user_api_key_auth( # noqa: PLR0915
|
||||||
extra={"requester_ip": requester_ip},
|
extra={"requester_ip": requester_ip},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Log this exception to OTEL
|
# Log this exception to OTEL, Datadog, All Custom Loggers
|
||||||
if open_telemetry_logger is not None:
|
asyncio.create_task(
|
||||||
await open_telemetry_logger.async_post_call_failure_hook( # type: ignore
|
proxy_logging_obj.async_log_proxy_authentication_errors(
|
||||||
original_exception=e,
|
original_exception=e,
|
||||||
request_data={},
|
request=request,
|
||||||
user_api_key_dict=UserAPIKeyAuth(parent_otel_span=parent_otel_span),
|
parent_otel_span=parent_otel_span,
|
||||||
|
api_key=api_key,
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if isinstance(e, litellm.BudgetExceededError):
|
if isinstance(e, litellm.BudgetExceededError):
|
||||||
raise ProxyException(
|
raise ProxyException(
|
||||||
|
|
|
@ -8,18 +8,7 @@ model_list:
|
||||||
model: anthropic/fake
|
model: anthropic/fake
|
||||||
api_base: https://exampleanthropicendpoint-production.up.railway.app/
|
api_base: https://exampleanthropicendpoint-production.up.railway.app/
|
||||||
|
|
||||||
router_settings:
|
|
||||||
provider_budget_config:
|
|
||||||
openai:
|
|
||||||
budget_limit: 0.3 # float of $ value budget for time period
|
|
||||||
time_period: 1d # can be 1d, 2d, 30d
|
|
||||||
anthropic:
|
|
||||||
budget_limit: 5
|
|
||||||
time_period: 1d
|
|
||||||
redis_host: os.environ/REDIS_HOST
|
|
||||||
redis_port: os.environ/REDIS_PORT
|
|
||||||
redis_password: os.environ/REDIS_PASSWORD
|
|
||||||
|
|
||||||
litellm_settings:
|
litellm_settings:
|
||||||
callbacks: ["prometheus"]
|
callbacks: ["datadog"] # will log success & failures
|
||||||
success_callback: ["langfuse"]
|
service_callbacks: ["datadog"] # will log DB fails / exceptions
|
||||||
|
turn_off_message_logging: True # will redact message / response content
|
|
@ -854,6 +854,20 @@ class ProxyLogging:
|
||||||
),
|
),
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
|
await self._run_post_call_failure_hook_custom_loggers(
|
||||||
|
original_exception=original_exception,
|
||||||
|
request_data=request_data,
|
||||||
|
user_api_key_dict=user_api_key_dict,
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
async def _run_post_call_failure_hook_custom_loggers(
|
||||||
|
self,
|
||||||
|
original_exception: Exception,
|
||||||
|
request_data: dict,
|
||||||
|
user_api_key_dict: UserAPIKeyAuth,
|
||||||
|
):
|
||||||
for callback in litellm.callbacks:
|
for callback in litellm.callbacks:
|
||||||
try:
|
try:
|
||||||
_callback: Optional[CustomLogger] = None
|
_callback: Optional[CustomLogger] = None
|
||||||
|
@ -872,7 +886,35 @@ class ProxyLogging:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
return
|
async def async_log_proxy_authentication_errors(
|
||||||
|
self,
|
||||||
|
original_exception: Exception,
|
||||||
|
request: Request,
|
||||||
|
parent_otel_span: Optional[Any],
|
||||||
|
api_key: str,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Handler for Logging Authentication Errors on LiteLLM Proxy
|
||||||
|
|
||||||
|
Why not use post_call_failure_hook?
|
||||||
|
- `post_call_failure_hook` calls `litellm_logging_obj.async_failure_handler`. This led to the Exception being logged twice
|
||||||
|
|
||||||
|
What does this handler do?
|
||||||
|
- Logs Authentication Errors (like invalid API Key passed) to CustomLogger compatible classes
|
||||||
|
- calls CustomLogger.async_post_call_failure_hook
|
||||||
|
"""
|
||||||
|
|
||||||
|
user_api_key_dict = UserAPIKeyAuth(
|
||||||
|
parent_otel_span=parent_otel_span,
|
||||||
|
token=_hash_token_if_needed(token=api_key),
|
||||||
|
)
|
||||||
|
request_data = await request.json()
|
||||||
|
await self._run_post_call_failure_hook_custom_loggers(
|
||||||
|
original_exception=original_exception,
|
||||||
|
request_data=request_data,
|
||||||
|
user_api_key_dict=user_api_key_dict,
|
||||||
|
)
|
||||||
|
pass
|
||||||
|
|
||||||
async def post_call_success_hook(
|
async def post_call_success_hook(
|
||||||
self,
|
self,
|
||||||
|
@ -986,7 +1028,7 @@ class ProxyLogging:
|
||||||
|
|
||||||
### DB CONNECTOR ###
|
### DB CONNECTOR ###
|
||||||
# Define the retry decorator with backoff strategy
|
# Define the retry decorator with backoff strategy
|
||||||
# Function to be called whenever a retry is about to happen
|
# Function to be called whenever a retry is about to happen
|
||||||
def on_backoff(details):
|
def on_backoff(details):
|
||||||
# The 'tries' key in the details dictionary contains the number of completed tries
|
# The 'tries' key in the details dictionary contains the number of completed tries
|
||||||
print_verbose(f"Backing off... this was attempt #{details['tries']}")
|
print_verbose(f"Backing off... this was attempt #{details['tries']}")
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import TypedDict
|
from typing import Optional, TypedDict
|
||||||
|
|
||||||
|
|
||||||
class DataDogStatus(str, Enum):
|
class DataDogStatus(str, Enum):
|
||||||
|
@ -19,3 +19,11 @@ class DatadogPayload(TypedDict, total=False):
|
||||||
|
|
||||||
class DD_ERRORS(Enum):
|
class DD_ERRORS(Enum):
|
||||||
DATADOG_413_ERROR = "Datadog API Error - Payload too large (batch is above 5MB uncompressed). If you want this logged either disable request/response logging or set `DD_BATCH_SIZE=50`"
|
DATADOG_413_ERROR = "Datadog API Error - Payload too large (batch is above 5MB uncompressed). If you want this logged either disable request/response logging or set `DD_BATCH_SIZE=50`"
|
||||||
|
|
||||||
|
|
||||||
|
class DatadogProxyFailureHookJsonMessage(TypedDict, total=False):
|
||||||
|
exception: str
|
||||||
|
error_class: str
|
||||||
|
status_code: Optional[int]
|
||||||
|
traceback: str
|
||||||
|
user_api_key_dict: dict
|
138
tests/logging_callback_tests/test_datadog.py
Normal file
138
tests/logging_callback_tests/test_datadog.py
Normal file
|
@ -0,0 +1,138 @@
|
||||||
|
import io
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.abspath("../.."))
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import gzip
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import litellm
|
||||||
|
from litellm import completion
|
||||||
|
from litellm._logging import verbose_logger
|
||||||
|
from litellm.integrations.datadog.datadog import DataDogLogger, DataDogStatus
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from litellm.types.integrations.datadog_llm_obs import *
|
||||||
|
from litellm.types.utils import (
|
||||||
|
StandardLoggingPayload,
|
||||||
|
StandardLoggingModelInformation,
|
||||||
|
StandardLoggingMetadata,
|
||||||
|
StandardLoggingHiddenParams,
|
||||||
|
)
|
||||||
|
|
||||||
|
verbose_logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def create_standard_logging_payload() -> StandardLoggingPayload:
|
||||||
|
return StandardLoggingPayload(
|
||||||
|
id="test_id",
|
||||||
|
call_type="completion",
|
||||||
|
response_cost=0.1,
|
||||||
|
response_cost_failure_debug_info=None,
|
||||||
|
status="success",
|
||||||
|
total_tokens=30,
|
||||||
|
prompt_tokens=20,
|
||||||
|
completion_tokens=10,
|
||||||
|
startTime=1234567890.0,
|
||||||
|
endTime=1234567891.0,
|
||||||
|
completionStartTime=1234567890.5,
|
||||||
|
model_map_information=StandardLoggingModelInformation(
|
||||||
|
model_map_key="gpt-3.5-turbo", model_map_value=None
|
||||||
|
),
|
||||||
|
model="gpt-3.5-turbo",
|
||||||
|
model_id="model-123",
|
||||||
|
model_group="openai-gpt",
|
||||||
|
api_base="https://api.openai.com",
|
||||||
|
metadata=StandardLoggingMetadata(
|
||||||
|
user_api_key_hash="test_hash",
|
||||||
|
user_api_key_org_id=None,
|
||||||
|
user_api_key_alias="test_alias",
|
||||||
|
user_api_key_team_id="test_team",
|
||||||
|
user_api_key_user_id="test_user",
|
||||||
|
user_api_key_team_alias="test_team_alias",
|
||||||
|
spend_logs_metadata=None,
|
||||||
|
requester_ip_address="127.0.0.1",
|
||||||
|
requester_metadata=None,
|
||||||
|
),
|
||||||
|
cache_hit=False,
|
||||||
|
cache_key=None,
|
||||||
|
saved_cache_cost=0.0,
|
||||||
|
request_tags=[],
|
||||||
|
end_user=None,
|
||||||
|
requester_ip_address="127.0.0.1",
|
||||||
|
messages=[{"role": "user", "content": "Hello, world!"}],
|
||||||
|
response={"choices": [{"message": {"content": "Hi there!"}}]},
|
||||||
|
error_str=None,
|
||||||
|
model_parameters={"stream": True},
|
||||||
|
hidden_params=StandardLoggingHiddenParams(
|
||||||
|
model_id="model-123",
|
||||||
|
cache_key=None,
|
||||||
|
api_base="https://api.openai.com",
|
||||||
|
response_cost="0.1",
|
||||||
|
additional_headers=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_create_datadog_logging_payload():
|
||||||
|
"""Test creating a DataDog logging payload from a standard logging object"""
|
||||||
|
dd_logger = DataDogLogger()
|
||||||
|
standard_payload = create_standard_logging_payload()
|
||||||
|
|
||||||
|
# Create mock kwargs with the standard logging object
|
||||||
|
kwargs = {"standard_logging_object": standard_payload}
|
||||||
|
|
||||||
|
# Test payload creation
|
||||||
|
dd_payload = dd_logger.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=None,
|
||||||
|
start_time=datetime.now(),
|
||||||
|
end_time=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify payload structure
|
||||||
|
assert dd_payload["ddsource"] == os.getenv("DD_SOURCE", "litellm")
|
||||||
|
assert dd_payload["service"] == "litellm-server"
|
||||||
|
assert dd_payload["status"] == DataDogStatus.INFO
|
||||||
|
|
||||||
|
# verify the message field == standard_payload
|
||||||
|
dict_payload = json.loads(dd_payload["message"])
|
||||||
|
assert dict_payload == standard_payload
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_datadog_failure_logging():
|
||||||
|
"""Test logging a failure event to DataDog"""
|
||||||
|
dd_logger = DataDogLogger()
|
||||||
|
standard_payload = create_standard_logging_payload()
|
||||||
|
standard_payload["status"] = "failure" # Set status to failure
|
||||||
|
standard_payload["error_str"] = "Test error"
|
||||||
|
|
||||||
|
kwargs = {"standard_logging_object": standard_payload}
|
||||||
|
|
||||||
|
dd_payload = dd_logger.create_datadog_logging_payload(
|
||||||
|
kwargs=kwargs,
|
||||||
|
response_obj=None,
|
||||||
|
start_time=datetime.now(),
|
||||||
|
end_time=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
dd_payload["status"] == DataDogStatus.ERROR
|
||||||
|
) # Verify failure maps to warning status
|
||||||
|
|
||||||
|
# verify the message field == standard_payload
|
||||||
|
dict_payload = json.loads(dd_payload["message"])
|
||||||
|
assert dict_payload == standard_payload
|
||||||
|
|
||||||
|
# verify error_str is in the message field
|
||||||
|
assert "error_str" in dict_payload
|
||||||
|
assert dict_payload["error_str"] == "Test error"
|
Loading…
Add table
Add a link
Reference in a new issue