From 8aa8f2e4ab70471ccff0e998ccf1862daaef6563 Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Sun, 24 Nov 2024 15:47:56 -0800 Subject: [PATCH] add handling for budget windows --- litellm/proxy/proxy_config.yaml | 6 +- litellm/router_strategy/provider_budgets.py | 110 ++++++++++++++++---- 2 files changed, 89 insertions(+), 27 deletions(-) diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml index c40b56eeb..13fb1bcbe 100644 --- a/litellm/proxy/proxy_config.yaml +++ b/litellm/proxy/proxy_config.yaml @@ -11,7 +11,7 @@ model_list: router_settings: provider_budget_config: openai: - budget_limit: 0.2 # float of $ value budget for time period + budget_limit: 0.3 # float of $ value budget for time period time_period: 1d # can be 1d, 2d, 30d anthropic: budget_limit: 5 @@ -21,6 +21,4 @@ router_settings: redis_password: os.environ/REDIS_PASSWORD litellm_settings: - callbacks: ["prometheus"] - - + callbacks: ["prometheus"] \ No newline at end of file diff --git a/litellm/router_strategy/provider_budgets.py b/litellm/router_strategy/provider_budgets.py index 2b34f01eb..730447e7e 100644 --- a/litellm/router_strategy/provider_budgets.py +++ b/litellm/router_strategy/provider_budgets.py @@ -19,6 +19,7 @@ anthropic: """ import asyncio +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict, Union import litellm @@ -46,7 +47,7 @@ if TYPE_CHECKING: else: Span = Any -DEFAULT_REDIS_SYNC_INTERVAL = 60 +DEFAULT_REDIS_SYNC_INTERVAL = 1 class ProviderBudgetLimiting(CustomLogger): @@ -179,19 +180,55 @@ class ProviderBudgetLimiting(CustomLogger): return potential_deployments + async def _get_or_set_budget_start_time( + self, start_time_key: str, current_time: float, ttl_seconds: int + ) -> float: + """ + Get existing budget start time or set a new one + """ + budget_start = await self.router_cache.async_get_cache(start_time_key) + if budget_start is None: + await self.router_cache.async_set_cache( + key=start_time_key, value=current_time, ttl=ttl_seconds + ) + return current_time + return float(budget_start) + + async def _handle_new_budget_window( + self, + spend_key: str, + start_time_key: str, + current_time: float, + response_cost: float, + ttl_seconds: int, + ) -> float: + """Handle start of new budget window by resetting spend and start time""" + await self.router_cache.async_set_cache( + key=spend_key, value=response_cost, ttl=ttl_seconds + ) + await self.router_cache.async_set_cache( + key=start_time_key, value=current_time, ttl=ttl_seconds + ) + return current_time + + async def _increment_spend_in_current_window( + self, spend_key: str, response_cost: float, ttl: int + ): + """Increment spend within existing budget window""" + await self.router_cache.in_memory_cache.async_increment( + key=spend_key, + value=response_cost, + ttl=ttl, + ) + increment_op = RedisPipelineIncrementOperation( + key=spend_key, + increment_value=response_cost, + ttl=ttl, + ) + self.redis_increment_operation_queue.append(increment_op) + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): - """ - Increment provider spend in DualCache (InMemory + Redis) - - Handles saving current provider spend to Redis. - - Spend is stored as: - provider_spend:{provider}:{time_period} - ex. provider_spend:openai:1d - ex. provider_spend:anthropic:7d - - The time period is tracked for time_periods set in the provider budget config. - """ + """Original method now uses helper functions""" verbose_router_logger.debug("in ProviderBudgetLimiting.async_log_success_event") standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get( "standard_logging_object", None @@ -214,20 +251,47 @@ class ProviderBudgetLimiting(CustomLogger): ) spend_key = f"provider_spend:{custom_llm_provider}:{budget_config.time_period}" - ttl_seconds = duration_in_seconds(duration=budget_config.time_period) + start_time_key = f"provider_budget_start_time:{custom_llm_provider}" - # Create RedisPipelineIncrementOperation object - increment_op = RedisPipelineIncrementOperation( - key=spend_key, increment_value=response_cost, ttl_seconds=ttl_seconds + current_time = datetime.now(timezone.utc).timestamp() + ttl_seconds = duration_in_seconds(budget_config.time_period) + + budget_start = await self._get_or_set_budget_start_time( + start_time_key=start_time_key, + current_time=current_time, + ttl_seconds=ttl_seconds, ) - await self.router_cache.in_memory_cache.async_increment( - key=spend_key, - value=response_cost, - ) - self.redis_increment_operation_queue.append(increment_op) + if budget_start is None: + # First spend for this provider + budget_start = await self._handle_new_budget_window( + spend_key=spend_key, + start_time_key=start_time_key, + current_time=current_time, + response_cost=response_cost, + ttl_seconds=ttl_seconds, + ) + elif (current_time - budget_start) > ttl_seconds: + # Budget window expired - reset everything + verbose_router_logger.debug("Budget window expired - resetting everything") + budget_start = await self._handle_new_budget_window( + spend_key=spend_key, + start_time_key=start_time_key, + current_time=current_time, + response_cost=response_cost, + ttl_seconds=ttl_seconds, + ) + else: + # Within existing window - increment spend + remaining_time = ttl_seconds - (current_time - budget_start) + ttl_for_increment = int(remaining_time) + + await self._increment_spend_in_current_window( + spend_key=spend_key, response_cost=response_cost, ttl=ttl_for_increment + ) + verbose_router_logger.debug( - f"Incremented spend for {spend_key} by {response_cost}, ttl: {ttl_seconds}" + f"Incremented spend for {spend_key} by {response_cost}" ) async def periodic_sync_in_memory_spend_with_redis(self):