From d27b5274778cb52c5110374b746b3c3ad37e2b31 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sun, 24 Nov 2024 16:17:07 -0800 Subject: [PATCH] add clear doc strings --- litellm/router_strategy/provider_budgets.py | 55 ++++++++++++++------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/litellm/router_strategy/provider_budgets.py b/litellm/router_strategy/provider_budgets.py index 730447e7e..f4dc1ba94 100644 --- a/litellm/router_strategy/provider_budgets.py +++ b/litellm/router_strategy/provider_budgets.py @@ -184,7 +184,10 @@ class ProviderBudgetLimiting(CustomLogger): self, start_time_key: str, current_time: float, ttl_seconds: int ) -> float: """ - Get existing budget start time or set a new one + Checks if the key = `provider_budget_start_time:{provider}` exists in cache. + + If it does, return the value. + If it does not, set the key to `current_time` and return the value. """ budget_start = await self.router_cache.async_get_cache(start_time_key) if budget_start is None: @@ -202,7 +205,18 @@ class ProviderBudgetLimiting(CustomLogger): response_cost: float, ttl_seconds: int, ) -> float: - """Handle start of new budget window by resetting spend and start time""" + """ + Handle start of new budget window by resetting spend and start time + + Enters this when: + - The budget does not exist in cache, so we need to set it + - The budget window has expired, so we need to reset everything + + Does 2 things: + - stores key: `provider_spend:{provider}:1d`, value: response_cost + - stores key: `provider_budget_start_time:{provider}`, value: current_time. + This stores the start time of the new budget window + """ await self.router_cache.async_set_cache( key=spend_key, value=response_cost, ttl=ttl_seconds ) @@ -214,7 +228,14 @@ class ProviderBudgetLimiting(CustomLogger): async def _increment_spend_in_current_window( self, spend_key: str, response_cost: float, ttl: int ): - """Increment spend within existing budget window""" + """ + Increment spend within existing budget window + + Runs once the budget start time exists in Redis Cache (on the 2nd and subsequent requests to the same provider) + + - Increments the spend in memory cache (so spend instantly updated in memory) + - Queues the increment operation to Redis Pipeline (using batched pipeline to optimize performance. Using Redis for multi instance environment of LiteLLM) + """ await self.router_cache.in_memory_cache.async_increment( key=spend_key, value=response_cost, @@ -305,25 +326,25 @@ class ProviderBudgetLimiting(CustomLogger): await self._sync_in_memory_spend_with_redis() await asyncio.sleep( DEFAULT_REDIS_SYNC_INTERVAL - ) # Wait for 5 seconds before next sync + ) # Wait for DEFAULT_REDIS_SYNC_INTERVAL seconds before next sync except Exception as e: verbose_router_logger.error(f"Error in periodic sync task: {str(e)}") await asyncio.sleep( DEFAULT_REDIS_SYNC_INTERVAL - ) # Still wait 5 seconds on error before retrying + ) # Still wait DEFAULT_REDIS_SYNC_INTERVAL seconds on error before retrying async def _push_in_memory_increments_to_redis(self): """ - This is a latency / speed optimization. - How this works: - - Collect all provider spend increments in `router_cache.in_memory_cache`, done in async_log_success_event - - Push all increments to Redis in this function - - Reset the in-memory `last_synced_values` + - async_log_success_event collects all provider spend increments in `redis_increment_operation_queue` + - This function pushes all increments to Redis in a batched pipeline to optimize performance + + Only runs if Redis is initialized """ try: if not self.router_cache.redis_cache: return # Redis is not initialized + verbose_router_logger.debug( "Pushing Redis Increment Pipeline for queue: %s", self.redis_increment_operation_queue, @@ -347,11 +368,12 @@ class ProviderBudgetLimiting(CustomLogger): Ensures in-memory cache is updated with latest Redis values for all provider spends. Why Do we need this? - - Redis is our source of truth for provider spend - - Optimization to hit ~100ms latency. Performance was impacted when redis was used for read/write per request + - Optimization to hit sub 100ms latency. Performance was impacted when redis was used for read/write per request + - Use provider budgets in multi-instance environment, we use Redis to sync spend across all instances - - In a multi-instance evironment, each instance needs to periodically get the provider spend from Redis to ensure it is consistent across all instances. + What this does: + 1. Push all provider spend increments to Redis + 2. Fetch all current provider spend from Redis to update in-memory cache """ try: @@ -359,11 +381,10 @@ class ProviderBudgetLimiting(CustomLogger): if self.router_cache.redis_cache is None: return - # Push all provider spend increments to Redis + # 1. Push all provider spend increments to Redis await self._push_in_memory_increments_to_redis() - # Handle Reading all current provider spend from Redis in Memory - # Get all providers and their budget configs + # 2. Fetch all current provider spend from Redis to update in-memory cache cache_keys = [] for provider, config in self.provider_budget_config.items(): if config is None: