add clear doc strings

This commit is contained in:
Ishaan Jaff 2024-11-24 16:17:07 -08:00
parent 2fb9b245a1
commit d27b527477

View file

@ -184,7 +184,10 @@ class ProviderBudgetLimiting(CustomLogger):
self, start_time_key: str, current_time: float, ttl_seconds: int self, start_time_key: str, current_time: float, ttl_seconds: int
) -> float: ) -> float:
""" """
Get existing budget start time or set a new one Checks if the key = `provider_budget_start_time:{provider}` exists in cache.
If it does, return the value.
If it does not, set the key to `current_time` and return the value.
""" """
budget_start = await self.router_cache.async_get_cache(start_time_key) budget_start = await self.router_cache.async_get_cache(start_time_key)
if budget_start is None: if budget_start is None:
@ -202,7 +205,18 @@ class ProviderBudgetLimiting(CustomLogger):
response_cost: float, response_cost: float,
ttl_seconds: int, ttl_seconds: int,
) -> float: ) -> float:
"""Handle start of new budget window by resetting spend and start time""" """
Handle start of new budget window by resetting spend and start time
Enters this when:
- The budget does not exist in cache, so we need to set it
- The budget window has expired, so we need to reset everything
Does 2 things:
- stores key: `provider_spend:{provider}:1d`, value: response_cost
- stores key: `provider_budget_start_time:{provider}`, value: current_time.
This stores the start time of the new budget window
"""
await self.router_cache.async_set_cache( await self.router_cache.async_set_cache(
key=spend_key, value=response_cost, ttl=ttl_seconds key=spend_key, value=response_cost, ttl=ttl_seconds
) )
@ -214,7 +228,14 @@ class ProviderBudgetLimiting(CustomLogger):
async def _increment_spend_in_current_window( async def _increment_spend_in_current_window(
self, spend_key: str, response_cost: float, ttl: int self, spend_key: str, response_cost: float, ttl: int
): ):
"""Increment spend within existing budget window""" """
Increment spend within existing budget window
Runs once the budget start time exists in Redis Cache (on the 2nd and subsequent requests to the same provider)
- Increments the spend in memory cache (so spend instantly updated in memory)
- Queues the increment operation to Redis Pipeline (using batched pipeline to optimize performance. Using Redis for multi instance environment of LiteLLM)
"""
await self.router_cache.in_memory_cache.async_increment( await self.router_cache.in_memory_cache.async_increment(
key=spend_key, key=spend_key,
value=response_cost, value=response_cost,
@ -305,25 +326,25 @@ class ProviderBudgetLimiting(CustomLogger):
await self._sync_in_memory_spend_with_redis() await self._sync_in_memory_spend_with_redis()
await asyncio.sleep( await asyncio.sleep(
DEFAULT_REDIS_SYNC_INTERVAL DEFAULT_REDIS_SYNC_INTERVAL
) # Wait for 5 seconds before next sync ) # Wait for DEFAULT_REDIS_SYNC_INTERVAL seconds before next sync
except Exception as e: except Exception as e:
verbose_router_logger.error(f"Error in periodic sync task: {str(e)}") verbose_router_logger.error(f"Error in periodic sync task: {str(e)}")
await asyncio.sleep( await asyncio.sleep(
DEFAULT_REDIS_SYNC_INTERVAL DEFAULT_REDIS_SYNC_INTERVAL
) # Still wait 5 seconds on error before retrying ) # Still wait DEFAULT_REDIS_SYNC_INTERVAL seconds on error before retrying
async def _push_in_memory_increments_to_redis(self): async def _push_in_memory_increments_to_redis(self):
""" """
This is a latency / speed optimization.
How this works: How this works:
- Collect all provider spend increments in `router_cache.in_memory_cache`, done in async_log_success_event - async_log_success_event collects all provider spend increments in `redis_increment_operation_queue`
- Push all increments to Redis in this function - This function pushes all increments to Redis in a batched pipeline to optimize performance
- Reset the in-memory `last_synced_values`
Only runs if Redis is initialized
""" """
try: try:
if not self.router_cache.redis_cache: if not self.router_cache.redis_cache:
return # Redis is not initialized return # Redis is not initialized
verbose_router_logger.debug( verbose_router_logger.debug(
"Pushing Redis Increment Pipeline for queue: %s", "Pushing Redis Increment Pipeline for queue: %s",
self.redis_increment_operation_queue, self.redis_increment_operation_queue,
@ -347,11 +368,12 @@ class ProviderBudgetLimiting(CustomLogger):
Ensures in-memory cache is updated with latest Redis values for all provider spends. Ensures in-memory cache is updated with latest Redis values for all provider spends.
Why Do we need this? Why Do we need this?
- Redis is our source of truth for provider spend - Optimization to hit sub 100ms latency. Performance was impacted when redis was used for read/write per request
- Optimization to hit ~100ms latency. Performance was impacted when redis was used for read/write per request - Use provider budgets in multi-instance environment, we use Redis to sync spend across all instances
What this does:
In a multi-instance environment, each instance needs to periodically get the provider spend from Redis to ensure it is consistent across all instances. 1. Push all provider spend increments to Redis
2. Fetch all current provider spend from Redis to update in-memory cache
""" """
try: try:
@ -359,11 +381,10 @@ class ProviderBudgetLimiting(CustomLogger):
if self.router_cache.redis_cache is None: if self.router_cache.redis_cache is None:
return return
# Push all provider spend increments to Redis # 1. Push all provider spend increments to Redis
await self._push_in_memory_increments_to_redis() await self._push_in_memory_increments_to_redis()
# Handle Reading all current provider spend from Redis in Memory # 2. Fetch all current provider spend from Redis to update in-memory cache
# Get all providers and their budget configs
cache_keys = [] cache_keys = []
for provider, config in self.provider_budget_config.items(): for provider, config in self.provider_budget_config.items():
if config is None: if config is None: