diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index 53d20c067..e657732db 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -15,6 +15,7 @@ import requests # type: ignore import litellm from litellm._logging import print_verbose, verbose_logger +from litellm.types.integrations.prometheus import LATENCY_BUCKETS from litellm.types.services import ServiceLoggerPayload, ServiceTypes @@ -96,6 +97,7 @@ class PrometheusServicesLogger: metric_name, "Latency for {} service".format(service), labelnames=[service], + buckets=LATENCY_BUCKETS, ) def create_counter(self, service: str, type_of_request: str): diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index 18cc262b4..5bc044526 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -3,18 +3,8 @@ model_list: litellm_params: model: gpt-4o api_key: os.environ/OPENAI_API_KEY - tpm: 1000000 - rpm: 10000 - - -general_settings: - # master key is set via env var - # master_key: ####### - proxy_batch_write_at: 60 # Batch write spend updates every 60s + api_base: https://exampleopenaiendpoint-production.up.railway.app/ litellm_settings: - store_audit_logs: true - - # https://docs.litellm.ai/docs/proxy/reliability#default-fallbacks - default_fallbacks: ["gpt-4o-2024-08-06", "claude-3-5-sonnet-20240620"] - fallbacks: [{"gpt-4o-2024-08-06": ["claude-3-5-sonnet-20240620"]}, {"gpt-4o-2024-05-13": ["claude-3-5-sonnet-20240620"]}] \ No newline at end of file + callbacks: ["prometheus"] + service_callback: ["prometheus_system"] \ No newline at end of file diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 4a10a0179..9eab792ad 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -140,25 +140,21 @@ def safe_deep_copy(data): def log_to_opentelemetry(func): @wraps(func) async def wrapper(*args, **kwargs): - start_time = datetime.now() + start_time: datetime = datetime.now() try: result = await func(*args, **kwargs) - end_time = datetime.now() + end_time: datetime = datetime.now() + + from litellm.proxy.proxy_server import proxy_logging_obj # Log to OTEL only if "parent_otel_span" is in kwargs and is not None - if ( - "parent_otel_span" in kwargs - and kwargs["parent_otel_span"] is not None - and "proxy_logging_obj" in kwargs - and kwargs["proxy_logging_obj"] is not None - ): - proxy_logging_obj = kwargs["proxy_logging_obj"] + if "PROXY" not in func.__name__: await proxy_logging_obj.service_logging_obj.async_service_success_hook( service=ServiceTypes.DB, call_type=func.__name__, - parent_otel_span=kwargs["parent_otel_span"], - duration=0.0, + parent_otel_span=kwargs.get("parent_otel_span", None), + duration=(end_time - start_time).total_seconds(), start_time=start_time, end_time=end_time, event_metadata={ @@ -179,8 +175,6 @@ def log_to_opentelemetry(func): kwargs=passed_kwargs ) if parent_otel_span is not None: - from litellm.proxy.proxy_server import proxy_logging_obj - metadata = get_litellm_metadata_from_kwargs(kwargs=passed_kwargs) await proxy_logging_obj.service_logging_obj.async_service_success_hook( service=ServiceTypes.BATCH_WRITE_TO_DB, @@ -194,28 +188,23 @@ def log_to_opentelemetry(func): # end of logging to otel return result except Exception as e: - end_time = datetime.now() - if ( - "parent_otel_span" in kwargs - and kwargs["parent_otel_span"] is not None - and "proxy_logging_obj" in kwargs - and kwargs["proxy_logging_obj"] is not None - ): - proxy_logging_obj = kwargs["proxy_logging_obj"] - await proxy_logging_obj.service_logging_obj.async_service_failure_hook( - error=e, - service=ServiceTypes.DB, - call_type=func.__name__, - parent_otel_span=kwargs["parent_otel_span"], - duration=0.0, - start_time=start_time, - end_time=end_time, - event_metadata={ - "function_name": func.__name__, - "function_kwargs": kwargs, - "function_args": args, - }, - ) + from litellm.proxy.proxy_server import proxy_logging_obj + + end_time: datetime = datetime.now() + await proxy_logging_obj.service_logging_obj.async_service_failure_hook( + error=e, + service=ServiceTypes.DB, + call_type=func.__name__, + parent_otel_span=kwargs.get("parent_otel_span"), + duration=(end_time - start_time).total_seconds(), + start_time=start_time, + end_time=end_time, + event_metadata={ + "function_name": func.__name__, + "function_kwargs": kwargs, + "function_args": args, + }, + ) raise e return wrapper @@ -348,6 +337,7 @@ class ProxyLogging: internal_usage_cache=self.internal_usage_cache.dual_cache, ) self.premium_user = premium_user + self.service_logging_obj = ServiceLogging() def startup_event( self, @@ -422,7 +412,6 @@ class ProxyLogging: self.internal_usage_cache.dual_cache.redis_cache = redis_cache def _init_litellm_callbacks(self, llm_router: Optional[litellm.Router] = None): - self.service_logging_obj = ServiceLogging() litellm.callbacks.append(self.max_parallel_request_limiter) # type: ignore litellm.callbacks.append(self.max_budget_limiter) # type: ignore litellm.callbacks.append(self.cache_control_check) # type: ignore diff --git a/tests/local_testing/test_prometheus_service.py b/tests/local_testing/test_prometheus_service.py index 86321ea2d..49dd74839 100644 --- a/tests/local_testing/test_prometheus_service.py +++ b/tests/local_testing/test_prometheus_service.py @@ -11,6 +11,8 @@ import pytest from litellm import acompletion, Cache from litellm._service_logger import ServiceLogging from litellm.integrations.prometheus_services import PrometheusServicesLogger +from litellm.proxy.utils import ServiceTypes +from unittest.mock import patch, AsyncMock import litellm """ @@ -139,3 +141,72 @@ async def test_router_with_caching(): except Exception as e: pytest.fail(f"An exception occured - {str(e)}") + + +@pytest.mark.asyncio +async def test_service_logger_db_monitoring(): + """ + Test prometheus monitoring for database operations + """ + litellm.service_callback = ["prometheus_system"] + sl = ServiceLogging() + + # Create spy on prometheus logger's async_service_success_hook + with patch.object( + sl.prometheusServicesLogger, + "async_service_success_hook", + new_callable=AsyncMock, + ) as mock_prometheus_success: + # Test DB success monitoring + await sl.async_service_success_hook( + service=ServiceTypes.DB, + duration=0.3, + call_type="query", + event_metadata={"query_type": "SELECT", "table": "api_keys"}, + ) + + # Assert prometheus logger's success hook was called + mock_prometheus_success.assert_called_once() + # Optionally verify the payload + actual_payload = mock_prometheus_success.call_args[1]["payload"] + print("actual_payload sent to prometheus: ", actual_payload) + assert actual_payload.service == ServiceTypes.DB + assert actual_payload.duration == 0.3 + assert actual_payload.call_type == "query" + assert actual_payload.is_error is False + + +@pytest.mark.asyncio +async def test_service_logger_db_monitoring_failure(): + """ + Test prometheus monitoring for failed database operations + """ + litellm.service_callback = ["prometheus_system"] + sl = ServiceLogging() + + # Create spy on prometheus logger's async_service_failure_hook + with patch.object( + sl.prometheusServicesLogger, + "async_service_failure_hook", + new_callable=AsyncMock, + ) as mock_prometheus_failure: + # Test DB failure monitoring + test_error = Exception("Database connection failed") + await sl.async_service_failure_hook( + service=ServiceTypes.DB, + duration=0.3, + error=test_error, + call_type="query", + event_metadata={"query_type": "SELECT", "table": "api_keys"}, + ) + + # Assert prometheus logger's failure hook was called + mock_prometheus_failure.assert_called_once() + # Verify the payload + actual_payload = mock_prometheus_failure.call_args[1]["payload"] + print("actual_payload sent to prometheus: ", actual_payload) + assert actual_payload.service == ServiceTypes.DB + assert actual_payload.duration == 0.3 + assert actual_payload.call_type == "query" + assert actual_payload.is_error is True + assert actual_payload.error == "Database connection failed" diff --git a/tests/logging_callback_tests/test_log_db_redis_services.py b/tests/logging_callback_tests/test_log_db_redis_services.py new file mode 100644 index 000000000..9f5db8009 --- /dev/null +++ b/tests/logging_callback_tests/test_log_db_redis_services.py @@ -0,0 +1,128 @@ +import io +import os +import sys + + +sys.path.insert(0, os.path.abspath("../..")) + +import asyncio +import gzip +import json +import logging +import time +from unittest.mock import AsyncMock, patch + +import pytest + +import litellm +from litellm import completion +from litellm._logging import verbose_logger +from litellm.proxy.utils import log_to_opentelemetry, ServiceTypes +from datetime import datetime + + +# Test async function to decorate +@log_to_opentelemetry +async def sample_db_function(*args, **kwargs): + return "success" + + +@log_to_opentelemetry +async def sample_proxy_function(*args, **kwargs): + return "success" + + +@pytest.mark.asyncio +async def test_log_to_opentelemetry_success(): + # Mock the proxy_logging_obj + with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging: + # Setup mock + mock_proxy_logging.service_logging_obj.async_service_success_hook = AsyncMock() + + # Call the decorated function + result = await sample_db_function(parent_otel_span="test_span") + + # Assertions + assert result == "success" + mock_proxy_logging.service_logging_obj.async_service_success_hook.assert_called_once() + call_args = ( + mock_proxy_logging.service_logging_obj.async_service_success_hook.call_args[ + 1 + ] + ) + + assert call_args["service"] == ServiceTypes.DB + assert call_args["call_type"] == "sample_db_function" + assert call_args["parent_otel_span"] == "test_span" + assert isinstance(call_args["duration"], float) + assert isinstance(call_args["start_time"], datetime) + assert isinstance(call_args["end_time"], datetime) + assert "function_name" in call_args["event_metadata"] + + +@pytest.mark.asyncio +async def test_log_to_opentelemetry_duration(): + # Mock the proxy_logging_obj + with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging: + # Setup mock + mock_proxy_logging.service_logging_obj.async_service_success_hook = AsyncMock() + + # Add a delay to the function to test duration + @log_to_opentelemetry + async def delayed_function(**kwargs): + await asyncio.sleep(1) # 1 second delay + return "success" + + # Call the decorated function + start = time.time() + result = await delayed_function(parent_otel_span="test_span") + end = time.time() + + # Get the actual duration + actual_duration = end - start + + # Get the logged duration from the mock call + call_args = ( + mock_proxy_logging.service_logging_obj.async_service_success_hook.call_args[ + 1 + ] + ) + logged_duration = call_args["duration"] + + # Assert the logged duration is approximately equal to actual duration (within 0.1 seconds) + assert abs(logged_duration - actual_duration) < 0.1 + assert result == "success" + + +@pytest.mark.asyncio +async def test_log_to_opentelemetry_failure(): + # Mock the proxy_logging_obj + with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging: + # Setup mock + mock_proxy_logging.service_logging_obj.async_service_failure_hook = AsyncMock() + + # Create a failing function + @log_to_opentelemetry + async def failing_function(**kwargs): + raise ValueError("Test error") + + # Call the decorated function and expect it to raise + with pytest.raises(ValueError) as exc_info: + await failing_function(parent_otel_span="test_span") + + # Assertions + assert str(exc_info.value) == "Test error" + mock_proxy_logging.service_logging_obj.async_service_failure_hook.assert_called_once() + call_args = ( + mock_proxy_logging.service_logging_obj.async_service_failure_hook.call_args[ + 1 + ] + ) + + assert call_args["service"] == ServiceTypes.DB + assert call_args["call_type"] == "failing_function" + assert call_args["parent_otel_span"] == "test_span" + assert isinstance(call_args["duration"], float) + assert isinstance(call_args["start_time"], datetime) + assert isinstance(call_args["end_time"], datetime) + assert isinstance(call_args["error"], ValueError)