# Excerpt of litellm/caching/redis_cache.py as it reads after the patch
# (git index e15a3f83d..58ef51557). Only the two regions touched by the patch
# are reproduced; everything else in the module is elided.
# NOTE(review): `BaseCache`, `pipeline`, `asyncio`, `time` and `timedelta` are
# used below but are not defined in the visible hunks — presumably they are
# imported/defined in the elided part of the module; confirm before applying.

# --- hunk 1 (@@ -20,6 +20,7 @@): import block -------------------------------
from typing import TYPE_CHECKING, Any, List, Optional, Tuple

import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
from litellm.types.caching import RedisPipelineIncrementOperation
from litellm.types.services import ServiceLoggerPayload, ServiceTypes
from litellm.types.utils import all_litellm_params


# --- hunk 2 (@@ -890,3 +891,92 @@): end of RedisCache ------------------------
class RedisCache(BaseCache):
    def delete_cache(self, key):
        self.redis_client.delete(key)

    async def _pipeline_increment_helper(
        self,
        pipe: pipeline,
        increment_list: List[RedisPipelineIncrementOperation],
    ) -> Optional[List[float]]:
        """
        Queue every increment (plus its optional expiry) on `pipe`, then execute it.

        Args:
            pipe: pipeline object the commands are queued on.
            increment_list: operations to apply. Each one supplies `key`,
                `increment_value` and, optionally, `ttl_seconds`.

        Returns:
            Whatever `pipe.execute()` returns, unmodified.

        NOTE(review): an `expire` command is queued on the same pipeline right
        after `incrbyfloat` whenever a TTL is given. redis-py pipelines
        normally return one reply per queued command, so the returned list
        presumably interleaves EXPIRE replies with the incremented values and
        is not strictly one float per operation — confirm before indexing the
        result positionally.
        """
        for increment_op in increment_list:
            cache_key = self.check_and_fix_namespace(key=increment_op["key"])
            increment_value = increment_op["increment_value"]
            # .get(): an operation that omits `ttl_seconds` is treated exactly
            # like one that sets it to None (no expiry is queued).
            ttl_seconds = increment_op.get("ttl_seconds")
            print_verbose(
                f"Increment ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {increment_value}\nttl={ttl_seconds}"
            )
            pipe.incrbyfloat(cache_key, increment_value)
            if ttl_seconds is not None:
                pipe.expire(cache_key, timedelta(seconds=ttl_seconds))

        # Nothing has been sent yet; execute() flushes all queued commands.
        results = await pipe.execute()
        print_verbose(f"Increment ASYNC Redis Cache PIPELINE: results: {results}")
        return results

    async def async_increment_pipeline(
        self, increment_list: List[RedisPipelineIncrementOperation], **kwargs
    ) -> Optional[List[float]]:
        """
        Use Redis Pipelines for bulk increment operations.

        Args:
            increment_list: List of RedisPipelineIncrementOperation dicts containing:
                - key: str
                - increment_value: float
                - ttl_seconds: Optional[int] (no expiry is set when None)
            **kwargs: only used to look up the parent OTEL span for service logging.

        Returns:
            The pipeline results from `_pipeline_increment_helper`, or None when
            there was nothing to increment.

        Raises:
            Exception: any error raised while running the pipeline is reported
                to the service logger and then re-raised unchanged.
        """
        # don't waste a network request if there's nothing to increment
        if not increment_list:
            return None

        from redis.asyncio import Redis

        _redis_client: Redis = self.init_async_client()  # type: ignore
        start_time = time.time()

        print_verbose(
            f"Increment Async Redis Cache Pipeline: increment list: {increment_list}"
        )

        try:
            async with _redis_client as redis_client:
                # transaction=True is passed so the queued commands are run as
                # a single transaction by the client.
                async with redis_client.pipeline(transaction=True) as pipe:
                    results = await self._pipeline_increment_helper(
                        pipe, increment_list
                    )

            print_verbose(f"pipeline increment results: {results}")

            ## LOGGING ##
            end_time = time.time()
            _duration = end_time - start_time
            # Scheduled as a task so the caller does not wait on the logger.
            asyncio.create_task(
                self.service_logger_obj.async_service_success_hook(
                    service=ServiceTypes.REDIS,
                    duration=_duration,
                    call_type="async_increment_pipeline",
                    start_time=start_time,
                    end_time=end_time,
                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
                )
            )
            return results
        except Exception as e:
            ## LOGGING ##
            end_time = time.time()
            _duration = end_time - start_time
            asyncio.create_task(
                self.service_logger_obj.async_service_failure_hook(
                    service=ServiceTypes.REDIS,
                    duration=_duration,
                    error=e,
                    call_type="async_increment_pipeline",
                    start_time=start_time,
                    end_time=end_time,
                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
                )
            )
            verbose_logger.error(
                "LiteLLM Redis Caching: async increment_pipeline() - Got exception from REDIS %s",
                str(e),
            )
            # Bare raise re-raises the active exception as-is.
            raise