diff --git a/litellm/__init__.py b/litellm/__init__.py
index 6afec1079..a0347d258 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -45,6 +45,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
     "dynamic_rate_limiter",
     "langsmith",
     "prometheus",
+    "datadog",
     "galileo",
     "braintrust",
     "arize",
diff --git a/litellm/integrations/custom_batch_logger.py b/litellm/integrations/custom_batch_logger.py
index 5cbfac683..23c63e951 100644
--- a/litellm/integrations/custom_batch_logger.py
+++ b/litellm/integrations/custom_batch_logger.py
@@ -17,14 +17,19 @@ DEFAULT_FLUSH_INTERVAL_SECONDS = 5


 class CustomBatchLogger(CustomLogger):
-    def __init__(self, flush_lock: Optional[asyncio.Lock] = None, **kwargs) -> None:
+    def __init__(
+        self,
+        flush_lock: Optional[asyncio.Lock] = None,
+        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
+        **kwargs,
+    ) -> None:
         """
         Args:
             flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
+            batch_size (Optional[int], optional): Max number of queued events before a flush is triggered. Defaults to DEFAULT_BATCH_SIZE.
         """
         self.log_queue: List = []
         self.flush_interval = DEFAULT_FLUSH_INTERVAL_SECONDS  # 5 seconds
-        self.batch_size = DEFAULT_BATCH_SIZE
+        self.batch_size: int = batch_size or DEFAULT_BATCH_SIZE
         self.last_flush_time = time.time()
         self.flush_lock = flush_lock
@@ -43,7 +48,7 @@ class CustomBatchLogger(CustomLogger):
         async with self.flush_lock:
             if self.log_queue:
                 verbose_logger.debug(
-                    "CustomLogger: Flushing batch of %s events", self.batch_size
+                    "CustomLogger: Flushing batch of %s events", len(self.log_queue)
                 )
                 await self.async_send_batch()
                 self.log_queue.clear()
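The `batch_size` keyword added above is what lets the new Datadog logger raise its ceiling to 1,000 events (see `DD_MAX_BATCH_SIZE` below). A minimal sketch of how a subclass might use it — the `PrintingBatchLogger` class and its fake exporter are illustrative, not part of this PR:

```python
import asyncio

from litellm.integrations.custom_batch_logger import CustomBatchLogger


class PrintingBatchLogger(CustomBatchLogger):
    """Illustrative only: buffers success events and 'ships' them in batches."""

    def __init__(self):
        self.flush_lock = asyncio.Lock()
        # The new keyword overrides the module-level DEFAULT_BATCH_SIZE.
        super().__init__(flush_lock=self.flush_lock, batch_size=100)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        self.log_queue.append({"model": kwargs.get("model")})
        if len(self.log_queue) >= self.batch_size:
            await self.async_send_batch()
            self.log_queue.clear()

    async def async_send_batch(self):
        # Stand-in for a real exporter (HTTP POST, file write, ...).
        print(f"shipping {len(self.log_queue)} events")
```

Instantiated inside a running event loop and registered via `litellm.callbacks`, this would exercise the same size-triggered and timer-triggered flush paths the Datadog logger below relies on.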
diff --git a/litellm/integrations/datadog.py b/litellm/integrations/datadog.py
deleted file mode 100644
index 98edf4c11..000000000
--- a/litellm/integrations/datadog.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#### What this does ####
-# On success + failure, log events to Datadog
-
-import datetime
-import os
-import subprocess
-import sys
-import traceback
-import uuid
-
-import dotenv
-import requests  # type: ignore
-
-import litellm
-from litellm._logging import print_verbose, verbose_logger
-
-
-def make_json_serializable(payload):
-    for key, value in payload.items():
-        try:
-            if isinstance(value, dict):
-                # recursively sanitize dicts
-                payload[key] = make_json_serializable(value.copy())
-            elif not isinstance(value, (str, int, float, bool, type(None))):
-                # everything else becomes a string
-                payload[key] = str(value)
-        except:
-            # non blocking if it can't cast to a str
-            pass
-    return payload
-
-
-class DataDogLogger:
-    # Class variables or attributes
-    def __init__(
-        self,
-        **kwargs,
-    ):
-        from datadog_api_client import ApiClient, Configuration
-
-        # check if the correct env variables are set
-        if os.getenv("DD_API_KEY", None) is None:
-            raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>")
-        if os.getenv("DD_SITE", None) is None:
-            raise Exception("DD_SITE is not set in .env, set 'DD_SITE=<>")
-        self.configuration = Configuration()
-
-        try:
-            verbose_logger.debug(f"in init datadog logger")
-            pass
-
-        except Exception as e:
-            print_verbose(f"Got exception on init s3 client {str(e)}")
-            raise e
-
-    async def _async_log_event(
-        self, kwargs, response_obj, start_time, end_time, print_verbose, user_id
-    ):
-        self.log_event(kwargs, response_obj, start_time, end_time, print_verbose)
-
-    def log_event(
-        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
-    ):
-        try:
-            # Define DataDog client
-            from datadog_api_client.v2 import ApiClient
-            from datadog_api_client.v2.api.logs_api import LogsApi
-            from datadog_api_client.v2.models import HTTPLog, HTTPLogItem
-
-            verbose_logger.debug(
-                f"datadog Logging - Enters logging function for model {kwargs}"
-            )
-            litellm_params = kwargs.get("litellm_params", {})
-            metadata = (
-                litellm_params.get("metadata", {}) or {}
-            )  # if litellm_params['metadata'] == None
-            messages = kwargs.get("messages")
-            optional_params = kwargs.get("optional_params", {})
-            call_type = kwargs.get("call_type", "litellm.completion")
-            cache_hit = kwargs.get("cache_hit", False)
-            usage = response_obj["usage"]
-            id = response_obj.get("id", str(uuid.uuid4()))
-            usage = dict(usage)
-            try:
-                response_time = (end_time - start_time).total_seconds() * 1000
-            except:
-                response_time = None
-
-            try:
-                response_obj = dict(response_obj)
-            except:
-                response_obj = response_obj
-
-            # Clean Metadata before logging - never log raw metadata
-            # the raw metadata can contain circular references which leads to infinite recursion
-            # we clean out all extra litellm metadata params before logging
-            clean_metadata = {}
-            if isinstance(metadata, dict):
-                for key, value in metadata.items():
-                    # clean litellm metadata before logging
-                    if key in [
-                        "endpoint",
-                        "caching_groups",
-                        "previous_models",
-                    ]:
-                        continue
-                    else:
-                        clean_metadata[key] = value
-
-            # Build the initial payload
-            payload = {
-                "id": id,
-                "call_type": call_type,
-                "cache_hit": cache_hit,
-                "start_time": start_time,
-                "end_time": end_time,
-                "response_time": response_time,
-                "model": kwargs.get("model", ""),
-                "user": kwargs.get("user", ""),
-                "model_parameters": optional_params,
-                "spend": kwargs.get("response_cost", 0),
-                "messages": messages,
-                "response": response_obj,
-                "usage": usage,
-                "metadata": clean_metadata,
-            }
-
-            make_json_serializable(payload)
-            import json
-
-            payload = json.dumps(payload)
-
-            print_verbose(f"\ndd Logger - Logging payload = {payload}")
-
-            with ApiClient(self.configuration) as api_client:
-                api_instance = LogsApi(api_client)
-                body = HTTPLog(
-                    [
-                        HTTPLogItem(
-                            ddsource=os.getenv("DD_SOURCE", "litellm"),
-                            message=payload,
-                            service="litellm-server",
-                        ),
-                    ]
-                )
-                response = api_instance.submit_log(body)
-
-            print_verbose(
-                f"Datadog Layer Logging - final response object: {response_obj}"
-            )
-        except Exception as e:
-            verbose_logger.exception(
-                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
-            )
-            pass
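The replacement below drops the `datadog-api-client` SDK in favor of litellm's own httpx handlers plus batching. Once it lands, enabling the integration from the SDK side takes only the two env vars it validates on init — a rough sketch with placeholder credentials (`mock_response` avoids a real LLM call, mirroring the tests further down):

```python
import asyncio
import os

import litellm

os.environ["DD_API_KEY"] = "your-dd-api-key"  # placeholder
os.environ["DD_SITE"] = "us5.datadoghq.com"   # placeholder

# "datadog" resolves to a DataDogLogger singleton via
# _init_custom_logger_compatible_class (see litellm_logging.py below).
litellm.success_callback = ["datadog"]


async def main():
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "hi"}],
        mock_response="hello",  # no real provider call
    )
    print(response)
    await asyncio.sleep(6)  # let the 5-second periodic flush fire once


asyncio.run(main())
```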
diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py
new file mode 100644
index 000000000..d711604de
--- /dev/null
+++ b/litellm/integrations/datadog/datadog.py
@@ -0,0 +1,322 @@
+"""
+DataDog Integration - sends logs to /api/v2/logs
+
+DD Reference API: https://docs.datadoghq.com/api/latest/logs
+
+`async_log_success_event` - used by the litellm proxy to send logs to Datadog
+`log_success_event` - sync version of logging to DataDog, only used by the litellm Python SDK when the user opts in to sync functions
+
+async_log_success_event buffers up to DD_MAX_BATCH_SIZE logs in memory and flushes to Datadog once the batch is full or every 5 seconds
+
+For batching specifics see the CustomBatchLogger class
+"""
+
+import asyncio
+import datetime
+import os
+import sys
+import traceback
+import uuid
+from typing import Any, Dict, List, Optional, Union
+
+from httpx import Response
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.llms.custom_httpx.http_handler import (
+    _get_httpx_client,
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+
+from .types import DD_ERRORS, DatadogPayload
+from .utils import make_json_serializable
+
+DD_MAX_BATCH_SIZE = 1000  # max number of logs the DD API accepts per request
+
+
+class DataDogLogger(CustomBatchLogger):
+    # Class variables or attributes
+    def __init__(
+        self,
+        **kwargs,
+    ):
+        """
+        Initializes the datadog logger and checks that the required env variables are set
+
+        Required environment variables:
+        `DD_API_KEY` - your datadog api key
+        `DD_SITE` - your datadog site, example = `"us5.datadoghq.com"`
+        """
+        try:
+            verbose_logger.debug("Datadog: in init datadog logger")
+            # check if the correct env variables are set
+            if os.getenv("DD_API_KEY", None) is None:
+                raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'")
+            if os.getenv("DD_SITE", None) is None:
+                raise Exception("DD_SITE is not set in .env, set 'DD_SITE=<>'")
+            self.async_client = get_async_httpx_client(
+                llm_provider=httpxSpecialProvider.LoggingCallback
+            )
+            self.DD_API_KEY = os.getenv("DD_API_KEY")
+            self.intake_url = (
+                f"https://http-intake.logs.{os.getenv('DD_SITE')}/api/v2/logs"
+            )
+
+            ###################################
+            # OPTIONAL - only used for testing
+            if os.getenv("_DATADOG_BASE_URL", None) is not None:
+                _dd_base_url = os.getenv("_DATADOG_BASE_URL")
+                self.intake_url = f"{_dd_base_url}/api/v2/logs"
+            ###################################
+            self.sync_client = _get_httpx_client()
+            asyncio.create_task(self.periodic_flush())
+            self.flush_lock = asyncio.Lock()
+            super().__init__(
+                **kwargs, flush_lock=self.flush_lock, batch_size=DD_MAX_BATCH_SIZE
+            )
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog: Got exception on init Datadog client {str(e)}"
+            )
+            raise e
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        """
+        Async Log success events to Datadog
+
+        - Creates a Datadog payload
+        - Adds the payload to the in memory logs queue
+        - The queue is flushed every 5 seconds or once it reaches DD_MAX_BATCH_SIZE
+
+        Raises:
+            Nothing - errors are logged via a NON blocking verbose_logger.exception
+        """
+        try:
+            verbose_logger.debug(
+                "Datadog: Logging - Enters logging function for model %s", kwargs
+            )
+            dd_payload = self.create_datadog_logging_payload(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+            )
+
+            self.log_queue.append(dd_payload)
+            verbose_logger.debug(
+                f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
+            )
+
+            if len(self.log_queue) >= self.batch_size:
+                await self.async_send_batch()
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
+            )
+            pass
+
+    async def async_send_batch(self):
+        """
+        Sends the in memory logs queue to the datadog api
+
+        Logs are sent to /api/v2/logs
+
+        DD Ref: https://docs.datadoghq.com/api/latest/logs/
+
+        Raises:
+            Nothing - errors are logged via a NON blocking verbose_logger.exception
+        """
+        try:
+            if not self.log_queue:
+                verbose_logger.debug("Datadog: log_queue is empty, nothing to flush")
+                return
+
+            verbose_logger.debug(
+                "Datadog - about to flush %s events on %s",
+                len(self.log_queue),
+                self.intake_url,
+            )
+
+            response = await self.async_send_compressed_data(self.log_queue)
+            if response.status_code == 413:
+                verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value)
+                return
+
+            response.raise_for_status()
+            if response.status_code != 202:
+                raise Exception(
+                    f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
+                )
+
+            verbose_logger.debug(
+                "Datadog: Response from datadog API status_code: %s, text: %s",
+                response.status_code,
+                response.text,
+            )
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}"
+            )
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        """
+        Sync Log success events to Datadog
+
+        - Creates a Datadog payload
+        - Immediately posts it to the DD API
+        """
+        try:
+            verbose_logger.debug(
+                "Datadog: Logging - Enters logging function for model %s", kwargs
+            )
+            dd_payload = self.create_datadog_logging_payload(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+            )
+
+            response = self.sync_client.post(
+                url=self.intake_url,
+                json=dd_payload,
+                headers={
+                    "DD-API-KEY": self.DD_API_KEY,
+                },
+            )
+
+            response.raise_for_status()
+            if response.status_code != 202:
+                raise Exception(
+                    f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
+                )
+
+            verbose_logger.debug(
+                "Datadog: Response from datadog API status_code: %s, text: %s",
+                response.status_code,
+                response.text,
+            )
+
+        except Exception as e:
+            verbose_logger.exception(
+                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
+            )
+            pass
+
+    def create_datadog_logging_payload(
+        self,
+        kwargs: Union[dict, Any],
+        response_obj: Any,
+        start_time: datetime.datetime,
+        end_time: datetime.datetime,
+    ) -> DatadogPayload:
+        """
+        Helper function to create a datadog payload for logging
+
+        Args:
+            kwargs (Union[dict, Any]): request kwargs
+            response_obj (Any): llm api response
+            start_time (datetime.datetime): start time of request
+            end_time (datetime.datetime): end time of request
+
+        Returns:
+            DatadogPayload: defined in types.py
+        """
+        import json
+
+        litellm_params = kwargs.get("litellm_params", {})
+        metadata = (
+            litellm_params.get("metadata", {}) or {}
+        )  # if litellm_params['metadata'] == None
+        messages = kwargs.get("messages")
+        optional_params = kwargs.get("optional_params", {})
+        call_type = kwargs.get("call_type", "litellm.completion")
+        cache_hit = kwargs.get("cache_hit", False)
+        usage = response_obj["usage"]
+        id = response_obj.get("id", str(uuid.uuid4()))
+        usage = dict(usage)
+        try:
+            response_time = (end_time - start_time).total_seconds() * 1000
+        except:
+            response_time = None
+
+        try:
+            response_obj = dict(response_obj)
+        except:
+            response_obj = response_obj
+
+        # Clean Metadata before logging - never log raw metadata
+        # the raw metadata can contain circular references which leads to infinite recursion
+        # we clean out all extra litellm metadata params before logging
+        clean_metadata = {}
+        if isinstance(metadata, dict):
+            for key, value in metadata.items():
+                # clean litellm metadata before logging
+                if key in [
+                    "endpoint",
+                    "caching_groups",
+                    "previous_models",
+                ]:
+                    continue
+                else:
+                    clean_metadata[key] = value
+
+        # Build the initial payload
+        payload = {
+            "id": id,
+            "call_type": call_type,
+            "cache_hit": cache_hit,
+            "start_time": start_time,
+            "end_time": end_time,
+            "response_time": response_time,
+            "model": kwargs.get("model", ""),
+            "user": kwargs.get("user", ""),
+            "model_parameters": optional_params,
+            "spend": kwargs.get("response_cost", 0),
+            "messages": messages,
+            "response": response_obj,
+            "usage": usage,
+            "metadata": clean_metadata,
+        }
+
+        make_json_serializable(payload)
+        json_payload = json.dumps(payload)
+
+        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
+
+        dd_payload = DatadogPayload(
+            ddsource=os.getenv("DD_SOURCE", "litellm"),
+            ddtags="",
+            hostname="",
+            message=json_payload,
+            service="litellm-server",
+        )
+        return dd_payload
+
+    async def async_send_compressed_data(self, data: List) -> Response:
+        """
+        Async helper to send compressed data to the datadog self.intake_url
+
+        Datadog recommends using gzip to compress data
+        https://docs.datadoghq.com/api/latest/logs/
+
+        "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
+        """
+        import gzip
+        import json
+
+        compressed_data = gzip.compress(json.dumps(data).encode("utf-8"))
+        response = await self.async_client.post(
+            url=self.intake_url,
+            data=compressed_data,
+            headers={
+                "DD-API-KEY": self.DD_API_KEY,
+                "Content-Encoding": "gzip",
+                "Content-Type": "application/json",
+            },
+        )
+        return response
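To see what actually lands in Datadog's `message` field, the payload helper can be exercised by hand; a rough sketch, where the kwargs dict is a minimal stand-in for what litellm passes the logger and the credentials are fakes (nothing is sent):

```python
import asyncio
import datetime
import json
import os

from litellm.integrations.datadog.datadog import DataDogLogger

os.environ["DD_API_KEY"] = "anything"  # fake; no request is made here
os.environ["DD_SITE"] = "us5.datadoghq.com"


async def main():
    # __init__ schedules periodic_flush(), so construct inside a running loop.
    dd_logger = DataDogLogger()
    now = datetime.datetime.now()
    dd_payload = dd_logger.create_datadog_logging_payload(
        kwargs={
            "litellm_params": {"metadata": {"user_api_key": "hashed-key"}},
            "messages": [{"role": "user", "content": "hi"}],
            "optional_params": {"temperature": 0.2},
            "call_type": "acompletion",
            "model": "gpt-3.5-turbo",
            "response_cost": 0.0001,
        },
        response_obj={"id": "chatcmpl-123", "usage": {"total_tokens": 10}},
        start_time=now,
        end_time=now + datetime.timedelta(milliseconds=350),
    )
    message = json.loads(dd_payload["message"])
    print(message["response_time"])  # 350.0 (milliseconds)


asyncio.run(main())
```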
diff --git a/litellm/integrations/datadog/types.py b/litellm/integrations/datadog/types.py
new file mode 100644
index 000000000..c8ad4e47a
--- /dev/null
+++ b/litellm/integrations/datadog/types.py
@@ -0,0 +1,14 @@
+from enum import Enum
+from typing import TypedDict
+
+
+class DatadogPayload(TypedDict, total=False):
+    ddsource: str
+    ddtags: str
+    hostname: str
+    message: str
+    service: str
+
+
+class DD_ERRORS(Enum):
+    DATADOG_413_ERROR = "Datadog API Error - Payload too large (batch is above 5MB uncompressed). If you want this logged either disable request/response logging or set `DD_BATCH_SIZE=50`"
diff --git a/litellm/integrations/datadog/utils.py b/litellm/integrations/datadog/utils.py
new file mode 100644
index 000000000..b31c66ce7
--- /dev/null
+++ b/litellm/integrations/datadog/utils.py
@@ -0,0 +1,13 @@
+def make_json_serializable(payload):
+    for key, value in payload.items():
+        try:
+            if isinstance(value, dict):
+                # recursively sanitize dicts
+                payload[key] = make_json_serializable(value.copy())
+            elif not isinstance(value, (str, int, float, bool, type(None))):
+                # everything else becomes a string
+                payload[key] = str(value)
+        except:
+            # non blocking if it can't cast to a str
+            pass
+    return payload
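The recursive sanitizer added in `utils.py` above is what keeps `json.dumps` from choking on datetimes, UUIDs, and other rich objects in the payload; a quick, self-contained illustration:

```python
import datetime
import json
import uuid

from litellm.integrations.datadog.utils import make_json_serializable

payload = {
    "id": uuid.uuid4(),                     # not JSON serializable as-is
    "start_time": datetime.datetime.now(),  # not JSON serializable as-is
    "metadata": {"tags": {"team-a"}},       # nested dict; the set inside becomes a str
    "spend": 0.0001,                        # already serializable, left untouched
}

make_json_serializable(payload)  # mutates in place
print(json.dumps(payload, indent=2))  # now succeeds
```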
diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py
index 9528b6fbb..1a624c5f8 100644
--- a/litellm/litellm_core_utils/litellm_logging.py
+++ b/litellm/litellm_core_utils/litellm_logging.py
@@ -69,7 +69,7 @@ from ..integrations.berrispend import BerriSpendLogger
 from ..integrations.braintrust_logging import BraintrustLogger
 from ..integrations.clickhouse import ClickhouseLogger
 from ..integrations.custom_logger import CustomLogger
-from ..integrations.datadog import DataDogLogger
+from ..integrations.datadog.datadog import DataDogLogger
 from ..integrations.dynamodb import DyanmoDBLogger
 from ..integrations.galileo import GalileoObserve
 from ..integrations.gcs_bucket import GCSBucketLogger
@@ -962,33 +962,6 @@ class Logging:
                         service_name="langfuse",
                         trace_id=_trace_id,
                     )
-                if callback == "datadog":
-                    global dataDogLogger
-                    verbose_logger.debug("reaches datadog for success logging!")
-                    kwargs = {}
-                    for k, v in self.model_call_details.items():
-                        if (
-                            k != "original_response"
-                        ):  # copy.deepcopy raises errors as this could be a coroutine
-                            kwargs[k] = v
-                    # this only logs streaming once, complete_streaming_response exists i.e when stream ends
-                    if self.stream:
-                        verbose_logger.debug(
-                            f"datadog: is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
-                        )
-                        if complete_streaming_response is None:
-                            continue
-                        else:
-                            print_verbose("reaches datadog for streaming logging!")
-                            result = kwargs["complete_streaming_response"]
-                    dataDogLogger.log_event(
-                        kwargs=kwargs,
-                        response_obj=result,
-                        start_time=start_time,
-                        end_time=end_time,
-                        user_id=kwargs.get("user", None),
-                        print_verbose=print_verbose,
-                    )
                 if callback == "generic":
                     global genericAPILogger
                     verbose_logger.debug("reaches langfuse for success logging!")
@@ -2125,6 +2098,14 @@ def _init_custom_logger_compatible_class(
             _prometheus_logger = PrometheusLogger()
         _in_memory_loggers.append(_prometheus_logger)
         return _prometheus_logger  # type: ignore
+    elif logging_integration == "datadog":
+        for callback in _in_memory_loggers:
+            if isinstance(callback, DataDogLogger):
+                return callback  # type: ignore
+
+        _datadog_logger = DataDogLogger()
+        _in_memory_loggers.append(_datadog_logger)
+        return _datadog_logger  # type: ignore
     elif logging_integration == "gcs_bucket":
         for callback in _in_memory_loggers:
             if isinstance(callback, GCSBucketLogger):
@@ -2251,6 +2232,10 @@ def get_custom_logger_compatible_class(
         for callback in _in_memory_loggers:
             if isinstance(callback, PrometheusLogger):
                 return callback
+    elif logging_integration == "datadog":
+        for callback in _in_memory_loggers:
+            if isinstance(callback, DataDogLogger):
+                return callback
     elif logging_integration == "gcs_bucket":
         for callback in _in_memory_loggers:
             if isinstance(callback, GCSBucketLogger):
                 return callback
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index c1148fc75..233507d16 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -1,7 +1,7 @@
 model_list:
   - model_name: gemini-vision
     litellm_params:
-      model: vertex_ai/gemini-1.0-pro-vision-001
+      model: vertex_ai/gemini-1.5-pro
       api_base: https://exampleopenaiendpoint-production.up.railway.app/v1/projects/adroit-crow-413218/locations/us-central1/publishers/google/models/gemini-1.0-pro-vision-001
       vertex_project: "adroit-crow-413218"
       vertex_location: "us-central1"
@@ -14,9 +14,7 @@ model_list:

 general_settings:
   master_key: sk-1234
-  alerting: ["slack"]
-  alerting_threshold: 0.00001

 litellm_settings:
-  callbacks: ["otel"]
+  success_callback: ["datadog"]
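With this config, every successful request through the proxy should end up in Datadog. A hedged smoke test, assuming the proxy was started with `litellm --config proxy_config.yaml` on the default port 4000:

```python
import openai

# master_key from general_settings above doubles as the API key.
client = openai.OpenAI(api_key="sk-1234", base_url="http://0.0.0.0:4000")

response = client.chat.completions.create(
    model="gemini-vision",  # model_name from the model_list above
    messages=[{"role": "user", "content": "ping"}],
)
print(response.choices[0].message.content)
```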
diff --git a/litellm/tests/test_datadog.py b/litellm/tests/test_datadog.py
index 789b39fd9..51c0e99d0 100644
--- a/litellm/tests/test_datadog.py
+++ b/litellm/tests/test_datadog.py
@@ -1,27 +1,164 @@
-import sys
-import os
 import io
+import os
+import sys

 sys.path.insert(0, os.path.abspath("../.."))

-from litellm import completion
-import litellm
+import asyncio
+import gzip
+import json
+import logging
+import time
+from unittest.mock import AsyncMock, patch
+
 import pytest
-import time

+import litellm
+from litellm import completion
+from litellm._logging import verbose_logger
+from litellm.integrations.datadog.types import DatadogPayload
+
+verbose_logger.setLevel(logging.DEBUG)

-@pytest.mark.skip(reason="beta test - this is a new feature")
-def test_datadog_logging():
+
+@pytest.mark.asyncio
+async def test_datadog_logging_http_request():
+    """
+    - Test that the HTTP request is made to Datadog
+    - sent to the /api/v2/logs endpoint
+    - the payload is batched
+    - each element in the payload is a DatadogPayload
+    - each element in a DatadogPayload.message contains all the valid fields
+    """
+    try:
+        from litellm.integrations.datadog.datadog import DataDogLogger
+
+        os.environ["DD_SITE"] = "https://fake.datadoghq.com"
+        os.environ["DD_API_KEY"] = "anything"
+        dd_logger = DataDogLogger()
+
+        litellm.callbacks = [dd_logger]
+
+        litellm.set_verbose = True
+
+        # Create a mock for the async_client's post method
+        mock_post = AsyncMock()
+        mock_post.return_value.status_code = 202
+        mock_post.return_value.text = "Accepted"
+        dd_logger.async_client.post = mock_post
+
+        # Make the completion calls
+        for _ in range(5):
+            response = await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "what llm are u"}],
+                max_tokens=10,
+                temperature=0.2,
+                mock_response="Accepted",
+            )
+            print(response)
+
+        # Wait out the 5 second flush interval
+        await asyncio.sleep(6)
+
+        # Assert that the mock was called
+        assert mock_post.called, "HTTP request was not made"
+
+        # Get the arguments of the last call
+        args, kwargs = mock_post.call_args
+
+        print("Call args and kwargs", args, kwargs)
+
+        # Check that the request went to the logs intake endpoint
+        assert kwargs["url"].endswith("/api/v2/logs"), "Incorrect DataDog endpoint"
+
+        body = kwargs["data"]
+
+        # use gzip to unzip the body
+        with gzip.open(io.BytesIO(body), "rb") as f:
+            body = f.read().decode("utf-8")
+        print(body)
+
+        # body is a JSON string, parse it to a list of dicts
+        body = json.loads(body)
+        print(body)
+
+        assert len(body) == 5  # 5 logs should be sent to DataDog
+
+        # Assert that the first element in body has the expected fields and shape
+        assert isinstance(body[0], dict), "First element in body should be a dictionary"
+
+        # Get the expected fields and their types from DatadogPayload
+        expected_fields = DatadogPayload.__annotations__
+        # Assert that all elements in body have the fields of DatadogPayload with correct types
+        for log in body:
+            assert isinstance(log, dict), "Each log should be a dictionary"
+            for field, expected_type in expected_fields.items():
+                assert field in log, f"Field '{field}' is missing from the log"
+                assert isinstance(
+                    log[field], expected_type
+                ), f"Field '{field}' has incorrect type. Expected {expected_type}, got {type(log[field])}"
+
+        # Additional assertion to ensure no extra fields are present
+        for log in body:
+            assert set(log.keys()) == set(
+                expected_fields.keys()
+            ), f"Log contains unexpected fields: {set(log.keys()) - set(expected_fields.keys())}"
+
+        # Parse the 'message' field as JSON and check its structure
+        message = json.loads(body[0]["message"])
+
+        expected_message_fields = [
+            "id",
+            "call_type",
+            "cache_hit",
+            "start_time",
+            "end_time",
+            "response_time",
+            "model",
+            "user",
+            "model_parameters",
+            "spend",
+            "messages",
+            "response",
+            "usage",
+            "metadata",
+        ]
+
+        for field in expected_message_fields:
+            assert field in message, f"Field '{field}' is missing from the message"
+
+        # Check specific fields
+        assert message["call_type"] == "acompletion"
+        assert message["model"] == "gpt-3.5-turbo"
+        assert isinstance(message["model_parameters"], dict)
+        assert "temperature" in message["model_parameters"]
+        assert "max_tokens" in message["model_parameters"]
+        assert isinstance(message["response"], dict)
+        assert isinstance(message["usage"], dict)
+        assert isinstance(message["metadata"], dict)
+
+    except Exception as e:
+        pytest.fail(f"Test failed with exception: {str(e)}")
+
+
+@pytest.mark.asyncio
+@pytest.mark.skip(reason="local-only test, to test if everything works fine.")
+async def test_datadog_logging():
     try:
         litellm.success_callback = ["datadog"]
         litellm.set_verbose = True
-        response = completion(
+        response = await litellm.acompletion(
             model="gpt-3.5-turbo",
             messages=[{"role": "user", "content": "what llm are u"}],
             max_tokens=10,
             temperature=0.2,
         )
         print(response)
+
+        await asyncio.sleep(5)
     except Exception as e:
         print(e)
diff --git a/requirements.txt b/requirements.txt
index 23b225236..659dcdc47 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,7 +17,6 @@ anthropic[vertex]==0.21.3
 google-generativeai==0.5.0 # for vertex ai calls
 async_generator==1.10.0 # for async ollama calls
 langfuse==2.45.0 # for langfuse self-hosted logging
-datadog-api-client==2.23.0 # for datadog logging
 prometheus_client==0.20.0 # for /metrics endpoint on proxy
 orjson==3.9.15 # fast /embedding responses
 apscheduler==3.10.4 # for resetting budget in background
diff --git a/tests/load_tests/test_datadog_load_test.py b/tests/load_tests/test_datadog_load_test.py
new file mode 100644
index 000000000..b56c82288
--- /dev/null
+++ b/tests/load_tests/test_datadog_load_test.py
@@ -0,0 +1,104 @@
+import sys
+import os
+
+sys.path.insert(0, os.path.abspath("../.."))
+
+import asyncio
+import litellm
+import pytest
+import logging
+from litellm._logging import verbose_logger
+
+
+def test_datadog_logging_async():
+    try:
+        # litellm.set_verbose = True
+        os.environ["DD_API_KEY"] = "anything"
+        os.environ["_DATADOG_BASE_URL"] = (
+            "https://exampleopenaiendpoint-production.up.railway.app"
+        )
+
+        os.environ["DD_SITE"] = "us5.datadoghq.com"
+        os.environ["DD_API_KEY"] = "xxxxxx"
+
+        litellm.success_callback = ["datadog"]
+
+        percentage_diffs = []
+
+        for run in range(1):  # single measurement run; bump to average over more runs
print(f"\nRun {run + 1}:") + + # Test with empty success_callback + litellm.success_callback = [] + litellm.callbacks = [] + start_time_empty_callback = asyncio.run(make_async_calls()) + print("Done with no callback test") + + # Test with datadog callback + print("Starting datadog test") + litellm.success_callback = ["datadog"] + start_time_datadog = asyncio.run(make_async_calls()) + print("Done with datadog test") + + # Compare times and calculate percentage difference + print(f"Time with success_callback='datadog': {start_time_datadog}") + print(f"Time with empty success_callback: {start_time_empty_callback}") + + percentage_diff = ( + abs(start_time_datadog - start_time_empty_callback) + / start_time_empty_callback + * 100 + ) + percentage_diffs.append(percentage_diff) + print(f"Performance difference: {percentage_diff:.2f}%") + + print("percentage_diffs", percentage_diffs) + avg_percentage_diff = sum(percentage_diffs) / len(percentage_diffs) + print(f"\nAverage performance difference: {avg_percentage_diff:.2f}%") + + assert ( + avg_percentage_diff < 10 + ), f"Average performance difference of {avg_percentage_diff:.2f}% exceeds 10% threshold" + + except litellm.Timeout: + pass + except Exception as e: + pytest.fail(f"An exception occurred - {e}") + + +async def make_async_calls(metadata=None, **completion_kwargs): + total_tasks = 300 + batch_size = 100 + total_time = 0 + + for batch in range(1): + tasks = [create_async_task() for _ in range(batch_size)] + + start_time = asyncio.get_event_loop().time() + responses = await asyncio.gather(*tasks) + + for idx, response in enumerate(responses): + print(f"Response from Task {batch * batch_size + idx + 1}: {response}") + + await asyncio.sleep(7) + + batch_time = asyncio.get_event_loop().time() - start_time + total_time += batch_time + + return total_time + + +def create_async_task(**completion_kwargs): + litellm.set_verbose = True + completion_args = { + "model": "openai/chatgpt-v-2", + "api_version": "2024-02-01", + "messages": [{"role": "user", "content": "This is a test"}], + "max_tokens": 5, + "temperature": 0.7, + "timeout": 5, + "user": "datadog_latency_test_user", + "mock_response": "hello from my load test", + } + completion_args.update(completion_kwargs) + return asyncio.create_task(litellm.acompletion(**completion_args))