From e09ef4afc72a0a0e2bc34d24984bdd3835d10cc9 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 17:39:48 -0700 Subject: [PATCH 01/12] use service logger for tracking pod lock status --- litellm/types/services.py | 68 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/litellm/types/services.py b/litellm/types/services.py index 3eb283dbe9..05944772d7 100644 --- a/litellm/types/services.py +++ b/litellm/types/services.py @@ -1,8 +1,15 @@ import enum import uuid -from typing import Optional +from typing import List, Optional from pydantic import BaseModel, Field +from typing_extensions import TypedDict + + +class ServiceMetrics(enum.Enum): + COUNTER = "counter" + HISTOGRAM = "histogram" + GAUGE = "gauge" class ServiceTypes(str, enum.Enum): @@ -18,6 +25,62 @@ class ServiceTypes(str, enum.Enum): ROUTER = "router" AUTH = "auth" PROXY_PRE_CALL = "proxy_pre_call" + POD_LOCK_MANAGER = "pod_lock_manager" + + +class ServiceConfig(TypedDict): + """ + Configuration for services and their metrics + """ + + metrics: List[ServiceMetrics] # What metrics this service should support + + +""" +Metric types to use for each service + +- REDIS only needs Counter, Histogram +- Pod Lock Manager only needs a gauge metric +""" +DEFAULT_SERVICE_CONFIGS = { + ServiceTypes.REDIS.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.DB.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.BATCH_WRITE_TO_DB.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.RESET_BUDGET_JOB.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.LITELLM.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.ROUTER.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.AUTH.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.PROXY_PRE_CALL.value: { + "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] + }, + ServiceTypes.POD_LOCK_MANAGER.value: {"metrics": [ServiceMetrics.GAUGE]}, +} + + +class ServiceEventMetadata(TypedDict, total=False): + """ + The metadata logged during service success/failure + + Add any extra fields you expect to access in the service_success_hook/service_failure_hook + """ + + # Dynamically control gauge labels and values + gauge_labels: Optional[str] + gauge_value: Optional[float] class ServiceLoggerPayload(BaseModel): @@ -30,6 +93,9 @@ class ServiceLoggerPayload(BaseModel): service: ServiceTypes = Field(description="who is this for? - postgres/redis") duration: float = Field(description="How long did the request take?") call_type: str = Field(description="The call of the service, being made") + event_metadata: Optional[dict] = Field( + description="The metadata logged during service success/failure" + ) def to_json(self, **kwargs): try: From 73bbd0a4460e808ba1e385293ec23bad16114fe8 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 17:40:25 -0700 Subject: [PATCH 02/12] emit lock acquired and released events --- .../db_transaction_queue/pod_lock_manager.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py b/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py index 5b640033a0..3f63afe62a 100644 --- a/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py +++ b/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py @@ -1,15 +1,20 @@ +import asyncio import uuid from typing import TYPE_CHECKING, Any, Optional from litellm._logging import verbose_proxy_logger +from litellm._service_logger import ServiceLogging from litellm.caching.redis_cache import RedisCache from litellm.constants import DEFAULT_CRON_JOB_LOCK_TTL_SECONDS +from litellm.types.services import ServiceTypes if TYPE_CHECKING: ProxyLogging = Any else: ProxyLogging = Any +service_logger_obj = ServiceLogging() # used for tracking current pod lock status + class PodLockManager: """ @@ -57,6 +62,7 @@ class PodLockManager: self.pod_id, self.cronjob_id, ) + return True else: # Check if the current pod already holds the lock @@ -70,6 +76,7 @@ class PodLockManager: self.pod_id, self.cronjob_id, ) + self._emit_acquired_lock_event(self.cronjob_id, self.pod_id) return True return False except Exception as e: @@ -104,6 +111,7 @@ class PodLockManager: self.pod_id, self.cronjob_id, ) + self._emit_released_lock_event(self.cronjob_id, self.pod_id) else: verbose_proxy_logger.debug( "Pod %s failed to release Redis lock for cronjob_id=%s", @@ -127,3 +135,31 @@ class PodLockManager: verbose_proxy_logger.error( f"Error releasing Redis lock for {self.cronjob_id}: {e}" ) + + @staticmethod + def _emit_acquired_lock_event(cronjob_id: str, pod_id: str): + asyncio.create_task( + service_logger_obj.async_service_success_hook( + service=ServiceTypes.POD_LOCK_MANAGER, + duration=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS, + call_type="_emit_acquired_lock_event", + event_metadata={ + "gauge_labels": f"{cronjob_id}:{pod_id}", + "gauge_value": 1, + }, + ) + ) + + @staticmethod + def _emit_released_lock_event(cronjob_id: str, pod_id: str): + asyncio.create_task( + service_logger_obj.async_service_success_hook( + service=ServiceTypes.POD_LOCK_MANAGER, + duration=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS, + call_type="_emit_released_lock_event", + event_metadata={ + "gauge_labels": f"{cronjob_id}:{pod_id}", + "gauge_value": 0, + }, + ) + ) From 05b30e28db38a5e56e08bf8af9f7516fd22d9c61 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 17:50:41 -0700 Subject: [PATCH 03/12] clean up service metrics --- litellm/_service_logger.py | 2 + litellm/integrations/prometheus_services.py | 106 +++++++++++++++----- litellm/proxy/proxy_config.yaml | 4 +- 3 files changed, 87 insertions(+), 25 deletions(-) diff --git a/litellm/_service_logger.py b/litellm/_service_logger.py index 8f835bea83..7a60359d54 100644 --- a/litellm/_service_logger.py +++ b/litellm/_service_logger.py @@ -124,6 +124,7 @@ class ServiceLogging(CustomLogger): service=service, duration=duration, call_type=call_type, + event_metadata=event_metadata, ) for callback in litellm.service_callback: @@ -229,6 +230,7 @@ class ServiceLogging(CustomLogger): service=service, duration=duration, call_type=call_type, + event_metadata=event_metadata, ) for callback in litellm.service_callback: diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index 4bf293fb01..d14cbd7469 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -7,7 +7,12 @@ from typing import List, Optional, Union from litellm._logging import print_verbose, verbose_logger from litellm.types.integrations.prometheus import LATENCY_BUCKETS -from litellm.types.services import ServiceLoggerPayload, ServiceTypes +from litellm.types.services import ( + DEFAULT_SERVICE_CONFIGS, + ServiceLoggerPayload, + ServiceMetrics, + ServiceTypes, +) FAILED_REQUESTS_LABELS = ["error_class", "function_name"] @@ -23,7 +28,8 @@ class PrometheusServicesLogger: ): try: try: - from prometheus_client import REGISTRY, Counter, Histogram + from prometheus_client import REGISTRY, Counter, Gauge, Histogram + from prometheus_client.gc_collector import Collector except ImportError: raise Exception( "Missing prometheus_client. Run `pip install prometheus-client`" @@ -31,36 +37,52 @@ class PrometheusServicesLogger: self.Histogram = Histogram self.Counter = Counter + self.Gauge = Gauge self.REGISTRY = REGISTRY verbose_logger.debug("in init prometheus services metrics") - self.services = [item.value for item in ServiceTypes] - - self.payload_to_prometheus_map = ( - {} - ) # store the prometheus histogram/counter we need to call for each field in payload + self.services = [item for item in ServiceTypes] + self.payload_to_prometheus_map = {} for service in self.services: - histogram = self.create_histogram(service, type_of_request="latency") - counter_failed_request = self.create_counter( - service, - type_of_request="failed_requests", - additional_labels=FAILED_REQUESTS_LABELS, - ) - counter_total_requests = self.create_counter( - service, type_of_request="total_requests" - ) - self.payload_to_prometheus_map[service] = [ - histogram, - counter_failed_request, - counter_total_requests, - ] + service_metrics: List[Union[Histogram, Counter, Gauge, Collector]] = [] - self.prometheus_to_amount_map: dict = ( - {} - ) # the field / value in ServiceLoggerPayload the object needs to be incremented by + metrics_to_initialize = self._get_service_metrics_initialize(service) + # Initialize only the configured metrics for each service + if ServiceMetrics.HISTOGRAM in metrics_to_initialize: + histogram = self.create_histogram( + service, type_of_request="latency" + ) + if histogram: + service_metrics.append(histogram) + + if ServiceMetrics.COUNTER in metrics_to_initialize: + counter_failed_request = self.create_counter( + service, + type_of_request="failed_requests", + additional_labels=FAILED_REQUESTS_LABELS, + ) + if counter_failed_request: + service_metrics.append(counter_failed_request) + counter_total_requests = self.create_counter( + service, type_of_request="total_requests" + ) + if counter_total_requests: + service_metrics.append(counter_total_requests) + + if ServiceMetrics.GAUGE in metrics_to_initialize: + gauge = self.create_gauge( + service, type_of_request="pod_lock_manager" + ) + if gauge: + service_metrics.append(gauge) + + if service_metrics: + self.payload_to_prometheus_map[service] = service_metrics + + self.prometheus_to_amount_map: dict = {} ### MOCK TESTING ### self.mock_testing = mock_testing self.mock_testing_success_calls = 0 @@ -70,6 +92,17 @@ class PrometheusServicesLogger: print_verbose(f"Got exception on init prometheus client {str(e)}") raise e + def _get_service_metrics_initialize( + self, service: ServiceTypes + ) -> List[ServiceMetrics]: + if service not in DEFAULT_SERVICE_CONFIGS: + raise ValueError(f"Service {service} not found in DEFAULT_SERVICE_CONFIGS") + + metrics = DEFAULT_SERVICE_CONFIGS.get(service, {}).get("metrics", []) + if not metrics: + raise ValueError(f"No metrics found for service {service}") + return metrics + def is_metric_registered(self, metric_name) -> bool: for metric in self.REGISTRY.collect(): if metric_name == metric.name: @@ -94,6 +127,15 @@ class PrometheusServicesLogger: buckets=LATENCY_BUCKETS, ) + def create_gauge(self, service: str, type_of_request: str): + metric_name = "litellm_{}_{}".format(service, type_of_request) + is_registered = self.is_metric_registered(metric_name) + if is_registered: + return self._get_metric(metric_name) + return self.Gauge( + metric_name, "Gauge for {} service".format(service), labelnames=[service] + ) + def create_counter( self, service: str, @@ -120,6 +162,15 @@ class PrometheusServicesLogger: histogram.labels(labels).observe(amount) + def update_gauge( + self, + gauge, + labels: str, + amount: float, + ): + assert isinstance(gauge, self.Gauge) + gauge.labels(labels).set(amount) + def increment_counter( self, counter, @@ -190,6 +241,13 @@ class PrometheusServicesLogger: labels=payload.service.value, amount=1, # LOG TOTAL REQUESTS TO PROMETHEUS ) + elif isinstance(obj, self.Gauge): + if payload.event_metadata: + self.update_gauge( + gauge=obj, + labels=payload.event_metadata.get("gauge_labels") or "", + amount=payload.event_metadata.get("gauge_value") or 0, + ) async def async_service_failure_hook( self, diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index fe8d73d26a..2ee830bca4 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -12,4 +12,6 @@ litellm_settings: cache: True cache_params: type: redis - supported_call_types: [] \ No newline at end of file + supported_call_types: [] + callbacks: ["prometheus"] + service_callback: ["prometheus_system"] \ No newline at end of file From 3256b6af6c0432f5ce310623f61c5e57eec91a08 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 18:03:09 -0700 Subject: [PATCH 04/12] track service types on prom services --- litellm/types/services.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/litellm/types/services.py b/litellm/types/services.py index 05944772d7..e7b3c91ed3 100644 --- a/litellm/types/services.py +++ b/litellm/types/services.py @@ -27,6 +27,17 @@ class ServiceTypes(str, enum.Enum): PROXY_PRE_CALL = "proxy_pre_call" POD_LOCK_MANAGER = "pod_lock_manager" + """ + Operational metrics for DB Transaction Queues + """ + # daily spend update queue - actual transaction events + IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE = "in_memory_daily_spend_update_queue" + REDIS_DAILY_SPEND_UPDATE_QUEUE = "redis_daily_spend_update_queue" + + # spend update queue - current spend of key, user, team + IN_MEMORY_SPEND_UPDATE_QUEUE = "in_memory_spend_update_queue" + REDIS_SPEND_UPDATE_QUEUE = "redis_spend_update_queue" + class ServiceConfig(TypedDict): """ From 7b768ed909a357193f8b8b74f622d6d6e62165be Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 18:38:33 -0700 Subject: [PATCH 05/12] doc fix sso login url --- docs/my-website/docs/proxy/admin_ui_sso.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/my-website/docs/proxy/admin_ui_sso.md b/docs/my-website/docs/proxy/admin_ui_sso.md index 882e3df0b2..0bbba57fd9 100644 --- a/docs/my-website/docs/proxy/admin_ui_sso.md +++ b/docs/my-website/docs/proxy/admin_ui_sso.md @@ -156,7 +156,7 @@ PROXY_LOGOUT_URL="https://www.google.com" Set this in your .env (so the proxy can set the correct redirect url) ```shell -PROXY_BASE_URL=https://litellm-api.up.railway.app/ +PROXY_BASE_URL=https://litellm-api.up.railway.app ``` #### Step 4. Test flow From 80fb4ece9770e26d33b11bfd63d8bc146591eb74 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 18:39:29 -0700 Subject: [PATCH 06/12] prom emit size of DB TX queues for observability --- litellm/integrations/prometheus_services.py | 10 +++--- .../db_transaction_queue/base_update_queue.py | 16 +++++++++ .../daily_spend_update_queue.py | 24 ++++++++++++-- .../db_transaction_queue/pod_lock_manager.py | 4 +-- .../redis_update_buffer.py | 33 +++++++++++++++++-- .../spend_update_queue.py | 24 ++++++++++++-- 6 files changed, 97 insertions(+), 14 deletions(-) diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index d14cbd7469..dddaa4d064 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -73,9 +73,7 @@ class PrometheusServicesLogger: service_metrics.append(counter_total_requests) if ServiceMetrics.GAUGE in metrics_to_initialize: - gauge = self.create_gauge( - service, type_of_request="pod_lock_manager" - ) + gauge = self.create_gauge(service, type_of_request="size") if gauge: service_metrics.append(gauge) @@ -95,12 +93,14 @@ class PrometheusServicesLogger: def _get_service_metrics_initialize( self, service: ServiceTypes ) -> List[ServiceMetrics]: + DEFAULT_METRICS = [ServiceMetrics.COUNTER, ServiceMetrics.GAUGE] if service not in DEFAULT_SERVICE_CONFIGS: - raise ValueError(f"Service {service} not found in DEFAULT_SERVICE_CONFIGS") + return DEFAULT_METRICS metrics = DEFAULT_SERVICE_CONFIGS.get(service, {}).get("metrics", []) if not metrics: - raise ValueError(f"No metrics found for service {service}") + verbose_logger.debug(f"No metrics found for service {service}") + return DEFAULT_METRICS return metrics def is_metric_registered(self, metric_name) -> bool: diff --git a/litellm/proxy/db/db_transaction_queue/base_update_queue.py b/litellm/proxy/db/db_transaction_queue/base_update_queue.py index b3c3c26c84..2bf2393127 100644 --- a/litellm/proxy/db/db_transaction_queue/base_update_queue.py +++ b/litellm/proxy/db/db_transaction_queue/base_update_queue.py @@ -2,8 +2,14 @@ Base class for in memory buffer for database transactions """ import asyncio +from typing import Optional from litellm._logging import verbose_proxy_logger +from litellm._service_logger import ServiceLogging + +service_logger_obj = ( + ServiceLogging() +) # used for tracking metrics for In memory buffer, redis buffer, pod lock manager class BaseUpdateQueue: @@ -16,6 +22,9 @@ class BaseUpdateQueue: """Enqueue an update.""" verbose_proxy_logger.debug("Adding update to queue: %s", update) await self.update_queue.put(update) + await self._emit_new_item_added_to_queue_event( + queue_size=self.update_queue.qsize() + ) async def flush_all_updates_from_in_memory_queue(self): """Get all updates from the queue.""" @@ -23,3 +32,10 @@ class BaseUpdateQueue: while not self.update_queue.empty(): updates.append(await self.update_queue.get()) return updates + + async def _emit_new_item_added_to_queue_event( + self, + queue_size: Optional[int] = None, + ): + """placeholder, emit event when a new item is added to the queue""" + pass diff --git a/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py b/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py index dedb8c8f8f..afae431370 100644 --- a/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py +++ b/litellm/proxy/db/db_transaction_queue/daily_spend_update_queue.py @@ -1,9 +1,13 @@ import asyncio -from typing import Dict, List +from typing import Dict, List, Optional from litellm._logging import verbose_proxy_logger from litellm.proxy._types import DailyUserSpendTransaction -from litellm.proxy.db.db_transaction_queue.base_update_queue import BaseUpdateQueue +from litellm.proxy.db.db_transaction_queue.base_update_queue import ( + BaseUpdateQueue, + service_logger_obj, +) +from litellm.types.services import ServiceTypes class DailySpendUpdateQueue(BaseUpdateQueue): @@ -93,3 +97,19 @@ class DailySpendUpdateQueue(BaseUpdateQueue): else: aggregated_daily_spend_update_transactions[_key] = payload return aggregated_daily_spend_update_transactions + + async def _emit_new_item_added_to_queue_event( + self, + queue_size: Optional[int] = None, + ): + asyncio.create_task( + service_logger_obj.async_service_success_hook( + service=ServiceTypes.IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE, + duration=0, + call_type="_emit_new_item_added_to_queue_event", + event_metadata={ + "gauge_labels": ServiceTypes.IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE, + "gauge_value": queue_size, + }, + ) + ) diff --git a/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py b/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py index 3f63afe62a..cb4a43a802 100644 --- a/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py +++ b/litellm/proxy/db/db_transaction_queue/pod_lock_manager.py @@ -3,9 +3,9 @@ import uuid from typing import TYPE_CHECKING, Any, Optional from litellm._logging import verbose_proxy_logger -from litellm._service_logger import ServiceLogging from litellm.caching.redis_cache import RedisCache from litellm.constants import DEFAULT_CRON_JOB_LOCK_TTL_SECONDS +from litellm.proxy.db.db_transaction_queue.base_update_queue import service_logger_obj from litellm.types.services import ServiceTypes if TYPE_CHECKING: @@ -13,8 +13,6 @@ if TYPE_CHECKING: else: ProxyLogging = Any -service_logger_obj = ServiceLogging() # used for tracking current pod lock status - class PodLockManager: """ diff --git a/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py b/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py index ea1356159a..88741fbb18 100644 --- a/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py +++ b/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py @@ -4,6 +4,7 @@ Handles buffering database `UPDATE` transactions in Redis before committing them This is to prevent deadlocks and improve reliability """ +import asyncio import json from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union @@ -16,11 +17,13 @@ from litellm.constants import ( ) from litellm.litellm_core_utils.safe_json_dumps import safe_dumps from litellm.proxy._types import DailyUserSpendTransaction, DBSpendUpdateTransactions +from litellm.proxy.db.db_transaction_queue.base_update_queue import service_logger_obj from litellm.proxy.db.db_transaction_queue.daily_spend_update_queue import ( DailySpendUpdateQueue, ) from litellm.proxy.db.db_transaction_queue.spend_update_queue import SpendUpdateQueue from litellm.secret_managers.main import str_to_bool +from litellm.types.services import ServiceTypes if TYPE_CHECKING: from litellm.proxy.utils import PrismaClient @@ -136,18 +139,27 @@ class RedisUpdateBuffer: return list_of_transactions = [safe_dumps(db_spend_update_transactions)] - await self.redis_cache.async_rpush( + current_redis_buffer_size = await self.redis_cache.async_rpush( key=REDIS_UPDATE_BUFFER_KEY, values=list_of_transactions, ) + await self._emit_new_item_added_to_redis_buffer_event( + queue_size=current_redis_buffer_size, + service=ServiceTypes.REDIS_SPEND_UPDATE_QUEUE, + ) list_of_daily_spend_update_transactions = [ safe_dumps(daily_spend_update_transactions) ] - await self.redis_cache.async_rpush( + + current_redis_buffer_size = await self.redis_cache.async_rpush( key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, values=list_of_daily_spend_update_transactions, ) + await self._emit_new_item_added_to_redis_buffer_event( + queue_size=current_redis_buffer_size, + service=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE, + ) @staticmethod def _number_of_transactions_to_store_in_redis( @@ -300,3 +312,20 @@ class RedisUpdateBuffer: ) return combined_transaction + + async def _emit_new_item_added_to_redis_buffer_event( + self, + service: ServiceTypes, + queue_size: int, + ): + asyncio.create_task( + service_logger_obj.async_service_success_hook( + service=service, + duration=0, + call_type="_emit_new_item_added_to_queue_event", + event_metadata={ + "gauge_labels": service, + "gauge_value": queue_size, + }, + ) + ) diff --git a/litellm/proxy/db/db_transaction_queue/spend_update_queue.py b/litellm/proxy/db/db_transaction_queue/spend_update_queue.py index ce181d1478..60e9379751 100644 --- a/litellm/proxy/db/db_transaction_queue/spend_update_queue.py +++ b/litellm/proxy/db/db_transaction_queue/spend_update_queue.py @@ -1,5 +1,5 @@ import asyncio -from typing import List +from typing import List, Optional from litellm._logging import verbose_proxy_logger from litellm.proxy._types import ( @@ -7,7 +7,11 @@ from litellm.proxy._types import ( Litellm_EntityType, SpendUpdateQueueItem, ) -from litellm.proxy.db.db_transaction_queue.base_update_queue import BaseUpdateQueue +from litellm.proxy.db.db_transaction_queue.base_update_queue import ( + BaseUpdateQueue, + service_logger_obj, +) +from litellm.types.services import ServiceTypes class SpendUpdateQueue(BaseUpdateQueue): @@ -111,3 +115,19 @@ class SpendUpdateQueue(BaseUpdateQueue): transactions_dict[entity_id] += response_cost or 0 return db_spend_update_transactions + + async def _emit_new_item_added_to_queue_event( + self, + queue_size: Optional[int] = None, + ): + asyncio.create_task( + service_logger_obj.async_service_success_hook( + service=ServiceTypes.IN_MEMORY_SPEND_UPDATE_QUEUE, + duration=0, + call_type="_emit_new_item_added_to_queue_event", + event_metadata={ + "gauge_labels": ServiceTypes.IN_MEMORY_SPEND_UPDATE_QUEUE, + "gauge_value": queue_size, + }, + ) + ) From c4e8b9607d9323c8910321d2c736f4d72aac1425 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 18:51:41 -0700 Subject: [PATCH 07/12] fix async_set_cache --- litellm/caching/redis_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litellm/caching/redis_cache.py b/litellm/caching/redis_cache.py index 1d553c9c80..31e11abf97 100644 --- a/litellm/caching/redis_cache.py +++ b/litellm/caching/redis_cache.py @@ -303,7 +303,7 @@ class RedisCache(BaseCache): raise e key = self.check_and_fix_namespace(key=key) - ttl = self.get_ttl(**kwargs) or kwargs.get("ex", None) + ttl = self.get_ttl(**kwargs) nx = kwargs.get("nx", False) print_verbose(f"Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}") From bcf42fd82d8fb5806cced0287f65b05bf779117b Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 21:19:05 -0700 Subject: [PATCH 08/12] linting fix prometheus services --- litellm/integrations/prometheus_services.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index dddaa4d064..37f0d696fb 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -3,7 +3,7 @@ # On success + failure, log events to Prometheus for litellm / adjacent services (litellm, redis, postgres, llm api providers) -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union from litellm._logging import print_verbose, verbose_logger from litellm.types.integrations.prometheus import LATENCY_BUCKETS @@ -43,7 +43,9 @@ class PrometheusServicesLogger: verbose_logger.debug("in init prometheus services metrics") self.services = [item for item in ServiceTypes] - self.payload_to_prometheus_map = {} + self.payload_to_prometheus_map: Dict[ + str, List[Union[Histogram, Counter, Gauge, Collector]] + ] = {} for service in self.services: service_metrics: List[Union[Histogram, Counter, Gauge, Collector]] = [] @@ -78,7 +80,7 @@ class PrometheusServicesLogger: service_metrics.append(gauge) if service_metrics: - self.payload_to_prometheus_map[service] = service_metrics + self.payload_to_prometheus_map[service.value] = service_metrics self.prometheus_to_amount_map: dict = {} ### MOCK TESTING ### From e68603e176efe8075232c8e50004d17a84e694ab Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 21:31:19 -0700 Subject: [PATCH 09/12] test create and update gauge --- .../integrations/test_prometheus_services.py | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 tests/litellm/integrations/test_prometheus_services.py diff --git a/tests/litellm/integrations/test_prometheus_services.py b/tests/litellm/integrations/test_prometheus_services.py new file mode 100644 index 0000000000..b627d31fda --- /dev/null +++ b/tests/litellm/integrations/test_prometheus_services.py @@ -0,0 +1,48 @@ +import json +import os +import sys +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi.testclient import TestClient + +from litellm.integrations.prometheus_services import ( + PrometheusServicesLogger, + ServiceMetrics, + ServiceTypes, +) + +sys.path.insert( + 0, os.path.abspath("../../..") +) # Adds the parent directory to the system path + + +def test_create_gauge_new(): + """Test creating a new gauge""" + pl = PrometheusServicesLogger() + + # Create new gauge + gauge = pl.create_gauge(service="test_service", type_of_request="size") + + assert gauge is not None + assert pl._get_metric("litellm_test_service_size") is gauge + + +def test_update_gauge(): + """Test updating a gauge's value""" + pl = PrometheusServicesLogger() + + # Create a gauge to test with + gauge = pl.create_gauge(service="test_service", type_of_request="size") + + # Mock the labels method to verify it's called correctly + with patch.object(gauge, "labels") as mock_labels: + mock_gauge = AsyncMock() + mock_labels.return_value = mock_gauge + + # Call update_gauge + pl.update_gauge(gauge=gauge, labels="test_label", amount=42.5) + + # Verify correct methods were called + mock_labels.assert_called_once_with("test_label") + mock_gauge.set.assert_called_once_with(42.5) From e3b788ea29dceac589ff08e24cab42d83206e72b Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Wed, 2 Apr 2025 21:58:35 -0700 Subject: [PATCH 10/12] fix test --- litellm/integrations/prometheus_services.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index 37f0d696fb..c060026e15 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -95,7 +95,7 @@ class PrometheusServicesLogger: def _get_service_metrics_initialize( self, service: ServiceTypes ) -> List[ServiceMetrics]: - DEFAULT_METRICS = [ServiceMetrics.COUNTER, ServiceMetrics.GAUGE] + DEFAULT_METRICS = [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] if service not in DEFAULT_SERVICE_CONFIGS: return DEFAULT_METRICS From bde88b3ba61df1f9f64e086e6b154099f08e4700 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Fri, 4 Apr 2025 16:34:43 -0700 Subject: [PATCH 11/12] fix type error --- litellm/integrations/prometheus_services.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index c060026e15..2bd38c2ae9 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -42,7 +42,7 @@ class PrometheusServicesLogger: verbose_logger.debug("in init prometheus services metrics") - self.services = [item for item in ServiceTypes] + self.services: List[ServiceTypes] = [item for item in ServiceTypes] self.payload_to_prometheus_map: Dict[ str, List[Union[Histogram, Counter, Gauge, Collector]] ] = {} @@ -55,27 +55,27 @@ class PrometheusServicesLogger: # Initialize only the configured metrics for each service if ServiceMetrics.HISTOGRAM in metrics_to_initialize: histogram = self.create_histogram( - service, type_of_request="latency" + service.value, type_of_request="latency" ) if histogram: service_metrics.append(histogram) if ServiceMetrics.COUNTER in metrics_to_initialize: counter_failed_request = self.create_counter( - service, + service.value, type_of_request="failed_requests", additional_labels=FAILED_REQUESTS_LABELS, ) if counter_failed_request: service_metrics.append(counter_failed_request) counter_total_requests = self.create_counter( - service, type_of_request="total_requests" + service.value, type_of_request="total_requests" ) if counter_total_requests: service_metrics.append(counter_total_requests) if ServiceMetrics.GAUGE in metrics_to_initialize: - gauge = self.create_gauge(service, type_of_request="size") + gauge = self.create_gauge(service.value, type_of_request="size") if gauge: service_metrics.append(gauge) From 901d6fe7b79c159afe95bbb33a7aa01850072030 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Fri, 4 Apr 2025 16:41:07 -0700 Subject: [PATCH 12/12] add operational metrics for pod lock manager v2 arch --- litellm/integrations/prometheus_services.py | 3 +-- litellm/proxy/proxy_config.yaml | 9 ++------- litellm/types/services.py | 11 +++++++++++ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/litellm/integrations/prometheus_services.py b/litellm/integrations/prometheus_services.py index 2bd38c2ae9..a5f2f0b5c7 100644 --- a/litellm/integrations/prometheus_services.py +++ b/litellm/integrations/prometheus_services.py @@ -42,12 +42,11 @@ class PrometheusServicesLogger: verbose_logger.debug("in init prometheus services metrics") - self.services: List[ServiceTypes] = [item for item in ServiceTypes] self.payload_to_prometheus_map: Dict[ str, List[Union[Histogram, Counter, Gauge, Collector]] ] = {} - for service in self.services: + for service in ServiceTypes: service_metrics: List[Union[Histogram, Counter, Gauge, Collector]] = [] metrics_to_initialize = self._get_service_metrics_initialize(service) diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index 52948c927e..56eb2ca39f 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -5,11 +5,6 @@ model_list: api_key: fake-key api_base: https://exampleopenaiendpoint-production.up.railway.app/ -general_settings: - use_redis_transaction_buffer: true - litellm_settings: - cache: True - cache_params: - type: redis - supported_call_types: [] + callbacks: ["prometheus"] + service_callback: ["prometheus_system"] \ No newline at end of file diff --git a/litellm/types/services.py b/litellm/types/services.py index e7b3c91ed3..865827f0f8 100644 --- a/litellm/types/services.py +++ b/litellm/types/services.py @@ -78,7 +78,18 @@ DEFAULT_SERVICE_CONFIGS = { ServiceTypes.PROXY_PRE_CALL.value: { "metrics": [ServiceMetrics.COUNTER, ServiceMetrics.HISTOGRAM] }, + # Operational metrics for DB Transaction Queues ServiceTypes.POD_LOCK_MANAGER.value: {"metrics": [ServiceMetrics.GAUGE]}, + ServiceTypes.IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE.value: { + "metrics": [ServiceMetrics.GAUGE] + }, + ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE.value: { + "metrics": [ServiceMetrics.GAUGE] + }, + ServiceTypes.IN_MEMORY_SPEND_UPDATE_QUEUE.value: { + "metrics": [ServiceMetrics.GAUGE] + }, + ServiceTypes.REDIS_SPEND_UPDATE_QUEUE.value: {"metrics": [ServiceMetrics.GAUGE]}, }