diff --git a/litellm/caching/redis_cache.py b/litellm/caching/redis_cache.py index 0571ac9f15..620ba5d73a 100644 --- a/litellm/caching/redis_cache.py +++ b/litellm/caching/redis_cache.py @@ -1045,3 +1045,138 @@ class RedisCache(BaseCache): except Exception as e: verbose_logger.debug(f"Redis TTL Error: {e}") return None + + async def _generic_redis_command( + self, + command: str, + key: str, + *args, + parent_otel_span: Optional[Span] = None, + **kwargs, + ) -> Any: + """ + Generic method to execute any Redis command + + Args: + command: The Redis command to execute (e.g. 'rpush', 'lpop') + key: The Redis key to operate on + args: Additional arguments to pass to the Redis command + parent_otel_span: Optional parent OpenTelemetry span + kwargs: Additional keyword arguments + + Returns: + Any: The result of the Redis command + """ + _redis_client = self.init_async_client() + key = self.check_and_fix_namespace(key=key) + start_time = time.time() + + try: + # Get the method from the Redis client + redis_method = getattr(_redis_client, command) + if not redis_method: + raise AttributeError(f"Redis client has no method named '{command}'") + + # Execute the command + result = await redis_method(key, *args) + + # Log success + end_time = time.time() + _duration = end_time - start_time + asyncio.create_task( + self.service_logger_obj.async_service_success_hook( + service=ServiceTypes.REDIS, + duration=_duration, + call_type=f"async_{command}", + start_time=start_time, + end_time=end_time, + parent_otel_span=parent_otel_span, + event_metadata={"key": key}, + ) + ) + return result + except Exception as e: + # Log failure + end_time = time.time() + _duration = end_time - start_time + asyncio.create_task( + self.service_logger_obj.async_service_failure_hook( + service=ServiceTypes.REDIS, + duration=_duration, + error=e, + call_type=f"async_{command}", + start_time=start_time, + end_time=end_time, + parent_otel_span=parent_otel_span, + event_metadata={"key": key}, + ) + ) + verbose_logger.error( + f"LiteLLM Redis Caching: async {command}() - Got exception from REDIS: {str(e)}" + ) + raise e + + async def async_rpush( + self, + key: str, + values: List[Any], + parent_otel_span: Optional[Span] = None, + **kwargs, + ) -> int: + """ + Append one or multiple values to a list stored at key + + Args: + key: The Redis key of the list + values: One or more values to append to the list + parent_otel_span: Optional parent OpenTelemetry span + + Returns: + int: The length of the list after the push operation + """ + print_verbose(f"RPUSH to Redis list: key: {key}, values: {values}") + return await self._generic_redis_command( + "rpush", key, *values, parent_otel_span=parent_otel_span, **kwargs + ) + + async def async_lpop( + self, + key: str, + count: Optional[int] = None, + parent_otel_span: Optional[Span] = None, + **kwargs, + ) -> Union[Any, List[Any]]: + """ + Remove and return the first element(s) of a list stored at key + + Args: + key: The Redis key of the list + count: Number of elements to pop (if None, pops a single element) + parent_otel_span: Optional parent OpenTelemetry span + + Returns: + Union[Any, List[Any]]: The popped value(s) or None if list is empty + """ + print_verbose(f"LPOP from Redis list: key: {key}, count: {count}") + args = [count] if count is not None else [] + + # Fix: Pass command as a positional argument, not as a keyword argument + result = await self._generic_redis_command( + "lpop", key, *args, parent_otel_span=parent_otel_span, **kwargs + ) + + # Handle result parsing if needed + if isinstance(result, bytes): + try: + return result.decode("utf-8") + except Exception: + return result + elif isinstance(result, list) and all( + isinstance(item, bytes) for item in result + ): + try: + return [item.decode("utf-8") for item in result] + except Exception: + return result + + return result