# used for /metrics endpoint on LiteLLM Proxy #### What this does #### # On success, log events to Prometheus import os import subprocess import sys import traceback import uuid from datetime import datetime, timedelta from typing import Optional, TypedDict, Union import dotenv import requests # type: ignore import litellm from litellm._logging import print_verbose, verbose_logger from litellm.integrations.custom_logger import CustomLogger class PrometheusLogger(CustomLogger): # Class variables or attributes def __init__( self, **kwargs, ): try: from prometheus_client import Counter, Gauge, Histogram from litellm.proxy.proxy_server import premium_user verbose_logger.warning( "🚨🚨🚨 Prometheus Metrics will be moving to LiteLLM Enterprise on September 15th, 2024.\n🚨 Contact us here to get a license https://calendly.com/d/4mp-gd3-k5k/litellm-1-1-onboarding-chat \n🚨 Enterprise Pricing: https://www.litellm.ai/#pricing" ) self.litellm_llm_api_failed_requests_metric = Counter( name="litellm_llm_api_failed_requests_metric", documentation="Total number of failed LLM API calls via litellm - track fails per API Key, team, user", labelnames=[ "end_user", "hashed_api_key", "api_key_alias", "model", "team", "team_alias", "user", ], ) self.litellm_requests_metric = Counter( name="litellm_requests_metric", documentation="Total number of LLM calls to litellm - track total per API Key, team, user", labelnames=[ "end_user", "hashed_api_key", "api_key_alias", "model", "team", "team_alias", "user", ], ) # Counter for spend self.litellm_spend_metric = Counter( "litellm_spend_metric", "Total spend on LLM requests", labelnames=[ "end_user", "hashed_api_key", "api_key_alias", "model", "team", "team_alias", "user", ], ) # Counter for total_output_tokens self.litellm_tokens_metric = Counter( "litellm_total_tokens", "Total number of input + output tokens from LLM requests", labelnames=[ "end_user", "hashed_api_key", "api_key_alias", "model", "team", "team_alias", "user", ], ) # Remaining Budget for Team self.litellm_remaining_team_budget_metric = Gauge( "litellm_remaining_team_budget_metric", "Remaining budget for team", labelnames=["team_id", "team_alias"], ) # Remaining Budget for API Key self.litellm_remaining_api_key_budget_metric = Gauge( "litellm_remaining_api_key_budget_metric", "Remaining budget for api key", labelnames=["hashed_api_key", "api_key_alias"], ) ######################################## # LLM API Deployment Metrics / analytics ######################################## # Litellm-Enterprise Metrics if premium_user is True: # Remaining Rate Limit for model self.litellm_remaining_requests_metric = Gauge( "litellm_remaining_requests", "LLM Deployment Analytics - remaining requests for model, returned from LLM API Provider", labelnames=[ "model_group", "api_provider", "api_base", "litellm_model_name", ], ) self.litellm_remaining_tokens_metric = Gauge( "litellm_remaining_tokens", "remaining tokens for model, returned from LLM API Provider", labelnames=[ "model_group", "api_provider", "api_base", "litellm_model_name", ], ) # Get all keys _logged_llm_labels = [ "litellm_model_name", "model_id", "api_base", "api_provider", ] # Metric for deployment state self.litellm_deployment_state = Gauge( "litellm_deployment_state", "LLM Deployment Analytics - The state of the deployment: 0 = healthy, 1 = partial outage, 2 = complete outage", labelnames=_logged_llm_labels, ) self.litellm_deployment_success_responses = Counter( name="litellm_deployment_success_responses", documentation="LLM Deployment Analytics - Total number of successful LLM API calls via litellm", labelnames=_logged_llm_labels, ) self.litellm_deployment_failure_responses = Counter( name="litellm_deployment_failure_responses", documentation="LLM Deployment Analytics - Total number of failed LLM API calls via litellm", labelnames=_logged_llm_labels, ) self.litellm_deployment_total_requests = Counter( name="litellm_deployment_total_requests", documentation="LLM Deployment Analytics - Total number of LLM API calls via litellm - success + failure", labelnames=_logged_llm_labels, ) # Deployment Latency tracking self.litellm_deployment_latency_per_output_token = Histogram( name="litellm_deployment_latency_per_output_token", documentation="LLM Deployment Analytics - Latency per output token", labelnames=_logged_llm_labels, ) self.litellm_deployment_successful_fallbacks = Counter( "litellm_deployment_successful_fallbacks", "LLM Deployment Analytics - Number of successful fallback requests from primary model -> fallback model", ["primary_model", "fallback_model"], ) self.litellm_deployment_failed_fallbacks = Counter( "litellm_deployment_failed_fallbacks", "LLM Deployment Analytics - Number of failed fallback requests from primary model -> fallback model", ["primary_model", "fallback_model"], ) except Exception as e: print_verbose(f"Got exception on init prometheus client {str(e)}") raise e async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): # Define prometheus client from litellm.proxy.proxy_server import premium_user verbose_logger.debug( f"prometheus Logging - Enters success logging function for kwargs {kwargs}" ) # unpack kwargs model = kwargs.get("model", "") response_cost = kwargs.get("response_cost", 0.0) or 0 litellm_params = kwargs.get("litellm_params", {}) or {} proxy_server_request = litellm_params.get("proxy_server_request") or {} end_user_id = proxy_server_request.get("body", {}).get("user", None) user_id = litellm_params.get("metadata", {}).get("user_api_key_user_id", None) user_api_key = litellm_params.get("metadata", {}).get("user_api_key", None) user_api_key_alias = litellm_params.get("metadata", {}).get( "user_api_key_alias", None ) user_api_team = litellm_params.get("metadata", {}).get( "user_api_key_team_id", None ) user_api_team_alias = litellm_params.get("metadata", {}).get( "user_api_key_team_alias", None ) _team_spend = litellm_params.get("metadata", {}).get( "user_api_key_team_spend", None ) _team_max_budget = litellm_params.get("metadata", {}).get( "user_api_key_team_max_budget", None ) _remaining_team_budget = safe_get_remaining_budget( max_budget=_team_max_budget, spend=_team_spend ) _api_key_spend = litellm_params.get("metadata", {}).get( "user_api_key_spend", None ) _api_key_max_budget = litellm_params.get("metadata", {}).get( "user_api_key_max_budget", None ) _remaining_api_key_budget = safe_get_remaining_budget( max_budget=_api_key_max_budget, spend=_api_key_spend ) output_tokens = 1.0 if response_obj is not None: tokens_used = response_obj.get("usage", {}).get("total_tokens", 0) output_tokens = response_obj.get("usage", {}).get("completion_tokens", 0) else: tokens_used = 0 print_verbose( f"inside track_prometheus_metrics, model {model}, response_cost {response_cost}, tokens_used {tokens_used}, end_user_id {end_user_id}, user_api_key {user_api_key}" ) if ( user_api_key is not None and isinstance(user_api_key, str) and user_api_key.startswith("sk-") ): from litellm.proxy.utils import hash_token user_api_key = hash_token(user_api_key) self.litellm_requests_metric.labels( end_user_id, user_api_key, user_api_key_alias, model, user_api_team, user_api_team_alias, user_id, ).inc() self.litellm_spend_metric.labels( end_user_id, user_api_key, user_api_key_alias, model, user_api_team, user_api_team_alias, user_id, ).inc(response_cost) self.litellm_tokens_metric.labels( end_user_id, user_api_key, user_api_key_alias, model, user_api_team, user_api_team_alias, user_id, ).inc(tokens_used) self.litellm_remaining_team_budget_metric.labels( user_api_team, user_api_team_alias ).set(_remaining_team_budget) self.litellm_remaining_api_key_budget_metric.labels( user_api_key, user_api_key_alias ).set(_remaining_api_key_budget) # set x-ratelimit headers if premium_user is True: self.set_llm_deployment_success_metrics( kwargs, start_time, end_time, output_tokens ) pass async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): from litellm.proxy.proxy_server import premium_user verbose_logger.debug( f"prometheus Logging - Enters success logging function for kwargs {kwargs}" ) # unpack kwargs model = kwargs.get("model", "") litellm_params = kwargs.get("litellm_params", {}) or {} proxy_server_request = litellm_params.get("proxy_server_request") or {} end_user_id = proxy_server_request.get("body", {}).get("user", None) user_id = litellm_params.get("metadata", {}).get("user_api_key_user_id", None) user_api_key = litellm_params.get("metadata", {}).get("user_api_key", None) user_api_key_alias = litellm_params.get("metadata", {}).get( "user_api_key_alias", None ) user_api_team = litellm_params.get("metadata", {}).get( "user_api_key_team_id", None ) user_api_team_alias = litellm_params.get("metadata", {}).get( "user_api_key_team_alias", None ) try: self.litellm_llm_api_failed_requests_metric.labels( end_user_id, user_api_key, user_api_key_alias, model, user_api_team, user_api_team_alias, user_id, ).inc() self.set_llm_deployment_failure_metrics(kwargs) except Exception as e: verbose_logger.exception( "prometheus Layer Error(): Exception occured - {}".format(str(e)) ) pass pass def set_llm_deployment_failure_metrics(self, request_kwargs: dict): try: verbose_logger.debug("setting remaining tokens requests metric") _response_headers = request_kwargs.get("response_headers") _litellm_params = request_kwargs.get("litellm_params", {}) or {} _metadata = _litellm_params.get("metadata", {}) litellm_model_name = request_kwargs.get("model", None) api_base = _metadata.get("api_base", None) llm_provider = _litellm_params.get("custom_llm_provider", None) model_id = _metadata.get("model_id") """ log these labels ["litellm_model_name", "model_id", "api_base", "api_provider"] """ self.set_deployment_partial_outage( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ) self.litellm_deployment_failure_responses.labels( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ).inc() self.litellm_deployment_total_requests.labels( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ).inc() pass except: pass def set_llm_deployment_success_metrics( self, request_kwargs: dict, start_time, end_time, output_tokens: float = 1.0, ): try: verbose_logger.debug("setting remaining tokens requests metric") _response_headers = request_kwargs.get("response_headers") _litellm_params = request_kwargs.get("litellm_params", {}) or {} _metadata = _litellm_params.get("metadata", {}) litellm_model_name = request_kwargs.get("model", None) model_group = _metadata.get("model_group", None) api_base = _metadata.get("api_base", None) llm_provider = _litellm_params.get("custom_llm_provider", None) model_id = _metadata.get("model_id") remaining_requests = None remaining_tokens = None # OpenAI / OpenAI Compatible headers if ( _response_headers and "x-ratelimit-remaining-requests" in _response_headers ): remaining_requests = _response_headers["x-ratelimit-remaining-requests"] if ( _response_headers and "x-ratelimit-remaining-tokens" in _response_headers ): remaining_tokens = _response_headers["x-ratelimit-remaining-tokens"] verbose_logger.debug( f"remaining requests: {remaining_requests}, remaining tokens: {remaining_tokens}" ) if remaining_requests: """ "model_group", "api_provider", "api_base", "litellm_model_name" """ self.litellm_remaining_requests_metric.labels( model_group, llm_provider, api_base, litellm_model_name ).set(remaining_requests) if remaining_tokens: self.litellm_remaining_tokens_metric.labels( model_group, llm_provider, api_base, litellm_model_name ).set(remaining_tokens) """ log these labels ["litellm_model_name", "model_id", "api_base", "api_provider"] """ self.set_deployment_healthy( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ) self.litellm_deployment_success_responses.labels( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ).inc() self.litellm_deployment_total_requests.labels( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ).inc() # Track deployment Latency response_ms: timedelta = end_time - start_time time_to_first_token_response_time: Optional[timedelta] = None if ( request_kwargs.get("stream", None) is not None and request_kwargs["stream"] == True ): # only log ttft for streaming request time_to_first_token_response_time = ( request_kwargs.get("completion_start_time", end_time) - start_time ) # use the metric that is not None # if streaming - use time_to_first_token_response # if not streaming - use response_ms _latency: timedelta = time_to_first_token_response_time or response_ms _latency_seconds = _latency.total_seconds() # latency per output token latency_per_token = None if output_tokens is not None and output_tokens > 0: latency_per_token = _latency_seconds / output_tokens self.litellm_deployment_latency_per_output_token.labels( litellm_model_name=litellm_model_name, model_id=model_id, api_base=api_base, api_provider=llm_provider, ).observe(latency_per_token) except Exception as e: verbose_logger.error( "Prometheus Error: set_llm_deployment_success_metrics. Exception occured - {}".format( str(e) ) ) return async def log_success_fallback_event(self, original_model_group: str, kwargs: dict): verbose_logger.debug( "Prometheus: log_success_fallback_event, original_model_group: %s, kwargs: %s", original_model_group, kwargs, ) _new_model = kwargs.get("model") self.litellm_deployment_successful_fallbacks.labels( primary_model=original_model_group, fallback_model=_new_model ).inc() async def log_failure_fallback_event(self, original_model_group: str, kwargs: dict): verbose_logger.debug( "Prometheus: log_failure_fallback_event, original_model_group: %s, kwargs: %s", original_model_group, kwargs, ) _new_model = kwargs.get("model") self.litellm_deployment_failed_fallbacks.labels( primary_model=original_model_group, fallback_model=_new_model ).inc() def set_litellm_deployment_state( self, state: int, litellm_model_name: str, model_id: str, api_base: str, api_provider: str, ): self.litellm_deployment_state.labels( litellm_model_name, model_id, api_base, api_provider ).set(state) def set_deployment_healthy( self, litellm_model_name: str, model_id: str, api_base: str, api_provider: str, ): self.set_litellm_deployment_state( 0, litellm_model_name, model_id, api_base, api_provider ) def set_deployment_partial_outage( self, litellm_model_name: str, model_id: str, api_base: str, api_provider: str, ): self.set_litellm_deployment_state( 1, litellm_model_name, model_id, api_base, api_provider ) def set_deployment_complete_outage( self, litellm_model_name: str, model_id: str, api_base: str, api_provider: str, ): self.set_litellm_deployment_state( 2, litellm_model_name, model_id, api_base, api_provider ) def safe_get_remaining_budget( max_budget: Optional[float], spend: Optional[float] ) -> float: if max_budget is None: return float("inf") if spend is None: return max_budget return max_budget - spend