(feat proxy prometheus) track virtual key, key alias, error code, error code class on prometheus (#5968)

* track api key and team in prom latency metric

* add test for latency metric

* test prometheus success metrics for latency

* track team and key labels for deployment failures

* add test for litellm_deployment_failure_responses_total

* fix checks for premium user on prometheus

* log_success_fallback_event and log_failure_fallback_event

* log original_exception in log_success_fallback_event

* track key, team and exception status and class on fallback metrics

* use get_standard_logging_metadata

* fix import error

* track litellm_deployment_successful_fallbacks

* add test test_proxy_fallback_metrics

* add log log_success_fallback_event

* fix test prometheus
Ishaan Jaff 2024-09-28 19:00:21 -07:00 committed by GitHub
parent b817974c8e
commit 49ec40b1cb
5 changed files with 426 additions and 62 deletions

View file

@@ -67,10 +67,14 @@ class CustomLogger: # https://docs.litellm.ai/docs/observability/custom_callbac
):
pass
async def log_success_fallback_event(self, original_model_group: str, kwargs: dict):
async def log_success_fallback_event(
self, original_model_group: str, kwargs: dict, original_exception: Exception
):
pass
async def log_failure_fallback_event(self, original_model_group: str, kwargs: dict):
async def log_failure_fallback_event(
self, original_model_group: str, kwargs: dict, original_exception: Exception
):
pass
#### ADAPTERS #### Allow calling 100+ LLMs in custom format - https://github.com/BerriAI/litellm/pulls
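
For reference, a minimal sketch (not part of this commit) of a user-defined callback built against the updated hook signatures above; both fallback hooks now receive the original_exception that triggered the fallback. The class name and print statements are illustrative.

import litellm
from litellm.integrations.custom_logger import CustomLogger


class MyFallbackLogger(CustomLogger):
    # Illustrative callback: reports every fallback event it receives.

    async def log_success_fallback_event(
        self, original_model_group: str, kwargs: dict, original_exception: Exception
    ):
        # the exception that triggered the fallback is now passed to the callback
        print(
            f"fallback ok: {original_model_group} -> {kwargs.get('model')}, "
            f"triggering status={getattr(original_exception, 'status_code', None)}"
        )

    async def log_failure_fallback_event(
        self, original_model_group: str, kwargs: dict, original_exception: Exception
    ):
        print(f"fallback failed for {original_model_group}: {original_exception}")


litellm.callbacks = [MyFallbackLogger()]  # registered like any other CustomLogger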

View file

@@ -80,6 +80,10 @@ class PrometheusLogger(CustomLogger):
"Total latency (seconds) for a request to LiteLLM",
labelnames=[
"model",
"hashed_api_key",
"api_key_alias",
"team",
"team_alias",
],
)
@@ -88,6 +92,10 @@ class PrometheusLogger(CustomLogger):
"Total latency (seconds) for a models LLM API call",
labelnames=[
"model",
"hashed_api_key",
"api_key_alias",
"team",
"team_alias",
],
)
@@ -216,6 +224,12 @@ class PrometheusLogger(CustomLogger):
"api_base",
"api_provider",
]
team_and_key_labels = [
"hashed_api_key",
"api_key_alias",
"team",
"team_alias",
]
# Metric for deployment state
self.litellm_deployment_state = Gauge(
@@ -233,35 +247,42 @@ class PrometheusLogger(CustomLogger):
self.litellm_deployment_success_responses = Counter(
name="litellm_deployment_success_responses",
documentation="LLM Deployment Analytics - Total number of successful LLM API calls via litellm",
labelnames=[REQUESTED_MODEL] + _logged_llm_labels,
labelnames=[REQUESTED_MODEL] + _logged_llm_labels + team_and_key_labels,
)
self.litellm_deployment_failure_responses = Counter(
name="litellm_deployment_failure_responses",
documentation="LLM Deployment Analytics - Total number of failed LLM API calls for a specific LLM deploymeny. exception_status is the status of the exception from the llm api",
labelnames=[REQUESTED_MODEL] + _logged_llm_labels + EXCEPTION_LABELS,
labelnames=[REQUESTED_MODEL]
+ _logged_llm_labels
+ EXCEPTION_LABELS
+ team_and_key_labels,
)
self.litellm_deployment_total_requests = Counter(
name="litellm_deployment_total_requests",
documentation="LLM Deployment Analytics - Total number of LLM API calls via litellm - success + failure",
labelnames=[REQUESTED_MODEL] + _logged_llm_labels,
labelnames=[REQUESTED_MODEL] + _logged_llm_labels + team_and_key_labels,
)
# Deployment Latency tracking
self.litellm_deployment_latency_per_output_token = Histogram(
name="litellm_deployment_latency_per_output_token",
documentation="LLM Deployment Analytics - Latency per output token",
labelnames=_logged_llm_labels,
labelnames=_logged_llm_labels + team_and_key_labels,
)
self.litellm_deployment_successful_fallbacks = Counter(
"litellm_deployment_successful_fallbacks",
"LLM Deployment Analytics - Number of successful fallback requests from primary model -> fallback model",
["primary_model", "fallback_model"],
[REQUESTED_MODEL, "fallback_model"]
+ team_and_key_labels
+ EXCEPTION_LABELS,
)
self.litellm_deployment_failed_fallbacks = Counter(
"litellm_deployment_failed_fallbacks",
"LLM Deployment Analytics - Number of failed fallback requests from primary model -> fallback model",
["primary_model", "fallback_model"],
[REQUESTED_MODEL, "fallback_model"]
+ team_and_key_labels
+ EXCEPTION_LABELS,
)
self.litellm_llm_api_failed_requests_metric = Counter(
@@ -448,14 +469,22 @@ class PrometheusLogger(CustomLogger):
kwargs.get("end_time") - api_call_start_time
)
api_call_total_time_seconds = api_call_total_time.total_seconds()
self.litellm_llm_api_latency_metric.labels(model).observe(
api_call_total_time_seconds
)
self.litellm_llm_api_latency_metric.labels(
model,
user_api_key,
user_api_key_alias,
user_api_team,
user_api_team_alias,
).observe(api_call_total_time_seconds)
# log metrics
self.litellm_request_total_latency_metric.labels(model).observe(
total_time_seconds
)
self.litellm_request_total_latency_metric.labels(
model,
user_api_key,
user_api_key_alias,
user_api_team,
user_api_team_alias,
).observe(total_time_seconds)
# set x-ratelimit headers
self.set_llm_deployment_success_metrics(
@@ -579,6 +608,9 @@ class PrometheusLogger(CustomLogger):
def set_llm_deployment_failure_metrics(self, request_kwargs: dict):
try:
verbose_logger.debug("setting remaining tokens requests metric")
standard_logging_payload: StandardLoggingPayload = request_kwargs.get(
"standard_logging_object", {}
)
_response_headers = request_kwargs.get("response_headers")
_litellm_params = request_kwargs.get("litellm_params", {}) or {}
_metadata = _litellm_params.get("metadata", {})
@@ -610,6 +642,16 @@ class PrometheusLogger(CustomLogger):
exception_status=str(getattr(exception, "status_code", None)),
exception_class=exception.__class__.__name__,
requested_model=model_group,
hashed_api_key=standard_logging_payload["metadata"][
"user_api_key_hash"
],
api_key_alias=standard_logging_payload["metadata"][
"user_api_key_alias"
],
team=standard_logging_payload["metadata"]["user_api_key_team_id"],
team_alias=standard_logging_payload["metadata"][
"user_api_key_team_alias"
],
).inc()
self.litellm_deployment_total_requests.labels(
@@ -618,6 +660,16 @@ class PrometheusLogger(CustomLogger):
api_base=api_base,
api_provider=llm_provider,
requested_model=model_group,
hashed_api_key=standard_logging_payload["metadata"][
"user_api_key_hash"
],
api_key_alias=standard_logging_payload["metadata"][
"user_api_key_alias"
],
team=standard_logging_payload["metadata"]["user_api_key_team_id"],
team_alias=standard_logging_payload["metadata"][
"user_api_key_team_alias"
],
).inc()
pass
@@ -706,6 +758,16 @@ class PrometheusLogger(CustomLogger):
api_base=api_base,
api_provider=llm_provider,
requested_model=model_group,
hashed_api_key=standard_logging_payload["metadata"][
"user_api_key_hash"
],
api_key_alias=standard_logging_payload["metadata"][
"user_api_key_alias"
],
team=standard_logging_payload["metadata"]["user_api_key_team_id"],
team_alias=standard_logging_payload["metadata"][
"user_api_key_team_alias"
],
).inc()
self.litellm_deployment_total_requests.labels(
@@ -714,6 +776,16 @@ class PrometheusLogger(CustomLogger):
api_base=api_base,
api_provider=llm_provider,
requested_model=model_group,
hashed_api_key=standard_logging_payload["metadata"][
"user_api_key_hash"
],
api_key_alias=standard_logging_payload["metadata"][
"user_api_key_alias"
],
team=standard_logging_payload["metadata"]["user_api_key_team_id"],
team_alias=standard_logging_payload["metadata"][
"user_api_key_team_alias"
],
).inc()
# Track deployment Latency
@@ -744,6 +816,16 @@ class PrometheusLogger(CustomLogger):
model_id=model_id,
api_base=api_base,
api_provider=llm_provider,
hashed_api_key=standard_logging_payload["metadata"][
"user_api_key_hash"
],
api_key_alias=standard_logging_payload["metadata"][
"user_api_key_alias"
],
team=standard_logging_payload["metadata"]["user_api_key_team_id"],
team_alias=standard_logging_payload["metadata"][
"user_api_key_team_alias"
],
).observe(latency_per_token)
except Exception as e:
@@ -754,26 +836,70 @@ class PrometheusLogger(CustomLogger):
)
return
async def log_success_fallback_event(self, original_model_group: str, kwargs: dict):
async def log_success_fallback_event(
self, original_model_group: str, kwargs: dict, original_exception: Exception
):
"""
Logs a successful LLM fallback event on prometheus
"""
from litellm.litellm_core_utils.litellm_logging import (
StandardLoggingMetadata,
get_standard_logging_metadata,
)
verbose_logger.debug(
"Prometheus: log_success_fallback_event, original_model_group: %s, kwargs: %s",
original_model_group,
kwargs,
)
_metadata = kwargs.get("metadata", {})
standard_metadata: StandardLoggingMetadata = get_standard_logging_metadata(
metadata=_metadata
)
_new_model = kwargs.get("model")
self.litellm_deployment_successful_fallbacks.labels(
primary_model=original_model_group, fallback_model=_new_model
requested_model=original_model_group,
fallback_model=_new_model,
hashed_api_key=standard_metadata["user_api_key_hash"],
api_key_alias=standard_metadata["user_api_key_alias"],
team=standard_metadata["user_api_key_team_id"],
team_alias=standard_metadata["user_api_key_team_alias"],
exception_status=str(getattr(original_exception, "status_code", None)),
exception_class=str(original_exception.__class__.__name__),
).inc()
async def log_failure_fallback_event(self, original_model_group: str, kwargs: dict):
async def log_failure_fallback_event(
self, original_model_group: str, kwargs: dict, original_exception: Exception
):
"""
Logs a failed LLM fallback event on prometheus
"""
from litellm.litellm_core_utils.litellm_logging import (
StandardLoggingMetadata,
get_standard_logging_metadata,
)
verbose_logger.debug(
"Prometheus: log_failure_fallback_event, original_model_group: %s, kwargs: %s",
original_model_group,
kwargs,
)
_new_model = kwargs.get("model")
_metadata = kwargs.get("metadata", {})
standard_metadata: StandardLoggingMetadata = get_standard_logging_metadata(
metadata=_metadata
)
self.litellm_deployment_failed_fallbacks.labels(
primary_model=original_model_group, fallback_model=_new_model
requested_model=original_model_group,
fallback_model=_new_model,
hashed_api_key=standard_metadata["user_api_key_hash"],
api_key_alias=standard_metadata["user_api_key_alias"],
team=standard_metadata["user_api_key_team_id"],
team_alias=standard_metadata["user_api_key_team_alias"],
exception_status=str(getattr(original_exception, "status_code", None)),
exception_class=str(original_exception.__class__.__name__),
).inc()
def set_litellm_deployment_state(
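
As a standalone illustration of the label plumbing added above (metric name and values here are illustrative, not the metrics registered by PrometheusLogger): prometheus_client requires the labelnames to be declared once on the metric, and every .labels(...) call must then supply all of them before .inc().

from prometheus_client import Counter, generate_latest

team_and_key_labels = ["hashed_api_key", "api_key_alias", "team", "team_alias"]
exception_labels = ["exception_status", "exception_class"]

example_fallbacks = Counter(
    "example_successful_fallbacks",  # illustrative name
    "Fallbacks from a primary model to a fallback model",
    ["requested_model", "fallback_model"] + team_and_key_labels + exception_labels,
)

example_fallbacks.labels(
    requested_model="fake-azure-endpoint",
    fallback_model="fake-openai-endpoint",
    hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",
    api_key_alias="None",
    team="None",
    team_alias="None",
    exception_status="429",
    exception_class="RateLimitError",
).inc()

# generate_latest() renders the text exposition format that the /metrics
# assertions in the tests below match against.
print(generate_latest().decode())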

View file

@@ -2176,11 +2176,11 @@ def _init_custom_logger_compatible_class(
_in_memory_loggers.append(_langsmith_logger)
return _langsmith_logger # type: ignore
elif logging_integration == "prometheus":
if premium_user:
for callback in _in_memory_loggers:
if isinstance(callback, PrometheusLogger):
return callback # type: ignore
for callback in _in_memory_loggers:
if isinstance(callback, PrometheusLogger):
return callback # type: ignore
if premium_user:
_prometheus_logger = PrometheusLogger()
_in_memory_loggers.append(_prometheus_logger)
return _prometheus_logger # type: ignore
@@ -2476,31 +2476,7 @@ def get_standard_logging_object_payload(
}
)
# clean up litellm metadata
clean_metadata = StandardLoggingMetadata(
user_api_key_hash=None,
user_api_key_alias=None,
user_api_key_team_id=None,
user_api_key_user_id=None,
user_api_key_team_alias=None,
spend_logs_metadata=None,
requester_ip_address=None,
requester_metadata=None,
)
if isinstance(metadata, dict):
# Filter the metadata dictionary to include only the specified keys
clean_metadata = StandardLoggingMetadata(
**{ # type: ignore
key: metadata[key]
for key in StandardLoggingMetadata.__annotations__.keys()
if key in metadata
}
)
if metadata.get("user_api_key") is not None:
if is_valid_sha256_hash(str(metadata.get("user_api_key"))):
clean_metadata["user_api_key_hash"] = metadata.get(
"user_api_key"
) # this is the hash
clean_metadata = get_standard_logging_metadata(metadata=metadata)
if litellm.cache is not None:
cache_key = litellm.cache.get_cache_key(**kwargs)
@@ -2610,6 +2586,51 @@ def get_standard_logging_object_payload(
return None
def get_standard_logging_metadata(
metadata: Optional[Dict[str, Any]]
) -> StandardLoggingMetadata:
"""
Clean and filter the metadata dictionary to include only the specified keys in StandardLoggingMetadata.
Args:
metadata (Optional[Dict[str, Any]]): The original metadata dictionary.
Returns:
StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata.
Note:
- If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned.
- If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'.
"""
# Initialize with default values
clean_metadata = StandardLoggingMetadata(
user_api_key_hash=None,
user_api_key_alias=None,
user_api_key_team_id=None,
user_api_key_user_id=None,
user_api_key_team_alias=None,
spend_logs_metadata=None,
requester_ip_address=None,
requester_metadata=None,
)
if isinstance(metadata, dict):
# Filter the metadata dictionary to include only the specified keys
clean_metadata = StandardLoggingMetadata(
**{ # type: ignore
key: metadata[key]
for key in StandardLoggingMetadata.__annotations__.keys()
if key in metadata
}
)
if metadata.get("user_api_key") is not None:
if is_valid_sha256_hash(str(metadata.get("user_api_key"))):
clean_metadata["user_api_key_hash"] = metadata.get(
"user_api_key"
) # this is the hash
return clean_metadata
def scrub_sensitive_keys_in_metadata(litellm_params: Optional[dict]):
if litellm_params is None:
litellm_params = {}
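
A minimal usage sketch of the new helper, assuming it is importable from litellm.litellm_core_utils.litellm_logging as the PrometheusLogger import above suggests; the metadata values are illustrative.

from litellm.litellm_core_utils.litellm_logging import (
    StandardLoggingMetadata,
    get_standard_logging_metadata,
)

request_metadata = {
    # already a SHA-256 hash, so it is surfaced as user_api_key_hash
    "user_api_key": "88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",
    "user_api_key_alias": "my-key-alias",
    "user_api_key_team_id": "team-1",
    "user_api_key_team_alias": "team-alpha",
    "some_internal_field": "dropped - not a StandardLoggingMetadata key",
}

clean: StandardLoggingMetadata = get_standard_logging_metadata(metadata=request_metadata)
print(clean["user_api_key_hash"])        # the hashed key
print(clean["user_api_key_team_alias"])  # "team-alpha"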

View file

@@ -1,8 +1,9 @@
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import litellm
from litellm._logging import verbose_router_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.main import verbose_logger
if TYPE_CHECKING:
from litellm.router import Router as _Router
@@ -41,13 +42,17 @@ async def run_async_fallback(
verbose_router_logger.info("Successful fallback b/w models.")
# callback for successful_fallback_event():
await log_success_fallback_event(
original_model_group=original_model_group, kwargs=kwargs
original_model_group=original_model_group,
kwargs=kwargs,
original_exception=original_exception,
)
return response
except Exception as e:
error_from_fallbacks = e
await log_failure_fallback_event(
original_model_group=original_model_group, kwargs=kwargs
original_model_group=original_model_group,
kwargs=kwargs,
original_exception=original_exception,
)
raise error_from_fallbacks
@@ -83,29 +88,115 @@ def run_sync_fallback(
raise error_from_fallbacks
async def log_success_fallback_event(original_model_group: str, kwargs: dict):
async def log_success_fallback_event(
original_model_group: str, kwargs: dict, original_exception: Exception
):
"""
Log a successful fallback event to all registered callbacks.
This function iterates through all callbacks, initializing _known_custom_logger_compatible_callbacks if needed,
and calls the log_success_fallback_event method on CustomLogger instances.
Args:
original_model_group (str): The original model group before fallback.
kwargs (dict): kwargs for the request
Note:
Errors during logging are caught and reported but do not interrupt the process.
"""
from litellm.litellm_core_utils.litellm_logging import (
_init_custom_logger_compatible_class,
)
for _callback in litellm.callbacks:
if isinstance(_callback, CustomLogger):
if isinstance(_callback, CustomLogger) or (
_callback in litellm._known_custom_logger_compatible_callbacks
):
try:
await _callback.log_success_fallback_event(
original_model_group=original_model_group, kwargs=kwargs
_callback_custom_logger: Optional[CustomLogger] = None
if _callback in litellm._known_custom_logger_compatible_callbacks:
_callback_custom_logger = _init_custom_logger_compatible_class(
logging_integration=_callback, # type: ignore
llm_router=None,
internal_usage_cache=None,
)
elif isinstance(_callback, CustomLogger):
_callback_custom_logger = _callback
else:
verbose_router_logger.exception(
f"{_callback} logger not found / initialized properly"
)
continue
if _callback_custom_logger is None:
verbose_router_logger.exception(
f"{_callback} logger not found / initialized properly, callback is None"
)
continue
await _callback_custom_logger.log_success_fallback_event(
original_model_group=original_model_group,
kwargs=kwargs,
original_exception=original_exception,
)
except Exception as e:
verbose_router_logger.error(
f"Error in log_success_fallback_event: {(str(e))}"
f"Error in log_success_fallback_event: {str(e)}"
)
pass
async def log_failure_fallback_event(original_model_group: str, kwargs: dict):
async def log_failure_fallback_event(
original_model_group: str, kwargs: dict, original_exception: Exception
):
"""
Log a failed fallback event to all registered callbacks.
This function iterates through all callbacks, initializing _known_custom_logger_compatible_callbacks if needed,
and calls the log_failure_fallback_event method on CustomLogger instances.
Args:
original_model_group (str): The original model group before fallback.
kwargs (dict): kwargs for the request
Note:
Errors during logging are caught and reported but do not interrupt the process.
"""
from litellm.litellm_core_utils.litellm_logging import (
_init_custom_logger_compatible_class,
)
for _callback in litellm.callbacks:
if isinstance(_callback, CustomLogger):
if isinstance(_callback, CustomLogger) or (
_callback in litellm._known_custom_logger_compatible_callbacks
):
try:
await _callback.log_failure_fallback_event(
original_model_group=original_model_group, kwargs=kwargs
_callback_custom_logger: Optional[CustomLogger] = None
if _callback in litellm._known_custom_logger_compatible_callbacks:
_callback_custom_logger = _init_custom_logger_compatible_class(
logging_integration=_callback, # type: ignore
llm_router=None,
internal_usage_cache=None,
)
elif isinstance(_callback, CustomLogger):
_callback_custom_logger = _callback
else:
verbose_router_logger.exception(
f"{_callback} logger not found / initialized properly"
)
continue
if _callback_custom_logger is None:
verbose_router_logger.exception(
f"{_callback} logger not found / initialized properly"
)
continue
await _callback_custom_logger.log_failure_fallback_event(
original_model_group=original_model_group,
kwargs=kwargs,
original_exception=original_exception,
)
except Exception as e:
verbose_router_logger.error(
f"Error in log_failure_fallback_event: {(str(e))}"
f"Error in log_failure_fallback_event: {str(e)}"
)
pass
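
A minimal sketch of how these handlers get exercised, assuming callbacks are configured directly in Python: with this change, string callbacks listed in litellm._known_custom_logger_compatible_callbacks (such as "prometheus") also receive fallback events, not only CustomLogger instances.

import litellm

# "prometheus" is resolved via _init_custom_logger_compatible_class, as in the
# handlers above; any router fallback then calls log_success_fallback_event /
# log_failure_fallback_event on the resolved logger with the original_exception.
litellm.callbacks = ["prometheus"]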

View file

@@ -5,6 +5,7 @@ Unit tests for prometheus metrics
import pytest
import aiohttp
import asyncio
import uuid
async def make_bad_chat_completion_request(session, key):
@@ -23,6 +24,53 @@ async def make_bad_chat_completion_request(session, key):
return status, response_text
async def make_good_chat_completion_request(session, key):
url = "http://0.0.0.0:4000/chat/completions"
headers = {
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
}
data = {
"model": "fake-openai-endpoint",
"messages": [{"role": "user", "content": f"Hello {uuid.uuid4()}"}],
"tags": ["teamB"],
}
async with session.post(url, headers=headers, json=data) as response:
status = response.status
response_text = await response.text()
return status, response_text
async def make_chat_completion_request_with_fallback(session, key):
url = "http://0.0.0.0:4000/chat/completions"
headers = {
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
}
data = {
"model": "fake-azure-endpoint",
"messages": [{"role": "user", "content": "Hello"}],
"fallbacks": ["fake-openai-endpoint"],
}
async with session.post(url, headers=headers, json=data) as response:
status = response.status
response_text = await response.text()
# make a request with a failed fallback
data = {
"model": "fake-azure-endpoint",
"messages": [{"role": "user", "content": "Hello"}],
"fallbacks": ["unknown-model"],
}
async with session.post(url, headers=headers, json=data) as response:
status = response.status
response_text = await response.text()
return
@pytest.mark.asyncio
async def test_proxy_failure_metrics():
"""
@@ -59,3 +107,77 @@ async def test_proxy_failure_metrics():
'litellm_proxy_total_requests_metric_total{api_key_alias="None",end_user="None",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",requested_model="fake-azure-endpoint",team="None",team_alias="None",user="default_user_id"} 1.0'
in metrics
)
assert (
'litellm_deployment_failure_responses_total{api_base="https://exampleopenaiendpoint-production.up.railway.app",api_key_alias="None",api_provider="openai",exception_class="RateLimitError",exception_status="429",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",litellm_model_name="429",model_id="7499d31f98cd518cf54486d5a00deda6894239ce16d13543398dc8abf870b15f",requested_model="fake-azure-endpoint",team="None",team_alias="None"}'
in metrics
)
@pytest.mark.asyncio
async def test_proxy_success_metrics():
"""
Make 1 good /chat/completions call to "openai/gpt-3.5-turbo"
GET /metrics
Assert the success metric is incremented by 1
"""
async with aiohttp.ClientSession() as session:
# Make a good chat completion call
status, response_text = await make_good_chat_completion_request(
session, "sk-1234"
)
# Check if the request succeeded as expected
assert status == 200, f"Expected status 200, but got {status}"
# Get metrics
async with session.get("http://0.0.0.0:4000/metrics") as response:
metrics = await response.text()
print("/metrics", metrics)
# Check if the success metric is present and correct
assert (
'litellm_request_total_latency_metric_bucket{api_key_alias="None",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",le="0.005",model="fake",team="None",team_alias="None"}'
in metrics
)
assert (
'litellm_llm_api_latency_metric_bucket{api_key_alias="None",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",le="0.005",model="fake",team="None",team_alias="None"}'
in metrics
)
assert (
'litellm_deployment_latency_per_output_token_count{api_base="https://exampleopenaiendpoint-production.up.railway.app/",api_key_alias="None",api_provider="openai",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",litellm_model_name="fake",model_id="team-b-model",team="None",team_alias="None"}'
in metrics
)
@pytest.mark.asyncio
async def test_proxy_fallback_metrics():
"""
Make 1 request with a client side fallback - check metrics
"""
async with aiohttp.ClientSession() as session:
# Make a good chat completion call
await make_chat_completion_request_with_fallback(session, "sk-1234")
# Get metrics
async with session.get("http://0.0.0.0:4000/metrics") as response:
metrics = await response.text()
print("/metrics", metrics)
# Check if successful fallback metric is incremented
assert (
'litellm_deployment_successful_fallbacks_total{api_key_alias="None",exception_class="RateLimitError",exception_status="429",fallback_model="fake-openai-endpoint",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",requested_model="fake-azure-endpoint",team="None",team_alias="None"} 1.0'
in metrics
)
# Check if failed fallback metric is incremented
assert (
'litellm_deployment_failed_fallbacks_total{api_key_alias="None",exception_class="RateLimitError",exception_status="429",fallback_model="unknown-model",hashed_api_key="88dc28d0f030c55ed4ab77ed8faf098196cb1c05df778539800c9f1243fe6b4b",requested_model="fake-azure-endpoint",team="None",team_alias="None"} 1.0'
in metrics
)