mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
add async_rpush
This commit is contained in:
parent
cec5280e03
commit
e5e0ffa28a
1 changed files with 135 additions and 0 deletions
|
@ -1045,3 +1045,138 @@ class RedisCache(BaseCache):
|
|||
except Exception as e:
|
||||
verbose_logger.debug(f"Redis TTL Error: {e}")
|
||||
return None
|
||||
|
||||
async def _generic_redis_command(
|
||||
self,
|
||||
command: str,
|
||||
key: str,
|
||||
*args,
|
||||
parent_otel_span: Optional[Span] = None,
|
||||
**kwargs,
|
||||
) -> Any:
|
||||
"""
|
||||
Generic method to execute any Redis command
|
||||
|
||||
Args:
|
||||
command: The Redis command to execute (e.g. 'rpush', 'lpop')
|
||||
key: The Redis key to operate on
|
||||
args: Additional arguments to pass to the Redis command
|
||||
parent_otel_span: Optional parent OpenTelemetry span
|
||||
kwargs: Additional keyword arguments
|
||||
|
||||
Returns:
|
||||
Any: The result of the Redis command
|
||||
"""
|
||||
_redis_client = self.init_async_client()
|
||||
key = self.check_and_fix_namespace(key=key)
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# Get the method from the Redis client
|
||||
redis_method = getattr(_redis_client, command)
|
||||
if not redis_method:
|
||||
raise AttributeError(f"Redis client has no method named '{command}'")
|
||||
|
||||
# Execute the command
|
||||
result = await redis_method(key, *args)
|
||||
|
||||
# Log success
|
||||
end_time = time.time()
|
||||
_duration = end_time - start_time
|
||||
asyncio.create_task(
|
||||
self.service_logger_obj.async_service_success_hook(
|
||||
service=ServiceTypes.REDIS,
|
||||
duration=_duration,
|
||||
call_type=f"async_{command}",
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
parent_otel_span=parent_otel_span,
|
||||
event_metadata={"key": key},
|
||||
)
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
# Log failure
|
||||
end_time = time.time()
|
||||
_duration = end_time - start_time
|
||||
asyncio.create_task(
|
||||
self.service_logger_obj.async_service_failure_hook(
|
||||
service=ServiceTypes.REDIS,
|
||||
duration=_duration,
|
||||
error=e,
|
||||
call_type=f"async_{command}",
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
parent_otel_span=parent_otel_span,
|
||||
event_metadata={"key": key},
|
||||
)
|
||||
)
|
||||
verbose_logger.error(
|
||||
f"LiteLLM Redis Caching: async {command}() - Got exception from REDIS: {str(e)}"
|
||||
)
|
||||
raise e
|
||||
|
||||
async def async_rpush(
|
||||
self,
|
||||
key: str,
|
||||
values: List[Any],
|
||||
parent_otel_span: Optional[Span] = None,
|
||||
**kwargs,
|
||||
) -> int:
|
||||
"""
|
||||
Append one or multiple values to a list stored at key
|
||||
|
||||
Args:
|
||||
key: The Redis key of the list
|
||||
values: One or more values to append to the list
|
||||
parent_otel_span: Optional parent OpenTelemetry span
|
||||
|
||||
Returns:
|
||||
int: The length of the list after the push operation
|
||||
"""
|
||||
print_verbose(f"RPUSH to Redis list: key: {key}, values: {values}")
|
||||
return await self._generic_redis_command(
|
||||
"rpush", key, *values, parent_otel_span=parent_otel_span, **kwargs
|
||||
)
|
||||
|
||||
async def async_lpop(
|
||||
self,
|
||||
key: str,
|
||||
count: Optional[int] = None,
|
||||
parent_otel_span: Optional[Span] = None,
|
||||
**kwargs,
|
||||
) -> Union[Any, List[Any]]:
|
||||
"""
|
||||
Remove and return the first element(s) of a list stored at key
|
||||
|
||||
Args:
|
||||
key: The Redis key of the list
|
||||
count: Number of elements to pop (if None, pops a single element)
|
||||
parent_otel_span: Optional parent OpenTelemetry span
|
||||
|
||||
Returns:
|
||||
Union[Any, List[Any]]: The popped value(s) or None if list is empty
|
||||
"""
|
||||
print_verbose(f"LPOP from Redis list: key: {key}, count: {count}")
|
||||
args = [count] if count is not None else []
|
||||
|
||||
# Fix: Pass command as a positional argument, not as a keyword argument
|
||||
result = await self._generic_redis_command(
|
||||
"lpop", key, *args, parent_otel_span=parent_otel_span, **kwargs
|
||||
)
|
||||
|
||||
# Handle result parsing if needed
|
||||
if isinstance(result, bytes):
|
||||
try:
|
||||
return result.decode("utf-8")
|
||||
except Exception:
|
||||
return result
|
||||
elif isinstance(result, list) and all(
|
||||
isinstance(item, bytes) for item in result
|
||||
):
|
||||
try:
|
||||
return [item.decode("utf-8") for item in result]
|
||||
except Exception:
|
||||
return result
|
||||
|
||||
return result
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue