import io
import os
import sys

sys.path.insert(0, os.path.abspath("../.."))

import asyncio
import logging
import uuid

import pytest
from prometheus_client import REGISTRY, CollectorRegistry

import litellm
from litellm import completion
from litellm._logging import verbose_logger
from litellm.integrations.prometheus import (
    PrometheusLogger,
    UserAPIKeyLabelValues,
    get_custom_labels_from_metadata,
)
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
from litellm.types.utils import (
    StandardLoggingPayload,
    StandardLoggingMetadata,
    StandardLoggingHiddenParams,
    StandardLoggingModelInformation,
)

import pytest
from unittest.mock import MagicMock, patch, call
from datetime import datetime, timedelta, timezone

from litellm.integrations.prometheus import PrometheusLogger
from litellm.proxy._types import UserAPIKeyAuth

verbose_logger.setLevel(logging.DEBUG)

litellm.set_verbose = True
import time


@pytest.fixture
def prometheus_logger():
    collectors = list(REGISTRY._collector_to_names.keys())
    for collector in collectors:
        REGISTRY.unregister(collector)
    return PrometheusLogger()


def create_standard_logging_payload() -> StandardLoggingPayload:
    return StandardLoggingPayload(
        id="test_id",
        call_type="completion",
        stream=False,
        response_cost=0.1,
        response_cost_failure_debug_info=None,
        status="success",
        total_tokens=30,
        prompt_tokens=20,
        completion_tokens=10,
        startTime=1234567890.0,
        endTime=1234567891.0,
        completionStartTime=1234567890.5,
        model_map_information=StandardLoggingModelInformation(
            model_map_key="gpt-3.5-turbo", model_map_value=None
        ),
        model="gpt-3.5-turbo",
        model_id="model-123",
        model_group="openai-gpt",
        custom_llm_provider="openai",
        api_base="https://api.openai.com",
        metadata=StandardLoggingMetadata(
            user_api_key_hash="test_hash",
            user_api_key_alias="test_alias",
            user_api_key_team_id="test_team",
            user_api_key_user_id="test_user",
            user_api_key_user_email="test@example.com",
            user_api_key_team_alias="test_team_alias",
            user_api_key_org_id=None,
            spend_logs_metadata=None,
            requester_ip_address="127.0.0.1",
            requester_metadata=None,
            user_api_key_end_user_id="test_end_user",
        ),
        cache_hit=False,
        cache_key=None,
        saved_cache_cost=0.0,
        request_tags=[],
        end_user=None,
        requester_ip_address="127.0.0.1",
        messages=[{"role": "user", "content": "Hello, world!"}],
        response={"choices": [{"message": {"content": "Hi there!"}}]},
        error_str=None,
        model_parameters={"stream": True},
        hidden_params=StandardLoggingHiddenParams(
            model_id="model-123",
            cache_key=None,
            api_base="https://api.openai.com",
            response_cost="0.1",
            additional_headers=None,
        ),
    )


def test_safe_get_remaining_budget(prometheus_logger):
    assert prometheus_logger._safe_get_remaining_budget(100, 30) == 70
    assert prometheus_logger._safe_get_remaining_budget(100, None) == 100
    assert prometheus_logger._safe_get_remaining_budget(None, 30) == float("inf")
    assert prometheus_logger._safe_get_remaining_budget(None, None) == float("inf")


@pytest.mark.asyncio
async def test_async_log_success_event(prometheus_logger):
    standard_logging_object = create_standard_logging_payload()
    kwargs = {
        "model": "gpt-3.5-turbo",
        "stream": True,
        "litellm_params": {
            "metadata": {
                "user_api_key": "test_key",
                "user_api_key_user_id": "test_user",
                "user_api_key_team_id": "test_team",
                "user_api_key_end_user_id": "test_end_user",
            }
        },
        "start_time": datetime.now(),
        "completion_start_time": datetime.now(),
        "api_call_start_time": datetime.now(),
        "end_time": datetime.now() + timedelta(seconds=1),
        "standard_logging_object": standard_logging_object,
    }
    response_obj = MagicMock()

    # Mock the prometheus client methods

    # High Level Metrics - request/spend
    prometheus_logger.litellm_requests_metric = MagicMock()
    prometheus_logger.litellm_spend_metric = MagicMock()

    # Token Metrics
    prometheus_logger.litellm_tokens_metric = MagicMock()
    prometheus_logger.litellm_input_tokens_metric = MagicMock()
    prometheus_logger.litellm_output_tokens_metric = MagicMock()

    # Remaining Budget Metrics
    prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
    prometheus_logger.litellm_remaining_api_key_budget_metric = MagicMock()

    # Virtual Key Rate limit Metrics
    prometheus_logger.litellm_remaining_api_key_requests_for_model = MagicMock()
    prometheus_logger.litellm_remaining_api_key_tokens_for_model = MagicMock()

    # Latency Metrics
    prometheus_logger.litellm_llm_api_time_to_first_token_metric = MagicMock()
    prometheus_logger.litellm_llm_api_latency_metric = MagicMock()
    prometheus_logger.litellm_request_total_latency_metric = MagicMock()

    await prometheus_logger.async_log_success_event(
        kwargs, response_obj, kwargs["start_time"], kwargs["end_time"]
    )

    # Assert that the metrics were incremented
    prometheus_logger.litellm_requests_metric.labels.assert_called()
    prometheus_logger.litellm_spend_metric.labels.assert_called()

    # Token Metrics
    prometheus_logger.litellm_tokens_metric.labels.assert_called()
    prometheus_logger.litellm_input_tokens_metric.labels.assert_called()
    prometheus_logger.litellm_output_tokens_metric.labels.assert_called()

    # Remaining Budget Metrics
    prometheus_logger.litellm_remaining_team_budget_metric.labels.assert_called()
    prometheus_logger.litellm_remaining_api_key_budget_metric.labels.assert_called()

    # Virtual Key Rate limit Metrics
    prometheus_logger.litellm_remaining_api_key_requests_for_model.labels.assert_called()
    prometheus_logger.litellm_remaining_api_key_tokens_for_model.labels.assert_called()

    # Latency Metrics
    prometheus_logger.litellm_llm_api_time_to_first_token_metric.labels.assert_called()
    prometheus_logger.litellm_llm_api_latency_metric.labels.assert_called()
    prometheus_logger.litellm_request_total_latency_metric.labels.assert_called()


def test_increment_token_metrics(prometheus_logger):
    """
    Test the increment_token_metrics method

    input, output, and total tokens metrics are incremented by the values in the standard logging payload
    """
    prometheus_logger.litellm_tokens_metric = MagicMock()
    prometheus_logger.litellm_input_tokens_metric = MagicMock()
    prometheus_logger.litellm_output_tokens_metric = MagicMock()

    standard_logging_payload = create_standard_logging_payload()
    standard_logging_payload["total_tokens"] = 100
    standard_logging_payload["prompt_tokens"] = 50
    standard_logging_payload["completion_tokens"] = 50

    enum_values = UserAPIKeyLabelValues(
        litellm_model_name=standard_logging_payload["model"],
        api_provider=standard_logging_payload["custom_llm_provider"],
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
        **standard_logging_payload,
    )

    prometheus_logger._increment_token_metrics(
        standard_logging_payload,
        end_user_id="user1",
        user_api_key="key1",
        user_api_key_alias="alias1",
        model="gpt-3.5-turbo",
        user_api_team="team1",
        user_api_team_alias="team_alias1",
        user_id="user1",
        enum_values=enum_values,
    )

    prometheus_logger.litellm_tokens_metric.labels.assert_called_once_with(
        "user1", "key1", "alias1", "gpt-3.5-turbo", "team1", "team_alias1", "user1"
    )
    prometheus_logger.litellm_tokens_metric.labels().inc.assert_called_once_with(100)

    prometheus_logger.litellm_input_tokens_metric.labels.assert_called_once_with(
        end_user=None,
        user=None,
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        requested_model=None,
        model="gpt-3.5-turbo",
    )
    prometheus_logger.litellm_input_tokens_metric.labels().inc.assert_called_once_with(
        50
    )

    prometheus_logger.litellm_output_tokens_metric.labels.assert_called_once_with(
        end_user=None,
        user=None,
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        requested_model=None,
        model="gpt-3.5-turbo",
    )
    prometheus_logger.litellm_output_tokens_metric.labels().inc.assert_called_once_with(
        50
    )


@pytest.mark.asyncio
async def test_increment_remaining_budget_metrics(prometheus_logger):
    """
    Test the increment_remaining_budget_metrics method

    - team and api key remaining budget metrics are set to the difference between max budget and spend
    - team and api key max budget metrics are set to their respective max budgets
    - team and api key remaining hours metrics are set based on budget reset timestamps
    """
    # Mock all budget-related metrics
    prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
    prometheus_logger.litellm_remaining_api_key_budget_metric = MagicMock()
    prometheus_logger.litellm_team_max_budget_metric = MagicMock()
    prometheus_logger.litellm_api_key_max_budget_metric = MagicMock()
    prometheus_logger.litellm_team_budget_remaining_hours_metric = MagicMock()
    prometheus_logger.litellm_api_key_budget_remaining_hours_metric = MagicMock()

    # Create a future budget reset time for testing
    future_reset_time_team = datetime.now() + timedelta(hours=10)
    future_reset_time_key = datetime.now() + timedelta(hours=12)

    # Mock the get_team_object and get_key_object functions to return objects with budget reset times
    with patch(
        "litellm.proxy.auth.auth_checks.get_team_object"
    ) as mock_get_team, patch(
        "litellm.proxy.auth.auth_checks.get_key_object"
    ) as mock_get_key:
        mock_get_team.return_value = MagicMock(budget_reset_at=future_reset_time_team)
        mock_get_key.return_value = MagicMock(budget_reset_at=future_reset_time_key)

        litellm_params = {
            "metadata": {
                "user_api_key_team_spend": 50,
                "user_api_key_team_max_budget": 100,
                "user_api_key_spend": 25,
                "user_api_key_max_budget": 75,
            }
        }

        await prometheus_logger._increment_remaining_budget_metrics(
            user_api_team="team1",
            user_api_team_alias="team_alias1",
            user_api_key="key1",
            user_api_key_alias="alias1",
            litellm_params=litellm_params,
            response_cost=10,
        )

        # Test remaining budget metrics
        prometheus_logger.litellm_remaining_team_budget_metric.labels.assert_called_once_with(
            team="team1", team_alias="team_alias1"
        )
        prometheus_logger.litellm_remaining_team_budget_metric.labels().set.assert_called_once_with(
            40  # 100 - (50 + 10)
        )

        prometheus_logger.litellm_remaining_api_key_budget_metric.labels.assert_called_once_with(
            hashed_api_key="key1", api_key_alias="alias1"
        )
        prometheus_logger.litellm_remaining_api_key_budget_metric.labels().set.assert_called_once_with(
            40  # 75 - (25 + 10)
        )

        # Test max budget metrics
        prometheus_logger.litellm_team_max_budget_metric.labels.assert_called_once_with(
            team="team1", team_alias="team_alias1"
        )
        prometheus_logger.litellm_team_max_budget_metric.labels().set.assert_called_once_with(
            100
        )

        prometheus_logger.litellm_api_key_max_budget_metric.labels.assert_called_once_with(
            hashed_api_key="key1", api_key_alias="alias1"
        )
        prometheus_logger.litellm_api_key_max_budget_metric.labels().set.assert_called_once_with(
            75
        )

        # Test remaining hours metrics
        prometheus_logger.litellm_team_budget_remaining_hours_metric.labels.assert_called_once_with(
            team="team1", team_alias="team_alias1"
        )
        # The remaining hours should be approximately 10 (with some small difference due to test execution time)
        remaining_hours_call = prometheus_logger.litellm_team_budget_remaining_hours_metric.labels().set.call_args[
            0
        ][
            0
        ]
        assert 9.9 <= remaining_hours_call <= 10.0

        prometheus_logger.litellm_api_key_budget_remaining_hours_metric.labels.assert_called_once_with(
            hashed_api_key="key1", api_key_alias="alias1"
        )
        # The remaining hours should be approximately 12 (with some small difference due to test execution time)
        remaining_hours_call = prometheus_logger.litellm_api_key_budget_remaining_hours_metric.labels().set.call_args[
            0
        ][
            0
        ]
        assert 11.9 <= remaining_hours_call <= 12.0


def test_set_latency_metrics(prometheus_logger):
    """
    Test the set_latency_metrics method

    time to first token, llm api latency, and request total latency metrics are observed
    based on the timestamps passed in the request kwargs
    """
    standard_logging_payload = create_standard_logging_payload()
    prometheus_logger.litellm_llm_api_time_to_first_token_metric = MagicMock()
    prometheus_logger.litellm_llm_api_latency_metric = MagicMock()
    prometheus_logger.litellm_request_total_latency_metric = MagicMock()

    enum_values = UserAPIKeyLabelValues(
        litellm_model_name=standard_logging_payload["model"],
        api_provider=standard_logging_payload["custom_llm_provider"],
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
        requested_model=standard_logging_payload["model_group"],
        user=standard_logging_payload["metadata"]["user_api_key_user_id"],
        **standard_logging_payload,
    )

    now = datetime.now()
    kwargs = {
        "end_time": now,  # when the request ends
        "start_time": now - timedelta(seconds=2),  # when the request starts
        "api_call_start_time": now - timedelta(seconds=1.5),  # when the api call starts
        "completion_start_time": now
        - timedelta(seconds=1),  # when the completion starts
        "stream": True,
    }

    prometheus_logger._set_latency_metrics(
        kwargs=kwargs,
        model="gpt-3.5-turbo",
        user_api_key="key1",
        user_api_key_alias="alias1",
        user_api_team="team1",
        user_api_team_alias="team_alias1",
        enum_values=enum_values,
    )

    # completion_start_time - api_call_start_time
    prometheus_logger.litellm_llm_api_time_to_first_token_metric.labels.assert_called_once_with(
        "gpt-3.5-turbo", "key1", "alias1", "team1", "team_alias1"
    )
    prometheus_logger.litellm_llm_api_time_to_first_token_metric.labels().observe.assert_called_once_with(
        0.5
    )

    # end_time - api_call_start_time
    prometheus_logger.litellm_llm_api_latency_metric.labels.assert_called_once_with(
        end_user=None,
        user="test_user",
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        requested_model="openai-gpt",
        model="gpt-3.5-turbo",
    )
    prometheus_logger.litellm_llm_api_latency_metric.labels().observe.assert_called_once_with(
        1.5
    )

    # total latency for the request
    prometheus_logger.litellm_request_total_latency_metric.labels.assert_called_once_with(
        end_user=None,
        user="test_user",
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        requested_model="openai-gpt",
        model="gpt-3.5-turbo",
    )
    prometheus_logger.litellm_request_total_latency_metric.labels().observe.assert_called_once_with(
        2.0
    )


def test_set_latency_metrics_missing_timestamps(prometheus_logger):
    """
    Test that _set_latency_metrics handles missing timestamp values gracefully
    """
    # Mock all metrics used in the method
    prometheus_logger.litellm_llm_api_time_to_first_token_metric = MagicMock()
    prometheus_logger.litellm_llm_api_latency_metric = MagicMock()
    prometheus_logger.litellm_request_total_latency_metric = MagicMock()

    standard_logging_payload = create_standard_logging_payload()
    enum_values = UserAPIKeyLabelValues(
        litellm_model_name=standard_logging_payload["model"],
        api_provider=standard_logging_payload["custom_llm_provider"],
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
    )

    # Test case where completion_start_time is None
    kwargs = {
        "end_time": datetime.now(),
        "start_time": datetime.now() - timedelta(seconds=2),
        "api_call_start_time": datetime.now() - timedelta(seconds=1.5),
        "completion_start_time": None,  # Missing completion start time
        "stream": True,
    }

    # This should not raise an exception
    prometheus_logger._set_latency_metrics(
        kwargs=kwargs,
        model="gpt-3.5-turbo",
        user_api_key="key1",
        user_api_key_alias="alias1",
        user_api_team="team1",
        user_api_team_alias="team_alias1",
        enum_values=enum_values,
    )

    # Verify time to first token metric was not called due to missing completion_start_time
    prometheus_logger.litellm_llm_api_time_to_first_token_metric.labels.assert_not_called()
    # Other metrics should still be called
    prometheus_logger.litellm_llm_api_latency_metric.labels.assert_called_once()
    prometheus_logger.litellm_request_total_latency_metric.labels.assert_called_once()


def test_set_latency_metrics_missing_api_call_start(prometheus_logger):
    """
    Test that _set_latency_metrics handles missing api_call_start_time gracefully
    """
    # Mock all metrics used in the method
    prometheus_logger.litellm_llm_api_time_to_first_token_metric = MagicMock()
    prometheus_logger.litellm_llm_api_latency_metric = MagicMock()
    prometheus_logger.litellm_request_total_latency_metric = MagicMock()

    standard_logging_payload = create_standard_logging_payload()
    enum_values = UserAPIKeyLabelValues(
        litellm_model_name=standard_logging_payload["model"],
        api_provider=standard_logging_payload["custom_llm_provider"],
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
    )

    # Test case where api_call_start_time is None
    kwargs = {
        "end_time": datetime.now(),
        "start_time": datetime.now() - timedelta(seconds=2),
        "api_call_start_time": None,  # Missing API call start time
        "completion_start_time": datetime.now() - timedelta(seconds=1),
        "stream": True,
    }

    # This should not raise an exception
    prometheus_logger._set_latency_metrics(
        kwargs=kwargs,
        model="gpt-3.5-turbo",
        user_api_key="key1",
        user_api_key_alias="alias1",
        user_api_team="team1",
        user_api_team_alias="team_alias1",
        enum_values=enum_values,
    )

    # Verify API latency metrics were not called due to missing api_call_start_time
    prometheus_logger.litellm_llm_api_time_to_first_token_metric.labels.assert_not_called()
    prometheus_logger.litellm_llm_api_latency_metric.labels.assert_not_called()
    # Total request latency should still be called
    prometheus_logger.litellm_request_total_latency_metric.labels.assert_called_once()


def test_increment_top_level_request_and_spend_metrics(prometheus_logger):
    """
    Test the increment_top_level_request_and_spend_metrics method

    - litellm_requests_metric is incremented by 1
    - litellm_spend_metric is incremented by the response cost in the standard logging payload
    """
    standard_logging_payload = create_standard_logging_payload()
    enum_values = UserAPIKeyLabelValues(
        litellm_model_name=standard_logging_payload["model"],
        api_provider=standard_logging_payload["custom_llm_provider"],
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
        **standard_logging_payload,
    )
    prometheus_logger.litellm_requests_metric = MagicMock()
    prometheus_logger.litellm_spend_metric = MagicMock()

    prometheus_logger._increment_top_level_request_and_spend_metrics(
        end_user_id="user1",
        user_api_key="key1",
        user_api_key_alias="alias1",
        model="gpt-3.5-turbo",
        user_api_team="team1",
        user_api_team_alias="team_alias1",
        user_id="user1",
        response_cost=0.1,
        enum_values=enum_values,
    )

    prometheus_logger.litellm_requests_metric.labels.assert_called_once_with(
        end_user=None,
        user=None,
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        model="gpt-3.5-turbo",
        user_email=None,
    )
    prometheus_logger.litellm_requests_metric.labels().inc.assert_called_once()

    prometheus_logger.litellm_spend_metric.labels.assert_called_once_with(
        "user1", "key1", "alias1", "gpt-3.5-turbo", "team1", "team_alias1", "user1"
    )
    prometheus_logger.litellm_spend_metric.labels().inc.assert_called_once_with(0.1)


@pytest.mark.asyncio
async def test_async_log_failure_event(prometheus_logger):
    # NOTE: almost all params for this metric are read from standard logging payload
    standard_logging_object = create_standard_logging_payload()
    kwargs = {
        "model": "gpt-3.5-turbo",
        "litellm_params": {
            "custom_llm_provider": "openai",
        },
        "start_time": datetime.now(),
        "completion_start_time": datetime.now(),
        "api_call_start_time": datetime.now(),
        "end_time": datetime.now() + timedelta(seconds=1),
        "standard_logging_object": standard_logging_object,
        "exception": Exception("Test error"),
    }
    response_obj = MagicMock()

    # Mock the metrics
    prometheus_logger.litellm_llm_api_failed_requests_metric = MagicMock()
    prometheus_logger.litellm_deployment_failure_responses = MagicMock()
    prometheus_logger.litellm_deployment_total_requests = MagicMock()
    prometheus_logger.set_deployment_partial_outage = MagicMock()

    await prometheus_logger.async_log_failure_event(
        kwargs, response_obj, kwargs["start_time"], kwargs["end_time"]
    )

    # litellm_llm_api_failed_requests_metric incremented
    """
    Expected metrics
    end_user_id,
    user_api_key,
    user_api_key_alias,
    model,
    user_api_team,
    user_api_team_alias,
    user_id,
    """
    prometheus_logger.litellm_llm_api_failed_requests_metric.labels.assert_called_once_with(
        None,
        "test_hash",
        "test_alias",
        "gpt-3.5-turbo",
        "test_team",
        "test_team_alias",
        "test_user",
    )
    prometheus_logger.litellm_llm_api_failed_requests_metric.labels().inc.assert_called_once()

    # deployment should be marked in partial outage
    prometheus_logger.set_deployment_partial_outage.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
    )

    # deployment failure responses incremented
    prometheus_logger.litellm_deployment_failure_responses.labels.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
        exception_status="None",
        exception_class="Exception",
        requested_model="openai-gpt",  # passed in standard logging payload
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
    )
    prometheus_logger.litellm_deployment_failure_responses.labels().inc.assert_called_once()

    # deployment total requests incremented
    prometheus_logger.litellm_deployment_total_requests.labels.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
        requested_model="openai-gpt",  # passed in standard logging payload
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
    )
    prometheus_logger.litellm_deployment_total_requests.labels().inc.assert_called_once()


@pytest.mark.asyncio
async def test_async_post_call_failure_hook(prometheus_logger):
    """
    Test for the async_post_call_failure_hook method

    it should increment the litellm_proxy_failed_requests_metric and litellm_proxy_total_requests_metric
    """
    # Mock the prometheus metrics
    prometheus_logger.litellm_proxy_failed_requests_metric = MagicMock()
    prometheus_logger.litellm_proxy_total_requests_metric = MagicMock()

    # Create test data
    request_data = {"model": "gpt-3.5-turbo"}

    original_exception = litellm.RateLimitError(
        message="Test error", llm_provider="openai", model="gpt-3.5-turbo"
    )

    user_api_key_dict = UserAPIKeyAuth(
        api_key="test_key",
        key_alias="test_alias",
        team_id="test_team",
        team_alias="test_team_alias",
        user_id="test_user",
        end_user_id="test_end_user",
    )

    # Call the function
    await prometheus_logger.async_post_call_failure_hook(
        request_data=request_data,
        original_exception=original_exception,
        user_api_key_dict=user_api_key_dict,
    )

    # Assert failed requests metric was incremented with correct labels
    prometheus_logger.litellm_proxy_failed_requests_metric.labels.assert_called_once_with(
        end_user="test_end_user",
        hashed_api_key="test_key",
        api_key_alias="test_alias",
        requested_model="gpt-3.5-turbo",
        team="test_team",
        team_alias="test_team_alias",
        user="test_user",
        exception_status="429",
        exception_class="RateLimitError",
    )
    prometheus_logger.litellm_proxy_failed_requests_metric.labels().inc.assert_called_once()

    # Assert total requests metric was incremented with correct labels
    prometheus_logger.litellm_proxy_total_requests_metric.labels.assert_called_once_with(
        end_user="test_end_user",
        hashed_api_key="test_key",
        api_key_alias="test_alias",
        requested_model="gpt-3.5-turbo",
        team="test_team",
        team_alias="test_team_alias",
        user="test_user",
        status_code="429",
        user_email=None,
    )
    prometheus_logger.litellm_proxy_total_requests_metric.labels().inc.assert_called_once()


@pytest.mark.asyncio
async def test_async_post_call_success_hook(prometheus_logger):
    """
    Test for the async_post_call_success_hook method

    it should increment the litellm_proxy_total_requests_metric
    """
    # Mock the prometheus metric
    prometheus_logger.litellm_proxy_total_requests_metric = MagicMock()

    # Create test data
    data = {"model": "gpt-3.5-turbo"}

    user_api_key_dict = UserAPIKeyAuth(
        api_key="test_key",
        key_alias="test_alias",
        team_id="test_team",
        team_alias="test_team_alias",
        user_id="test_user",
        end_user_id="test_end_user",
    )

    response = {"choices": [{"message": {"content": "test response"}}]}

    # Call the function
    await prometheus_logger.async_post_call_success_hook(
        data=data, user_api_key_dict=user_api_key_dict, response=response
    )

    # Assert total requests metric was incremented with correct labels
    prometheus_logger.litellm_proxy_total_requests_metric.labels.assert_called_once_with(
        end_user="test_end_user",
        hashed_api_key="test_key",
        api_key_alias="test_alias",
        requested_model="gpt-3.5-turbo",
        team="test_team",
        team_alias="test_team_alias",
        user="test_user",
        status_code="200",
        user_email=None,
    )
    prometheus_logger.litellm_proxy_total_requests_metric.labels().inc.assert_called_once()


def test_set_llm_deployment_success_metrics(prometheus_logger):
    # Mock all the metrics used in the method
    prometheus_logger.litellm_remaining_requests_metric = MagicMock()
    prometheus_logger.litellm_remaining_tokens_metric = MagicMock()
    prometheus_logger.litellm_deployment_success_responses = MagicMock()
    prometheus_logger.litellm_deployment_total_requests = MagicMock()
    prometheus_logger.litellm_deployment_latency_per_output_token = MagicMock()
    prometheus_logger.set_deployment_healthy = MagicMock()
    prometheus_logger.litellm_overhead_latency_metric = MagicMock()

    standard_logging_payload = create_standard_logging_payload()

    standard_logging_payload["hidden_params"]["additional_headers"] = {
        "x_ratelimit_remaining_requests": 123,
        "x_ratelimit_remaining_tokens": 4321,
    }
    standard_logging_payload["hidden_params"]["litellm_overhead_time_ms"] = 100

    # Create test data
    request_kwargs = {
        "model": "gpt-3.5-turbo",
        "litellm_params": {
            "custom_llm_provider": "openai",
            "metadata": {"model_info": {"id": "model-123"}},
        },
        "standard_logging_object": standard_logging_payload,
    }

    enum_values = UserAPIKeyLabelValues(
        litellm_model_name=standard_logging_payload["model"],
        api_provider=standard_logging_payload["custom_llm_provider"],
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
        **standard_logging_payload,
    )

    start_time = datetime.now()
    end_time = start_time + timedelta(seconds=1)
    output_tokens = 10

    # Call the function
    prometheus_logger.set_llm_deployment_success_metrics(
        request_kwargs=request_kwargs,
        start_time=start_time,
        end_time=end_time,
        output_tokens=output_tokens,
        enum_values=enum_values,
    )

    # Verify remaining requests metric
    prometheus_logger.litellm_remaining_requests_metric.labels.assert_called_once_with(
        "openai-gpt",  # model_group / requested model from create_standard_logging_payload()
        "openai",  # llm provider
        "https://api.openai.com",  # api base
        "gpt-3.5-turbo",  # actual model used - litellm model name
        standard_logging_payload["metadata"]["user_api_key_hash"],
        standard_logging_payload["metadata"]["user_api_key_alias"],
    )
    prometheus_logger.litellm_remaining_requests_metric.labels().set.assert_called_once_with(
        123
    )

    # Verify remaining tokens metric
    prometheus_logger.litellm_remaining_tokens_metric.labels.assert_called_once_with(
        "openai-gpt",  # model_group / requested model from create_standard_logging_payload()
        "openai",  # llm provider
        "https://api.openai.com",  # api base
        "gpt-3.5-turbo",  # actual model used - litellm model name
        standard_logging_payload["metadata"]["user_api_key_hash"],
        standard_logging_payload["metadata"]["user_api_key_alias"],
    )
    prometheus_logger.litellm_remaining_tokens_metric.labels().set.assert_called_once_with(
        4321
    )

    # Verify deployment healthy state
    prometheus_logger.set_deployment_healthy.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
    )

    # Verify success responses metric
    prometheus_logger.litellm_deployment_success_responses.labels.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
        requested_model="openai-gpt",  # requested model from create_standard_logging_payload()
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
    )
    prometheus_logger.litellm_deployment_success_responses.labels().inc.assert_called_once()

    # Verify total requests metric
    prometheus_logger.litellm_deployment_total_requests.labels.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
        requested_model="openai-gpt",  # requested model from create_standard_logging_payload()
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
    )
    prometheus_logger.litellm_deployment_total_requests.labels().inc.assert_called_once()

    # Verify latency per output token metric
    prometheus_logger.litellm_deployment_latency_per_output_token.labels.assert_called_once_with(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
        hashed_api_key=standard_logging_payload["metadata"]["user_api_key_hash"],
        api_key_alias=standard_logging_payload["metadata"]["user_api_key_alias"],
        team=standard_logging_payload["metadata"]["user_api_key_team_id"],
        team_alias=standard_logging_payload["metadata"]["user_api_key_team_alias"],
    )

    prometheus_logger.litellm_overhead_latency_metric.labels.assert_called_once_with(
        "openai-gpt",  # model_group / requested model from create_standard_logging_payload()
        "openai",  # llm provider
        "https://api.openai.com",  # api base
        "gpt-3.5-turbo",  # actual model used - litellm model name
        standard_logging_payload["metadata"]["user_api_key_hash"],
        standard_logging_payload["metadata"]["user_api_key_alias"],
    )

    # Calculate expected latency per token (1 second / 10 tokens = 0.1 seconds per token)
    expected_latency_per_token = 0.1
    prometheus_logger.litellm_deployment_latency_per_output_token.labels().observe.assert_called_once_with(
        expected_latency_per_token
    )


@pytest.mark.asyncio
async def test_log_success_fallback_event(prometheus_logger):
    prometheus_logger.litellm_deployment_successful_fallbacks = MagicMock()

    original_model_group = "gpt-3.5-turbo"
    kwargs = {
        "model": "gpt-4",
        "metadata": {
            "user_api_key_hash": "test_hash",
            "user_api_key_alias": "test_alias",
            "user_api_key_team_id": "test_team",
            "user_api_key_team_alias": "test_team_alias",
        },
    }
    original_exception = litellm.RateLimitError(
        message="Test error", llm_provider="openai", model="gpt-3.5-turbo"
    )

    await prometheus_logger.log_success_fallback_event(
        original_model_group=original_model_group,
        kwargs=kwargs,
        original_exception=original_exception,
    )

    prometheus_logger.litellm_deployment_successful_fallbacks.labels.assert_called_once_with(
        requested_model=original_model_group,
        fallback_model="gpt-4",
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        exception_status="429",
        exception_class="RateLimitError",
    )
    prometheus_logger.litellm_deployment_successful_fallbacks.labels().inc.assert_called_once()


@pytest.mark.asyncio
async def test_log_failure_fallback_event(prometheus_logger):
    prometheus_logger.litellm_deployment_failed_fallbacks = MagicMock()

    original_model_group = "gpt-3.5-turbo"
    kwargs = {
        "model": "gpt-4",
        "metadata": {
            "user_api_key_hash": "test_hash",
            "user_api_key_alias": "test_alias",
            "user_api_key_team_id": "test_team",
            "user_api_key_team_alias": "test_team_alias",
        },
    }
    original_exception = litellm.RateLimitError(
        message="Test error", llm_provider="openai", model="gpt-3.5-turbo"
    )

    await prometheus_logger.log_failure_fallback_event(
        original_model_group=original_model_group,
        kwargs=kwargs,
        original_exception=original_exception,
    )

    prometheus_logger.litellm_deployment_failed_fallbacks.labels.assert_called_once_with(
        requested_model=original_model_group,
        fallback_model="gpt-4",
        hashed_api_key="test_hash",
        api_key_alias="test_alias",
        team="test_team",
        team_alias="test_team_alias",
        exception_status="429",
        exception_class="RateLimitError",
    )
    prometheus_logger.litellm_deployment_failed_fallbacks.labels().inc.assert_called_once()


def test_deployment_state_management(prometheus_logger):
    prometheus_logger.litellm_deployment_state = MagicMock()

    test_params = {
        "litellm_model_name": "gpt-3.5-turbo",
        "model_id": "model-123",
        "api_base": "https://api.openai.com",
        "api_provider": "openai",
    }

    # Test set_deployment_healthy (state=0)
    prometheus_logger.set_deployment_healthy(**test_params)
    prometheus_logger.litellm_deployment_state.labels.assert_called_with(
        test_params["litellm_model_name"],
        test_params["model_id"],
        test_params["api_base"],
        test_params["api_provider"],
    )
    prometheus_logger.litellm_deployment_state.labels().set.assert_called_with(0)

    # Test set_deployment_partial_outage (state=1)
    prometheus_logger.set_deployment_partial_outage(**test_params)
    prometheus_logger.litellm_deployment_state.labels().set.assert_called_with(1)

    # Test set_deployment_complete_outage (state=2)
    prometheus_logger.set_deployment_complete_outage(**test_params)
    prometheus_logger.litellm_deployment_state.labels().set.assert_called_with(2)


def test_increment_deployment_cooled_down(prometheus_logger):
    prometheus_logger.litellm_deployment_cooled_down = MagicMock()

    prometheus_logger.increment_deployment_cooled_down(
        litellm_model_name="gpt-3.5-turbo",
        model_id="model-123",
        api_base="https://api.openai.com",
        api_provider="openai",
        exception_status="429",
    )

    prometheus_logger.litellm_deployment_cooled_down.labels.assert_called_once_with(
        "gpt-3.5-turbo", "model-123", "https://api.openai.com", "openai", "429"
    )
    prometheus_logger.litellm_deployment_cooled_down.labels().inc.assert_called_once()


@pytest.mark.parametrize("disable_end_user_tracking", [True, False])
def test_prometheus_factory(monkeypatch, disable_end_user_tracking):
    from litellm.integrations.prometheus import prometheus_label_factory
    from litellm.types.integrations.prometheus import UserAPIKeyLabelValues

    monkeypatch.setattr(
        "litellm.disable_end_user_cost_tracking_prometheus_only",
        disable_end_user_tracking,
    )

    enum_values = UserAPIKeyLabelValues(
        end_user="test_end_user",
        api_key_hash="test_hash",
        api_key_alias="test_alias",
    )
    supported_labels = ["end_user", "api_key_hash", "api_key_alias"]
    returned_dict = prometheus_label_factory(
        supported_enum_labels=supported_labels, enum_values=enum_values
    )

    if disable_end_user_tracking:
        assert returned_dict["end_user"] is None
    else:
        assert returned_dict["end_user"] == "test_end_user"


def test_get_custom_labels_from_metadata(monkeypatch):
    monkeypatch.setattr(
        "litellm.custom_prometheus_metadata_labels", ["metadata.foo", "metadata.bar"]
    )
    metadata = {"foo": "bar", "bar": "baz", "taz": "qux"}
    assert get_custom_labels_from_metadata(metadata) == {
        "metadata_foo": "bar",
        "metadata_bar": "baz",
    }


@pytest.mark.asyncio(scope="session")
async def test_initialize_remaining_budget_metrics(prometheus_logger):
    """
    Test that _initialize_remaining_budget_metrics correctly sets budget metrics for all teams
    """
    litellm.prometheus_initialize_budget_metrics = True
    # Mock the prisma client and get_paginated_teams function
    with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma, patch(
        "litellm.proxy.management_endpoints.team_endpoints.get_paginated_teams"
    ) as mock_get_teams:
        # Create mock team data with proper datetime objects for budget_reset_at
        future_reset = datetime.now() + timedelta(hours=24)  # Reset 24 hours from now
        mock_teams = [
            MagicMock(
                team_id="team1",
                team_alias="alias1",
                max_budget=100,
                spend=30,
                budget_reset_at=future_reset,
            ),
            MagicMock(
                team_id="team2",
                team_alias="alias2",
                max_budget=200,
                spend=50,
                budget_reset_at=future_reset,
            ),
            MagicMock(
                team_id="team3",
                team_alias=None,
                max_budget=300,
                spend=100,
                budget_reset_at=future_reset,
            ),
        ]

        # Mock get_paginated_teams to return our test data
        mock_get_teams.return_value = (mock_teams, len(mock_teams))

        # Mock the Prometheus metrics
        prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
        prometheus_logger.litellm_team_budget_remaining_hours_metric = MagicMock()

        # Call the function
        await prometheus_logger._initialize_remaining_budget_metrics()

        # Verify the remaining budget metric was set correctly for each team
        expected_budget_calls = [
            call.labels("team1", "alias1").set(70),  # 100 - 30
            call.labels("team2", "alias2").set(150),  # 200 - 50
            call.labels("team3", "").set(200),  # 300 - 100
        ]
        prometheus_logger.litellm_remaining_team_budget_metric.assert_has_calls(
            expected_budget_calls, any_order=True
        )

        # Get all the calls made to the hours metric
        hours_calls = (
            prometheus_logger.litellm_team_budget_remaining_hours_metric.mock_calls
        )

        # Verify the structure and approximate values of the hours calls
        assert len(hours_calls) == 6  # 3 teams * 2 calls each (labels + set)

        # Helper function to extract hours value from call
        def get_hours_from_call(call_obj):
            if "set" in str(call_obj):
                return call_obj[1][0]  # Extract the hours value
            return None

        # Verify each team's hours are approximately 24 (within reasonable bounds)
        hours_values = [
            get_hours_from_call(call)
            for call in hours_calls
            if get_hours_from_call(call) is not None
        ]
        for hours in hours_values:
            assert (
                23.9 <= hours <= 24.0
            ), f"Hours value {hours} not within expected range"

        # Verify the labels were called with correct team information
        label_calls = [
            call.labels(team="team1", team_alias="alias1"),
            call.labels(team="team2", team_alias="alias2"),
            call.labels(team="team3", team_alias=""),
        ]
        prometheus_logger.litellm_team_budget_remaining_hours_metric.assert_has_calls(
            label_calls, any_order=True
        )


@pytest.mark.asyncio
async def test_initialize_remaining_budget_metrics_exception_handling(
    prometheus_logger,
):
    """
    Test that _initialize_remaining_budget_metrics properly handles exceptions
    """
    litellm.prometheus_initialize_budget_metrics = True
    # Mock the prisma client and get_paginated_teams function to raise an exception
    with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma, patch(
        "litellm.proxy.management_endpoints.team_endpoints.get_paginated_teams"
    ) as mock_get_teams, patch(
        "litellm.proxy.management_endpoints.key_management_endpoints._list_key_helper"
    ) as mock_list_keys:
        # Make get_paginated_teams raise an exception
        mock_get_teams.side_effect = Exception("Database error")
        mock_list_keys.side_effect = Exception("Key listing error")

        # Mock the Prometheus metrics
        prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
        prometheus_logger.litellm_remaining_api_key_budget_metric = MagicMock()

        # Mock the logger to capture the error
        with patch("litellm._logging.verbose_logger.exception") as mock_logger:
            # Call the function
            await prometheus_logger._initialize_remaining_budget_metrics()

            # Verify both errors were logged
            assert mock_logger.call_count == 2
            assert (
                "Error initializing teams budget metrics"
                in mock_logger.call_args_list[0][0][0]
            )
            assert (
                "Error initializing keys budget metrics"
                in mock_logger.call_args_list[1][0][0]
            )

        # Verify the metrics were never called
        prometheus_logger.litellm_remaining_team_budget_metric.assert_not_called()
        prometheus_logger.litellm_remaining_api_key_budget_metric.assert_not_called()


def test_initialize_prometheus_startup_metrics_no_loop(prometheus_logger):
    """
    Test that _initialize_prometheus_startup_metrics handles case when no event loop exists
    """
    # Mock asyncio.get_running_loop to raise RuntimeError
    litellm.prometheus_initialize_budget_metrics = True
    with patch(
        "asyncio.get_running_loop", side_effect=RuntimeError("No running event loop")
    ), patch("litellm._logging.verbose_logger.exception") as mock_logger:
        # Call the function
        prometheus_logger._initialize_prometheus_startup_metrics()

        # Verify the error was logged
        mock_logger.assert_called_once()
        assert "No running event loop" in mock_logger.call_args[0][0]


@pytest.mark.asyncio(scope="session")
async def test_initialize_api_key_budget_metrics(prometheus_logger):
    """
    Test that _initialize_api_key_budget_metrics correctly sets budget metrics for all API keys
    """
    litellm.prometheus_initialize_budget_metrics = True
    # Mock the prisma client and _list_key_helper function
    with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma, patch(
        "litellm.proxy.management_endpoints.key_management_endpoints._list_key_helper"
    ) as mock_list_keys:
        # Create mock key data with proper datetime objects for budget_reset_at
        future_reset = datetime.now() + timedelta(hours=24)  # Reset 24 hours from now
        key1 = UserAPIKeyAuth(
            api_key="key1_hash",
            key_alias="alias1",
            team_id="team1",
            max_budget=100,
            spend=30,
            budget_reset_at=future_reset,
        )
        key1.token = "key1_hash"
        key2 = UserAPIKeyAuth(
            api_key="key2_hash",
            key_alias="alias2",
            team_id="team2",
            max_budget=200,
            spend=50,
            budget_reset_at=future_reset,
        )
        key2.token = "key2_hash"
        key3 = UserAPIKeyAuth(
            api_key="key3_hash",
            key_alias=None,
            team_id="team3",
            max_budget=300,
            spend=100,
            budget_reset_at=future_reset,
        )
        key3.token = "key3_hash"

        mock_keys = [
            key1,
            key2,
            key3,
        ]

        # Mock _list_key_helper to return our test data
        mock_list_keys.return_value = {"keys": mock_keys, "total_count": len(mock_keys)}

        # Mock the Prometheus metrics
        prometheus_logger.litellm_remaining_api_key_budget_metric = MagicMock()
        prometheus_logger.litellm_api_key_budget_remaining_hours_metric = MagicMock()
        prometheus_logger.litellm_api_key_max_budget_metric = MagicMock()

        # Call the function
        await prometheus_logger._initialize_api_key_budget_metrics()

        # Verify the remaining budget metric was set correctly for each key
        expected_budget_calls = [
            call.labels("key1_hash", "alias1").set(70),  # 100 - 30
            call.labels("key2_hash", "alias2").set(150),  # 200 - 50
            call.labels("key3_hash", "").set(200),  # 300 - 100
        ]
        prometheus_logger.litellm_remaining_api_key_budget_metric.assert_has_calls(
            expected_budget_calls, any_order=True
        )

        # Get all the calls made to the hours metric
        hours_calls = (
            prometheus_logger.litellm_api_key_budget_remaining_hours_metric.mock_calls
        )

        # Verify the structure and approximate values of the hours calls
        assert len(hours_calls) == 6  # 3 keys * 2 calls each (labels + set)

        # Helper function to extract hours value from call
        def get_hours_from_call(call_obj):
            if "set" in str(call_obj):
                return call_obj[1][0]  # Extract the hours value
            return None

        # Verify each key's hours are approximately 24 (within reasonable bounds)
        hours_values = [
            get_hours_from_call(call)
            for call in hours_calls
            if get_hours_from_call(call) is not None
        ]
        for hours in hours_values:
            assert (
                23.9 <= hours <= 24.0
            ), f"Hours value {hours} not within expected range"

        # Verify max budget metric was set correctly for each key
        expected_max_budget_calls = [
            call.labels("key1_hash", "alias1").set(100),
            call.labels("key2_hash", "alias2").set(200),
            call.labels("key3_hash", "").set(300),
        ]
        prometheus_logger.litellm_api_key_max_budget_metric.assert_has_calls(
            expected_max_budget_calls, any_order=True
        )


def test_set_team_budget_metrics_multiple_teams(prometheus_logger):
    """
    Test that _set_team_budget_metrics correctly handles multiple teams with different budgets and reset times
    """
    # Create test teams with different budgets and reset times
    teams = [
        MagicMock(
            team_id="team1",
            team_alias="alias1",
            spend=50.0,
            max_budget=100.0,
            budget_reset_at=datetime(2024, 12, 31, tzinfo=timezone.utc),
        ),
        MagicMock(
            team_id="team2",
            team_alias="alias2",
            spend=75.0,
            max_budget=150.0,
            budget_reset_at=datetime(2024, 6, 30, tzinfo=timezone.utc),
        ),
        MagicMock(
            team_id="team3",
            team_alias="alias3",
            spend=25.0,
            max_budget=200.0,
            budget_reset_at=datetime(2024, 3, 31, tzinfo=timezone.utc),
        ),
    ]

    # Mock the metrics
    prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
    prometheus_logger.litellm_team_max_budget_metric = MagicMock()
    prometheus_logger.litellm_team_budget_remaining_hours_metric = MagicMock()

    # Set metrics for each team
    for team in teams:
        prometheus_logger._set_team_budget_metrics(team)

    # Verify remaining budget metric calls
    expected_remaining_budget_calls = [
        call.labels(team="team1", team_alias="alias1").set(50.0),  # 100 - 50
        call.labels(team="team2", team_alias="alias2").set(75.0),  # 150 - 75
        call.labels(team="team3", team_alias="alias3").set(175.0),  # 200 - 25
    ]
    prometheus_logger.litellm_remaining_team_budget_metric.assert_has_calls(
        expected_remaining_budget_calls, any_order=True
    )

    # Verify max budget metric calls
    expected_max_budget_calls = [
        call.labels("team1", "alias1").set(100.0),
        call.labels("team2", "alias2").set(150.0),
        call.labels("team3", "alias3").set(200.0),
    ]
    prometheus_logger.litellm_team_max_budget_metric.assert_has_calls(
        expected_max_budget_calls, any_order=True
    )

    # Verify budget reset metric calls
    # Note: The exact hours will depend on the current time, so we'll just verify the structure
    assert (
        prometheus_logger.litellm_team_budget_remaining_hours_metric.labels.call_count
        == 3
    )
    assert (
        prometheus_logger.litellm_team_budget_remaining_hours_metric.labels().set.call_count
        == 3
    )


def test_set_team_budget_metrics_null_values(prometheus_logger):
    """
    Test that _set_team_budget_metrics correctly handles null/None values
    """
    # Create test team with null values
    team = MagicMock(
        team_id="team_null",
        team_alias=None,  # Test null alias
        spend=None,  # Test null spend
        max_budget=None,  # Test null max_budget
        budget_reset_at=None,  # Test null reset time
    )

    # Mock the metrics
    prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
    prometheus_logger.litellm_team_max_budget_metric = MagicMock()
    prometheus_logger.litellm_team_budget_remaining_hours_metric = MagicMock()

    # Set metrics for the team
    prometheus_logger._set_team_budget_metrics(team)

    # Verify remaining budget metric is set to infinity when max_budget is None
    prometheus_logger.litellm_remaining_team_budget_metric.labels.assert_called_once_with(
        team="team_null", team_alias=""
    )
    prometheus_logger.litellm_remaining_team_budget_metric.labels().set.assert_called_once_with(
        float("inf")
    )

    # Verify max budget metric is not set when max_budget is None
    prometheus_logger.litellm_team_max_budget_metric.assert_not_called()

    # Verify reset metric is not set when budget_reset_at is None
    prometheus_logger.litellm_team_budget_remaining_hours_metric.assert_not_called()


def test_set_team_budget_metrics_with_custom_labels(prometheus_logger, monkeypatch):
    """
    Test that _set_team_budget_metrics correctly handles custom prometheus labels
    """
    # Set custom prometheus labels
    custom_labels = ["metadata.organization", "metadata.environment"]
    monkeypatch.setattr("litellm.custom_prometheus_metadata_labels", custom_labels)

    # Create test team with custom metadata
    team = MagicMock(
        team_id="team1",
        team_alias="alias1",
        spend=50.0,
        max_budget=100.0,
        budget_reset_at=datetime(2024, 12, 31, tzinfo=timezone.utc),
    )

    # Mock the metrics
    prometheus_logger.litellm_remaining_team_budget_metric = MagicMock()
    prometheus_logger.litellm_team_max_budget_metric = MagicMock()
    prometheus_logger.litellm_team_budget_remaining_hours_metric = MagicMock()

    # Set metrics for the team
    prometheus_logger._set_team_budget_metrics(team)

    # Verify remaining budget metric includes custom labels
    prometheus_logger.litellm_remaining_team_budget_metric.labels.assert_called_once_with(
        team="team1",
        team_alias="alias1",
        metadata_organization=None,
        metadata_environment=None,
    )
    prometheus_logger.litellm_remaining_team_budget_metric.labels().set.assert_called_once_with(
        50.0
    )  # 100 - 50

    # Verify max budget metric includes custom labels
    prometheus_logger.litellm_team_max_budget_metric.labels.assert_called_once_with(
        team="team1",
        team_alias="alias1",
        metadata_organization=None,
        metadata_environment=None,
    )
    prometheus_logger.litellm_team_max_budget_metric.labels().set.assert_called_once_with(
        100.0
    )

    # Verify budget reset metric includes custom labels
    budget_reset_calls = (
        prometheus_logger.litellm_team_budget_remaining_hours_metric.labels.call_args_list
    )
    assert len(budget_reset_calls) == 1
    assert budget_reset_calls[0][1] == {
        "team": "team1",
        "team_alias": "alias1",
        "metadata_organization": None,
        "metadata_environment": None,
    }
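

# Note on the custom-label assertions above (an observation based on these tests rather
# than on the prometheus integration source): configured label paths such as
# "metadata.organization" appear to be flattened into Prometheus-safe label names by
# replacing "." with "_" -- see test_get_custom_labels_from_metadata, where
# "metadata.foo" becomes the "metadata_foo" label -- which is why the budget metrics are
# expected to receive metadata_organization / metadata_environment keyword labels even
# when the team has no such metadata (their values fall back to None).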