mirror of https://github.com/BerriAI/litellm.git
(feat) - provider budget improvements - ensure provider budgets work with multiple proxy instances + improve latency to ~90ms (#6886)
* use 1 file for duration_in_seconds
* add to readme.md
* re use duration_in_seconds
* fix importing _extract_from_regex, get_last_day_of_month
* fix import
* update provider budget routing
* fix - remove dup test
* add support for using in multi instance environments
* test_in_memory_redis_sync_e2e
* test_in_memory_redis_sync_e2e
* fix test_in_memory_redis_sync_e2e
* fix code quality check
* fix test provider budgets
* working provider budget tests
* add fixture for provider budget routing
* fix router testing for provider budgets
* add comments on provider budget routing
* use RedisPipelineIncrementOperation
* add redis async_increment_pipeline
* use redis async_increment_pipeline
* use lower value for testing
* use redis async_increment_pipeline
* use consistent key name for increment op
* add handling for budget windows
* fix typing async_increment_pipeline
* fix set attr
* add clear doc strings
* unit testing for provider budgets
* test_redis_increment_pipeline
This commit is contained in:
parent 72afed5b7e
commit e47ebefced
7 changed files with 638 additions and 52 deletions
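The diff below imports RedisPipelineIncrementOperation from litellm.types.caching without showing its definition. Judging from how the code reads its keys (key, increment_value, ttl), a minimal sketch of such a TypedDict might be (inferred from usage, not copied from litellm):

    from typing import Optional, TypedDict

    class RedisPipelineIncrementOperation(TypedDict):
        """One queued INCRBYFLOAT (plus optional EXPIRE) for a Redis pipeline."""

        key: str  # Redis key to increment; namespaced via check_and_fix_namespace
        increment_value: float  # amount passed to INCRBYFLOAT
        ttl: Optional[int]  # expiry in seconds; None means no EXPIRE is queued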
@@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, List, Optional, Tuple
import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
from litellm.types.caching import RedisPipelineIncrementOperation
from litellm.types.services import ServiceLoggerPayload, ServiceTypes
from litellm.types.utils import all_litellm_params
@@ -890,3 +891,92 @@ class RedisCache(BaseCache):
    def delete_cache(self, key):
        self.redis_client.delete(key)

    async def _pipeline_increment_helper(
        self,
        pipe: pipeline,
        increment_list: List[RedisPipelineIncrementOperation],
    ) -> Optional[List[float]]:
        """Helper function for pipeline increment operations"""
        # Iterate through each increment operation and add commands to pipeline
        for increment_op in increment_list:
            cache_key = self.check_and_fix_namespace(key=increment_op["key"])
            print_verbose(
                f"Increment ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {increment_op['increment_value']}\nttl={increment_op['ttl']}"
            )
            pipe.incrbyfloat(cache_key, increment_op["increment_value"])
            if increment_op["ttl"] is not None:
                _td = timedelta(seconds=increment_op["ttl"])
                pipe.expire(cache_key, _td)
        # Execute the pipeline and return results
        results = await pipe.execute()
        print_verbose(f"Increment ASYNC Redis Cache PIPELINE: results: {results}")
        return results

    async def async_increment_pipeline(
        self, increment_list: List[RedisPipelineIncrementOperation], **kwargs
    ) -> Optional[List[float]]:
        """
        Use Redis Pipelines for bulk increment operations

        Args:
            increment_list: List of RedisPipelineIncrementOperation dicts containing:
                - key: str
                - increment_value: float
                - ttl: Optional[int]
        """
        # don't waste a network request if there's nothing to increment
        if len(increment_list) == 0:
            return None

        from redis.asyncio import Redis

        _redis_client: Redis = self.init_async_client()  # type: ignore
        start_time = time.time()

        print_verbose(
            f"Increment Async Redis Cache Pipeline: increment list: {increment_list}"
        )

        try:
            async with _redis_client as redis_client:
                async with redis_client.pipeline(transaction=True) as pipe:
                    results = await self._pipeline_increment_helper(
                        pipe, increment_list
                    )

            print_verbose(f"pipeline increment results: {results}")

            ## LOGGING ##
            end_time = time.time()
            _duration = end_time - start_time
            asyncio.create_task(
                self.service_logger_obj.async_service_success_hook(
                    service=ServiceTypes.REDIS,
                    duration=_duration,
                    call_type="async_increment_pipeline",
                    start_time=start_time,
                    end_time=end_time,
                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
                )
            )
            return results
        except Exception as e:
            ## LOGGING ##
            end_time = time.time()
            _duration = end_time - start_time
            asyncio.create_task(
                self.service_logger_obj.async_service_failure_hook(
                    service=ServiceTypes.REDIS,
                    duration=_duration,
                    error=e,
                    call_type="async_increment_pipeline",
                    start_time=start_time,
                    end_time=end_time,
                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
                )
            )
            verbose_logger.error(
                "LiteLLM Redis Caching: async increment_pipeline() - Got exception from REDIS %s",
                str(e),
            )
            raise e
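
A usage sketch of the new method (illustrative only: the RedisCache import path, constructor arguments, and budget key names are assumptions, not taken from this diff):

    import asyncio
    from typing import List

    from litellm.caching import RedisCache  # import path assumed
    from litellm.types.caching import RedisPipelineIncrementOperation

    async def main() -> None:
        cache = RedisCache(host="localhost", port=6379)  # connection params assumed
        # Hypothetical provider-budget counters; both increments (and their
        # EXPIREs) travel to Redis in a single pipelined round trip.
        ops: List[RedisPipelineIncrementOperation] = [
            {"key": "provider_spend:openai:1d", "increment_value": 0.25, "ttl": 86400},
            {"key": "provider_spend:azure:1d", "increment_value": 0.10, "ttl": 86400},
        ]
        results = await cache.async_increment_pipeline(increment_list=ops)
        # Raw pipeline replies: each INCRBYFLOAT result is followed by its
        # EXPIRE acknowledgement, e.g. [0.25, True, 0.1, True]
        print(results)

    asyncio.run(main())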
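For intuition on the latency claim in the commit title: each INCRBYFLOAT/EXPIRE pair queued by the helper is buffered client-side and flushed to Redis in one network round trip when execute() runs, instead of one round trip per counter. A standalone redis-py sketch of that pattern (connection parameters and key name are illustrative assumptions):

    import asyncio
    from datetime import timedelta

    import redis.asyncio as redis

    async def demo() -> None:
        client = redis.Redis(host="localhost", port=6379)  # assumed local Redis
        async with client.pipeline(transaction=True) as pipe:
            # Commands are queued client-side; nothing is sent to Redis yet.
            pipe.incrbyfloat("provider_spend:openai:1d", 0.25)  # hypothetical key
            pipe.expire("provider_spend:openai:1d", timedelta(seconds=86400))
            # One round trip executes everything queued above.
            results = await pipe.execute()
        print(results)  # e.g. [0.25, True]
        await client.aclose()  # redis-py >= 5; older versions use close()

    asyncio.run(demo())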