forked from phoenix/litellm-mirror
add redis async_increment_pipeline
parent 8f74da6438
commit 5dd8726685
1 changed file with 90 additions and 0 deletions
@@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, List, Optional, Tuple
 import litellm
 from litellm._logging import print_verbose, verbose_logger
 from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
+from litellm.types.caching import RedisPipelineIncrementOperation
 from litellm.types.services import ServiceLoggerPayload, ServiceTypes
 from litellm.types.utils import all_litellm_params
 
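The one addition in this hunk is the RedisPipelineIncrementOperation import. Its actual definition lives in litellm/types/caching.py and is not part of this diff; judging from the fields the new code reads (key, increment_value, ttl_seconds), its shape is presumably along these lines (a hypothetical reconstruction, not the committed source):

# Hypothetical sketch -- field names inferred from how the new methods access them
from typing import Optional, TypedDict

class RedisPipelineIncrementOperation(TypedDict):
    key: str                     # cache key to increment (namespace fixed up by the cache)
    increment_value: float       # amount passed to INCRBYFLOAT
    ttl_seconds: Optional[int]   # if set, an EXPIRE is queued alongside the increment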
@@ -890,3 +891,92 @@ class RedisCache(BaseCache):
 
     def delete_cache(self, key):
         self.redis_client.delete(key)
+
+    async def _pipeline_increment_helper(
+        self,
+        pipe: pipeline,
+        increment_list: List[RedisPipelineIncrementOperation],
+    ) -> Optional[List[float]]:
+        """Helper function for pipeline increment operations"""
+        # Iterate through each increment operation and add commands to pipeline
+        for increment_op in increment_list:
+            cache_key = self.check_and_fix_namespace(key=increment_op["key"])
+            print_verbose(
+                f"Increment ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {increment_op['increment_value']}\nttl={increment_op['ttl_seconds']}"
+            )
+            pipe.incrbyfloat(cache_key, increment_op["increment_value"])
+            if increment_op["ttl_seconds"] is not None:
+                _td = timedelta(seconds=increment_op["ttl_seconds"])
+                pipe.expire(cache_key, _td)
+        # Execute the pipeline and return results
+        results = await pipe.execute()
+        print_verbose(f"Increment ASYNC Redis Cache PIPELINE: results: {results}")
+        return results
+
+    async def async_increment_pipeline(
+        self, increment_list: List[RedisPipelineIncrementOperation], **kwargs
+    ) -> Optional[List[float]]:
+        """
+        Use Redis Pipelines for bulk increment operations
+
+        Args:
+            increment_list: List of RedisPipelineIncrementOperation dicts containing:
+                - key: str
+                - increment_value: float
+                - ttl_seconds: int
+        """
+        # don't waste a network request if there's nothing to increment
+        if len(increment_list) == 0:
+            return
+
+        from redis.asyncio import Redis
+
+        _redis_client: Redis = self.init_async_client()  # type: ignore
+        start_time = time.time()
+
+        print_verbose(
+            f"Increment Async Redis Cache Pipeline: increment list: {increment_list}"
+        )
+
+        try:
+            async with _redis_client as redis_client:
+                async with redis_client.pipeline(transaction=True) as pipe:
+                    results = await self._pipeline_increment_helper(
+                        pipe, increment_list
+                    )
+
+            print_verbose(f"pipeline increment results: {results}")
+
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_increment_pipeline",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+            return results
+        except Exception as e:
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_increment_pipeline",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async increment_pipeline() - Got exception from REDIS %s",
+                str(e),
+            )
+            raise e
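For context, here is a minimal sketch of how the new method might be called. The RedisCache import path and constructor arguments are assumptions for illustration, not part of this commit:

import asyncio

from litellm.caching import RedisCache  # import path is an assumption

async def main():
    cache = RedisCache(host="localhost", port=6379)  # illustrative settings
    ops = [
        {"key": "daily_spend:user_a", "increment_value": 0.25, "ttl_seconds": 86400},
        {"key": "daily_spend:user_b", "increment_value": 1.50, "ttl_seconds": 86400},
    ]
    # Both INCRBYFLOAT/EXPIRE pairs go out in a single round trip.
    results = await cache.async_increment_pipeline(increment_list=ops)
    # pipe.execute() yields one entry per queued command, so EXPIRE booleans
    # are interleaved with the new float values, e.g. [0.25, True, 1.5, True]
    print(results)

asyncio.run(main())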
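Under the hood, the helper follows the standard redis-py asyncio pipeline pattern: commands are buffered client-side, then flushed in one MULTI/EXEC round trip by execute(). A self-contained sketch of that pattern (connection settings are illustrative):

import asyncio
from datetime import timedelta

from redis.asyncio import Redis

async def demo():
    client = Redis(host="localhost", port=6379)  # illustrative connection
    async with client.pipeline(transaction=True) as pipe:
        # Commands are queued locally; nothing hits the wire yet.
        pipe.incrbyfloat("counter:a", 2.5)
        pipe.expire("counter:a", timedelta(seconds=60))
        # execute() sends everything as one MULTI/EXEC block and
        # returns the per-command replies in order.
        results = await pipe.execute()
    print(results)  # [2.5, True]
    await client.aclose()  # use close() on redis-py < 5.0

asyncio.run(demo())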