redis cache add async push and pop methods

This commit is contained in:
Ishaan Jaff 2025-03-28 19:49:38 -07:00
parent 7a63cbe8d0
commit 19f35ada82

View file

@ -1046,76 +1046,6 @@ class RedisCache(BaseCache):
verbose_logger.debug(f"Redis TTL Error: {e}") verbose_logger.debug(f"Redis TTL Error: {e}")
return None return None
async def _generic_redis_command(
self,
command: str,
key: str,
*args,
parent_otel_span: Optional[Span] = None,
**kwargs,
) -> Any:
"""
Generic method to execute any Redis command
Args:
command: The Redis command to execute (e.g. 'rpush', 'lpop')
key: The Redis key to operate on
args: Additional arguments to pass to the Redis command
parent_otel_span: Optional parent OpenTelemetry span
kwargs: Additional keyword arguments
Returns:
Any: The result of the Redis command
"""
_redis_client = self.init_async_client()
key = self.check_and_fix_namespace(key=key)
start_time = time.time()
try:
# Get the method from the Redis client
redis_method = getattr(_redis_client, command)
if not redis_method:
raise AttributeError(f"Redis client has no method named '{command}'")
# Execute the command
result = await redis_method(key, *args)
# Log success
end_time = time.time()
_duration = end_time - start_time
asyncio.create_task(
self.service_logger_obj.async_service_success_hook(
service=ServiceTypes.REDIS,
duration=_duration,
call_type=f"async_{command}",
start_time=start_time,
end_time=end_time,
parent_otel_span=parent_otel_span,
event_metadata={"key": key},
)
)
return result
except Exception as e:
# Log failure
end_time = time.time()
_duration = end_time - start_time
asyncio.create_task(
self.service_logger_obj.async_service_failure_hook(
service=ServiceTypes.REDIS,
duration=_duration,
error=e,
call_type=f"async_{command}",
start_time=start_time,
end_time=end_time,
parent_otel_span=parent_otel_span,
event_metadata={"key": key},
)
)
verbose_logger.error(
f"LiteLLM Redis Caching: async {command}() - Got exception from REDIS: {str(e)}"
)
raise e
async def async_rpush( async def async_rpush(
self, self,
key: str, key: str,
@ -1134,10 +1064,38 @@ class RedisCache(BaseCache):
Returns: Returns:
int: The length of the list after the push operation int: The length of the list after the push operation
""" """
print_verbose(f"RPUSH to Redis list: key: {key}, values: {values}") _redis_client: Any = self.init_async_client()
return await self._generic_redis_command( start_time = time.time()
"rpush", key, *values, parent_otel_span=parent_otel_span, **kwargs try:
) response = await _redis_client.rpush(key, *values)
## LOGGING ##
end_time = time.time()
_duration = end_time - start_time
asyncio.create_task(
self.service_logger_obj.async_service_success_hook(
service=ServiceTypes.REDIS,
duration=_duration,
call_type="async_rpush",
)
)
return response
except Exception as e:
# NON blocking - notify users Redis is throwing an exception
## LOGGING ##
end_time = time.time()
_duration = end_time - start_time
asyncio.create_task(
self.service_logger_obj.async_service_failure_hook(
service=ServiceTypes.REDIS,
duration=_duration,
error=e,
call_type="async_rpush",
)
)
verbose_logger.error(
f"LiteLLM Redis Cache RPUSH: - Got exception from REDIS : {str(e)}"
)
raise e
async def async_lpop( async def async_lpop(
self, self,
@ -1146,37 +1104,50 @@ class RedisCache(BaseCache):
parent_otel_span: Optional[Span] = None, parent_otel_span: Optional[Span] = None,
**kwargs, **kwargs,
) -> Union[Any, List[Any]]: ) -> Union[Any, List[Any]]:
""" _redis_client: Any = self.init_async_client()
Remove and return the first element(s) of a list stored at key start_time = time.time()
Args:
key: The Redis key of the list
count: Number of elements to pop (if None, pops a single element)
parent_otel_span: Optional parent OpenTelemetry span
Returns:
Union[Any, List[Any]]: The popped value(s) or None if list is empty
"""
print_verbose(f"LPOP from Redis list: key: {key}, count: {count}") print_verbose(f"LPOP from Redis list: key: {key}, count: {count}")
args = [count] if count is not None else [] try:
result = await _redis_client.lpop(key, count)
## LOGGING ##
end_time = time.time()
_duration = end_time - start_time
asyncio.create_task(
self.service_logger_obj.async_service_success_hook(
service=ServiceTypes.REDIS,
duration=_duration,
call_type="async_lpop",
)
)
# Fix: Pass command as a positional argument, not as a keyword argument # Handle result parsing if needed
result = await self._generic_redis_command( if isinstance(result, bytes):
"lpop", key, *args, parent_otel_span=parent_otel_span, **kwargs try:
) return result.decode("utf-8")
except Exception:
# Handle result parsing if needed return result
if isinstance(result, bytes): elif isinstance(result, list) and all(
try: isinstance(item, bytes) for item in result
return result.decode("utf-8") ):
except Exception: try:
return result return [item.decode("utf-8") for item in result]
elif isinstance(result, list) and all( except Exception:
isinstance(item, bytes) for item in result return result
): return result
try: except Exception as e:
return [item.decode("utf-8") for item in result] # NON blocking - notify users Redis is throwing an exception
except Exception: ## LOGGING ##
return result end_time = time.time()
_duration = end_time - start_time
return result asyncio.create_task(
self.service_logger_obj.async_service_failure_hook(
service=ServiceTypes.REDIS,
duration=_duration,
error=e,
call_type="async_lpop",
)
)
verbose_logger.error(
f"LiteLLM Redis Cache LPOP: - Got exception from REDIS : {str(e)}"
)
raise e