(fix) Prometheus - Log Postgres DB latency, status on prometheus (#6484)

* fix logging DB fails on prometheus

* unit testing log to otel wrapper

* unit testing for service logger + prometheus

* use LATENCY buckets for service logging

* fix service logging
This commit is contained in:
Ishaan Jaff 2024-10-29 12:17:35 +05:30 committed by GitHub
parent 4f8a3fd4cf
commit 69b1bc1f1e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 229 additions and 49 deletions

View file

@ -15,6 +15,7 @@ import requests # type: ignore
import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.types.integrations.prometheus import LATENCY_BUCKETS
from litellm.types.services import ServiceLoggerPayload, ServiceTypes
@ -96,6 +97,7 @@ class PrometheusServicesLogger:
metric_name,
"Latency for {} service".format(service),
labelnames=[service],
buckets=LATENCY_BUCKETS,
)
def create_counter(self, service: str, type_of_request: str):

View file

@ -3,18 +3,8 @@ model_list:
litellm_params:
model: gpt-4o
api_key: os.environ/OPENAI_API_KEY
tpm: 1000000
rpm: 10000
general_settings:
# master key is set via env var
# master_key: #######
proxy_batch_write_at: 60 # Batch write spend updates every 60s
api_base: https://exampleopenaiendpoint-production.up.railway.app/
litellm_settings:
store_audit_logs: true
# https://docs.litellm.ai/docs/proxy/reliability#default-fallbacks
default_fallbacks: ["gpt-4o-2024-08-06", "claude-3-5-sonnet-20240620"]
fallbacks: [{"gpt-4o-2024-08-06": ["claude-3-5-sonnet-20240620"]}, {"gpt-4o-2024-05-13": ["claude-3-5-sonnet-20240620"]}]
callbacks: ["prometheus"]
service_callback: ["prometheus_system"]

View file

@ -140,25 +140,21 @@ def safe_deep_copy(data):
def log_to_opentelemetry(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = datetime.now()
start_time: datetime = datetime.now()
try:
result = await func(*args, **kwargs)
end_time = datetime.now()
end_time: datetime = datetime.now()
from litellm.proxy.proxy_server import proxy_logging_obj
# Log to OTEL only if "parent_otel_span" is in kwargs and is not None
if (
"parent_otel_span" in kwargs
and kwargs["parent_otel_span"] is not None
and "proxy_logging_obj" in kwargs
and kwargs["proxy_logging_obj"] is not None
):
proxy_logging_obj = kwargs["proxy_logging_obj"]
if "PROXY" not in func.__name__:
await proxy_logging_obj.service_logging_obj.async_service_success_hook(
service=ServiceTypes.DB,
call_type=func.__name__,
parent_otel_span=kwargs["parent_otel_span"],
duration=0.0,
parent_otel_span=kwargs.get("parent_otel_span", None),
duration=(end_time - start_time).total_seconds(),
start_time=start_time,
end_time=end_time,
event_metadata={
@ -179,8 +175,6 @@ def log_to_opentelemetry(func):
kwargs=passed_kwargs
)
if parent_otel_span is not None:
from litellm.proxy.proxy_server import proxy_logging_obj
metadata = get_litellm_metadata_from_kwargs(kwargs=passed_kwargs)
await proxy_logging_obj.service_logging_obj.async_service_success_hook(
service=ServiceTypes.BATCH_WRITE_TO_DB,
@ -194,28 +188,23 @@ def log_to_opentelemetry(func):
# end of logging to otel
return result
except Exception as e:
end_time = datetime.now()
if (
"parent_otel_span" in kwargs
and kwargs["parent_otel_span"] is not None
and "proxy_logging_obj" in kwargs
and kwargs["proxy_logging_obj"] is not None
):
proxy_logging_obj = kwargs["proxy_logging_obj"]
await proxy_logging_obj.service_logging_obj.async_service_failure_hook(
error=e,
service=ServiceTypes.DB,
call_type=func.__name__,
parent_otel_span=kwargs["parent_otel_span"],
duration=0.0,
start_time=start_time,
end_time=end_time,
event_metadata={
"function_name": func.__name__,
"function_kwargs": kwargs,
"function_args": args,
},
)
from litellm.proxy.proxy_server import proxy_logging_obj
end_time: datetime = datetime.now()
await proxy_logging_obj.service_logging_obj.async_service_failure_hook(
error=e,
service=ServiceTypes.DB,
call_type=func.__name__,
parent_otel_span=kwargs.get("parent_otel_span"),
duration=(end_time - start_time).total_seconds(),
start_time=start_time,
end_time=end_time,
event_metadata={
"function_name": func.__name__,
"function_kwargs": kwargs,
"function_args": args,
},
)
raise e
return wrapper
@ -348,6 +337,7 @@ class ProxyLogging:
internal_usage_cache=self.internal_usage_cache.dual_cache,
)
self.premium_user = premium_user
self.service_logging_obj = ServiceLogging()
def startup_event(
self,
@ -422,7 +412,6 @@ class ProxyLogging:
self.internal_usage_cache.dual_cache.redis_cache = redis_cache
def _init_litellm_callbacks(self, llm_router: Optional[litellm.Router] = None):
self.service_logging_obj = ServiceLogging()
litellm.callbacks.append(self.max_parallel_request_limiter) # type: ignore
litellm.callbacks.append(self.max_budget_limiter) # type: ignore
litellm.callbacks.append(self.cache_control_check) # type: ignore

View file

@ -11,6 +11,8 @@ import pytest
from litellm import acompletion, Cache
from litellm._service_logger import ServiceLogging
from litellm.integrations.prometheus_services import PrometheusServicesLogger
from litellm.proxy.utils import ServiceTypes
from unittest.mock import patch, AsyncMock
import litellm
"""
@ -139,3 +141,72 @@ async def test_router_with_caching():
except Exception as e:
pytest.fail(f"An exception occured - {str(e)}")
@pytest.mark.asyncio
async def test_service_logger_db_monitoring():
"""
Test prometheus monitoring for database operations
"""
litellm.service_callback = ["prometheus_system"]
sl = ServiceLogging()
# Create spy on prometheus logger's async_service_success_hook
with patch.object(
sl.prometheusServicesLogger,
"async_service_success_hook",
new_callable=AsyncMock,
) as mock_prometheus_success:
# Test DB success monitoring
await sl.async_service_success_hook(
service=ServiceTypes.DB,
duration=0.3,
call_type="query",
event_metadata={"query_type": "SELECT", "table": "api_keys"},
)
# Assert prometheus logger's success hook was called
mock_prometheus_success.assert_called_once()
# Optionally verify the payload
actual_payload = mock_prometheus_success.call_args[1]["payload"]
print("actual_payload sent to prometheus: ", actual_payload)
assert actual_payload.service == ServiceTypes.DB
assert actual_payload.duration == 0.3
assert actual_payload.call_type == "query"
assert actual_payload.is_error is False
@pytest.mark.asyncio
async def test_service_logger_db_monitoring_failure():
"""
Test prometheus monitoring for failed database operations
"""
litellm.service_callback = ["prometheus_system"]
sl = ServiceLogging()
# Create spy on prometheus logger's async_service_failure_hook
with patch.object(
sl.prometheusServicesLogger,
"async_service_failure_hook",
new_callable=AsyncMock,
) as mock_prometheus_failure:
# Test DB failure monitoring
test_error = Exception("Database connection failed")
await sl.async_service_failure_hook(
service=ServiceTypes.DB,
duration=0.3,
error=test_error,
call_type="query",
event_metadata={"query_type": "SELECT", "table": "api_keys"},
)
# Assert prometheus logger's failure hook was called
mock_prometheus_failure.assert_called_once()
# Verify the payload
actual_payload = mock_prometheus_failure.call_args[1]["payload"]
print("actual_payload sent to prometheus: ", actual_payload)
assert actual_payload.service == ServiceTypes.DB
assert actual_payload.duration == 0.3
assert actual_payload.call_type == "query"
assert actual_payload.is_error is True
assert actual_payload.error == "Database connection failed"

View file

@ -0,0 +1,128 @@
import io
import os
import sys
sys.path.insert(0, os.path.abspath("../.."))
import asyncio
import gzip
import json
import logging
import time
from unittest.mock import AsyncMock, patch
import pytest
import litellm
from litellm import completion
from litellm._logging import verbose_logger
from litellm.proxy.utils import log_to_opentelemetry, ServiceTypes
from datetime import datetime
# Test async function to decorate
@log_to_opentelemetry
async def sample_db_function(*args, **kwargs):
return "success"
@log_to_opentelemetry
async def sample_proxy_function(*args, **kwargs):
return "success"
@pytest.mark.asyncio
async def test_log_to_opentelemetry_success():
# Mock the proxy_logging_obj
with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging:
# Setup mock
mock_proxy_logging.service_logging_obj.async_service_success_hook = AsyncMock()
# Call the decorated function
result = await sample_db_function(parent_otel_span="test_span")
# Assertions
assert result == "success"
mock_proxy_logging.service_logging_obj.async_service_success_hook.assert_called_once()
call_args = (
mock_proxy_logging.service_logging_obj.async_service_success_hook.call_args[
1
]
)
assert call_args["service"] == ServiceTypes.DB
assert call_args["call_type"] == "sample_db_function"
assert call_args["parent_otel_span"] == "test_span"
assert isinstance(call_args["duration"], float)
assert isinstance(call_args["start_time"], datetime)
assert isinstance(call_args["end_time"], datetime)
assert "function_name" in call_args["event_metadata"]
@pytest.mark.asyncio
async def test_log_to_opentelemetry_duration():
# Mock the proxy_logging_obj
with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging:
# Setup mock
mock_proxy_logging.service_logging_obj.async_service_success_hook = AsyncMock()
# Add a delay to the function to test duration
@log_to_opentelemetry
async def delayed_function(**kwargs):
await asyncio.sleep(1) # 1 second delay
return "success"
# Call the decorated function
start = time.time()
result = await delayed_function(parent_otel_span="test_span")
end = time.time()
# Get the actual duration
actual_duration = end - start
# Get the logged duration from the mock call
call_args = (
mock_proxy_logging.service_logging_obj.async_service_success_hook.call_args[
1
]
)
logged_duration = call_args["duration"]
# Assert the logged duration is approximately equal to actual duration (within 0.1 seconds)
assert abs(logged_duration - actual_duration) < 0.1
assert result == "success"
@pytest.mark.asyncio
async def test_log_to_opentelemetry_failure():
# Mock the proxy_logging_obj
with patch("litellm.proxy.proxy_server.proxy_logging_obj") as mock_proxy_logging:
# Setup mock
mock_proxy_logging.service_logging_obj.async_service_failure_hook = AsyncMock()
# Create a failing function
@log_to_opentelemetry
async def failing_function(**kwargs):
raise ValueError("Test error")
# Call the decorated function and expect it to raise
with pytest.raises(ValueError) as exc_info:
await failing_function(parent_otel_span="test_span")
# Assertions
assert str(exc_info.value) == "Test error"
mock_proxy_logging.service_logging_obj.async_service_failure_hook.assert_called_once()
call_args = (
mock_proxy_logging.service_logging_obj.async_service_failure_hook.call_args[
1
]
)
assert call_args["service"] == ServiceTypes.DB
assert call_args["call_type"] == "failing_function"
assert call_args["parent_otel_span"] == "test_span"
assert isinstance(call_args["duration"], float)
assert isinstance(call_args["start_time"], datetime)
assert isinstance(call_args["end_time"], datetime)
assert isinstance(call_args["error"], ValueError)