add handling for budget windows

This commit is contained in:
Ishaan Jaff 2024-11-24 15:47:56 -08:00
parent be25706736
commit 8aa8f2e4ab
2 changed files with 89 additions and 27 deletions

View file

@ -11,7 +11,7 @@ model_list:
router_settings: router_settings:
provider_budget_config: provider_budget_config:
openai: openai:
budget_limit: 0.2 # float of $ value budget for time period budget_limit: 0.3 # float of $ value budget for time period
time_period: 1d # can be 1d, 2d, 30d time_period: 1d # can be 1d, 2d, 30d
anthropic: anthropic:
budget_limit: 5 budget_limit: 5
@ -21,6 +21,4 @@ router_settings:
redis_password: os.environ/REDIS_PASSWORD redis_password: os.environ/REDIS_PASSWORD
litellm_settings: litellm_settings:
callbacks: ["prometheus"] callbacks: ["prometheus"]

View file

@ -19,6 +19,7 @@ anthropic:
""" """
import asyncio import asyncio
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict, Union from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict, Union
import litellm import litellm
@ -46,7 +47,7 @@ if TYPE_CHECKING:
else: else:
Span = Any Span = Any
DEFAULT_REDIS_SYNC_INTERVAL = 60 DEFAULT_REDIS_SYNC_INTERVAL = 1
class ProviderBudgetLimiting(CustomLogger): class ProviderBudgetLimiting(CustomLogger):
@ -179,19 +180,55 @@ class ProviderBudgetLimiting(CustomLogger):
return potential_deployments return potential_deployments
async def _get_or_set_budget_start_time(
    self, start_time_key: str, current_time: float, ttl_seconds: int
) -> float:
    """Return the cached budget-window start time, seeding it if absent.

    Looks up ``start_time_key`` in the router cache. When a value already
    exists it is coerced to ``float`` (cache backends may return str/int)
    and returned. Otherwise ``current_time`` is stored under that key with
    a TTL of ``ttl_seconds`` and returned as the new window start.
    """
    cached_start = await self.router_cache.async_get_cache(start_time_key)
    if cached_start is not None:
        return float(cached_start)
    # No window recorded yet for this provider: persist the start timestamp.
    await self.router_cache.async_set_cache(
        key=start_time_key, value=current_time, ttl=ttl_seconds
    )
    return current_time
async def _handle_new_budget_window(
    self,
    spend_key: str,
    start_time_key: str,
    current_time: float,
    response_cost: float,
    ttl_seconds: int,
) -> float:
    """Begin a fresh budget window for a provider.

    Overwrites the accumulated spend with just this request's cost, then
    records ``current_time`` as the new window start; both cache entries
    expire after ``ttl_seconds``. Returns the new window start timestamp.
    """
    # Reset spend first, then stamp the new window start (same write
    # order as before: spend entry, then start-time entry).
    pending_writes = (
        (spend_key, response_cost),
        (start_time_key, current_time),
    )
    for cache_key, cache_value in pending_writes:
        await self.router_cache.async_set_cache(
            key=cache_key, value=cache_value, ttl=ttl_seconds
        )
    return current_time
async def _increment_spend_in_current_window(
    self, spend_key: str, response_cost: float, ttl: int
):
    """Add ``response_cost`` to the provider's spend in the active window.

    The in-memory cache counter is bumped immediately; the matching Redis
    increment is not written synchronously but appended to
    ``redis_increment_operation_queue`` for a later pipelined flush.
    """
    # Local, immediate update so in-process budget checks see fresh spend.
    await self.router_cache.in_memory_cache.async_increment(
        key=spend_key, value=response_cost, ttl=ttl
    )
    # Deferred Redis update: queue the increment for the background sync.
    self.redis_increment_operation_queue.append(
        RedisPipelineIncrementOperation(
            key=spend_key, increment_value=response_cost, ttl=ttl
        )
    )
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
""" """Original method now uses helper functions"""
Increment provider spend in DualCache (InMemory + Redis)
Handles saving current provider spend to Redis.
Spend is stored as:
provider_spend:{provider}:{time_period}
ex. provider_spend:openai:1d
ex. provider_spend:anthropic:7d
The time period is tracked for time_periods set in the provider budget config.
"""
verbose_router_logger.debug("in ProviderBudgetLimiting.async_log_success_event") verbose_router_logger.debug("in ProviderBudgetLimiting.async_log_success_event")
standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get( standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
"standard_logging_object", None "standard_logging_object", None
@ -214,20 +251,47 @@ class ProviderBudgetLimiting(CustomLogger):
) )
spend_key = f"provider_spend:{custom_llm_provider}:{budget_config.time_period}" spend_key = f"provider_spend:{custom_llm_provider}:{budget_config.time_period}"
ttl_seconds = duration_in_seconds(duration=budget_config.time_period) start_time_key = f"provider_budget_start_time:{custom_llm_provider}"
# Create RedisPipelineIncrementOperation object current_time = datetime.now(timezone.utc).timestamp()
increment_op = RedisPipelineIncrementOperation( ttl_seconds = duration_in_seconds(budget_config.time_period)
key=spend_key, increment_value=response_cost, ttl_seconds=ttl_seconds
budget_start = await self._get_or_set_budget_start_time(
start_time_key=start_time_key,
current_time=current_time,
ttl_seconds=ttl_seconds,
) )
await self.router_cache.in_memory_cache.async_increment( if budget_start is None:
key=spend_key, # First spend for this provider
value=response_cost, budget_start = await self._handle_new_budget_window(
) spend_key=spend_key,
self.redis_increment_operation_queue.append(increment_op) start_time_key=start_time_key,
current_time=current_time,
response_cost=response_cost,
ttl_seconds=ttl_seconds,
)
elif (current_time - budget_start) > ttl_seconds:
# Budget window expired - reset everything
verbose_router_logger.debug("Budget window expired - resetting everything")
budget_start = await self._handle_new_budget_window(
spend_key=spend_key,
start_time_key=start_time_key,
current_time=current_time,
response_cost=response_cost,
ttl_seconds=ttl_seconds,
)
else:
# Within existing window - increment spend
remaining_time = ttl_seconds - (current_time - budget_start)
ttl_for_increment = int(remaining_time)
await self._increment_spend_in_current_window(
spend_key=spend_key, response_cost=response_cost, ttl=ttl_for_increment
)
verbose_router_logger.debug( verbose_router_logger.debug(
f"Incremented spend for {spend_key} by {response_cost}, ttl: {ttl_seconds}" f"Incremented spend for {spend_key} by {response_cost}"
) )
async def periodic_sync_in_memory_spend_with_redis(self): async def periodic_sync_in_memory_spend_with_redis(self):