[Feat] Emit Key, Team Budget metrics on a cron job schedule (#9528)

* _initialize_remaining_budget_metrics

* initialize_budget_metrics_cron_job

* initialize_budget_metrics_cron_job

* initialize_budget_metrics_cron_job

* test_initialize_budget_metrics_cron_job

* LITELLM_PROXY_ADMIN_NAME

* fix code qa checks

* test_initialize_budget_metrics_cron_job

* test_initialize_budget_metrics_cron_job

* pod lock manager allow dynamic cron job ID

* fix pod lock manager

* require cronjobid for PodLockManager

* fix DB_SPEND_UPDATE_JOB_NAME acquire / release lock

* add comment on prometheus logger

* add debug statements for emitting key, team budget metrics

* test_pod_lock_manager.py

* test_initialize_budget_metrics_cron_job

* initialize_budget_metrics_cron_job

* initialize_remaining_budget_metrics

* remove outdated test
This commit is contained in:
Ishaan Jaff 2025-04-10 16:59:14 -07:00 committed by GitHub
parent 90d862b041
commit 94a553dbb2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 346 additions and 142 deletions

View file

@ -1,10 +1,19 @@
# used for /metrics endpoint on LiteLLM Proxy
#### What this does ####
# On success, log events to Prometheus
import asyncio
import sys
from datetime import datetime, timedelta
from typing import Any, Awaitable, Callable, List, Literal, Optional, Tuple, cast
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
List,
Literal,
Optional,
Tuple,
cast,
)
import litellm
from litellm._logging import print_verbose, verbose_logger
@ -14,6 +23,11 @@ from litellm.types.integrations.prometheus import *
from litellm.types.utils import StandardLoggingPayload
from litellm.utils import get_end_user_id_for_cost_tracking
if TYPE_CHECKING:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
else:
AsyncIOScheduler = Any
class PrometheusLogger(CustomLogger):
# Class variables or attributes
@ -359,8 +373,6 @@ class PrometheusLogger(CustomLogger):
label_name="litellm_requests_metric"
),
)
self._initialize_prometheus_startup_metrics()
except Exception as e:
print_verbose(f"Got exception on init prometheus client {str(e)}")
raise e
@ -988,9 +1000,9 @@ class PrometheusLogger(CustomLogger):
):
try:
verbose_logger.debug("setting remaining tokens requests metric")
standard_logging_payload: Optional[
StandardLoggingPayload
] = request_kwargs.get("standard_logging_object")
standard_logging_payload: Optional[StandardLoggingPayload] = (
request_kwargs.get("standard_logging_object")
)
if standard_logging_payload is None:
return
@ -1337,24 +1349,6 @@ class PrometheusLogger(CustomLogger):
return max_budget - spend
def _initialize_prometheus_startup_metrics(self):
"""
Initialize prometheus startup metrics
Helper to create tasks for initializing metrics that are required on startup - eg. remaining budget metrics
"""
if litellm.prometheus_initialize_budget_metrics is not True:
verbose_logger.debug("Prometheus: skipping budget metrics initialization")
return
try:
if asyncio.get_running_loop():
asyncio.create_task(self._initialize_remaining_budget_metrics())
except RuntimeError as e: # no running event loop
verbose_logger.exception(
f"No running event loop - skipping budget metrics initialization: {str(e)}"
)
async def _initialize_budget_metrics(
self,
data_fetch_function: Callable[..., Awaitable[Tuple[List[Any], Optional[int]]]],
@ -1475,12 +1469,41 @@ class PrometheusLogger(CustomLogger):
data_type="keys",
)
async def _initialize_remaining_budget_metrics(self):
async def initialize_remaining_budget_metrics(self):
"""
Initialize remaining budget metrics for all teams to avoid metric discrepancies.
Handler for initializing remaining budget metrics for all teams to avoid metric discrepancies.
Runs when prometheus logger starts up.
- If redis cache is available, we use the pod lock manager to acquire a lock and initialize the metrics.
- Ensures only one pod emits the metrics at a time.
- If redis cache is not available, we initialize the metrics directly.
"""
from litellm.constants import PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME
from litellm.proxy.proxy_server import proxy_logging_obj
pod_lock_manager = proxy_logging_obj.db_spend_update_writer.pod_lock_manager
# if using redis, ensure only one pod emits the metrics at a time
if pod_lock_manager and pod_lock_manager.redis_cache:
if await pod_lock_manager.acquire_lock(
cronjob_id=PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME
):
try:
await self._initialize_remaining_budget_metrics()
finally:
await pod_lock_manager.release_lock(
cronjob_id=PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME
)
else:
# if not using redis, initialize the metrics directly
await self._initialize_remaining_budget_metrics()
async def _initialize_remaining_budget_metrics(self):
"""
Helper to initialize remaining budget metrics for all teams and API keys.
"""
verbose_logger.debug("Emitting key, team budget metrics....")
await self._initialize_team_budget_metrics()
await self._initialize_api_key_budget_metrics()
@ -1737,6 +1760,36 @@ class PrometheusLogger(CustomLogger):
return (end_time - start_time).total_seconds()
return None
@staticmethod
def initialize_budget_metrics_cron_job(scheduler: AsyncIOScheduler):
"""
Initialize budget metrics as a cron job. This job runs every `PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES` minutes.
It emits the current remaining budget metrics for all Keys and Teams.
"""
from litellm.constants import PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.prometheus import PrometheusLogger
prometheus_loggers: List[CustomLogger] = (
litellm.logging_callback_manager.get_custom_loggers_for_type(
callback_type=PrometheusLogger
)
)
# we need to get the initialized prometheus logger instance(s) and call logger.initialize_remaining_budget_metrics() on them
verbose_logger.debug("found %s prometheus loggers", len(prometheus_loggers))
if len(prometheus_loggers) > 0:
prometheus_logger = cast(PrometheusLogger, prometheus_loggers[0])
verbose_logger.debug(
"Initializing remaining budget metrics as a cron job executing every %s minutes"
% PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES
)
scheduler.add_job(
prometheus_logger.initialize_remaining_budget_metrics,
"interval",
minutes=PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES,
)
@staticmethod
def _mount_metrics_endpoint(premium_user: bool):
"""