mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-27 19:54:13 +00:00
(feat) batch write redis cache output
This commit is contained in:
parent
21b9a0ead3
commit
ec0435bdea
1 changed file with 15 additions and 9 deletions
|
@ -203,16 +203,17 @@ class RedisCache(BaseCache):
|
||||||
print_verbose(f"Error occurred in pipeline write - {str(e)}")
|
print_verbose(f"Error occurred in pipeline write - {str(e)}")
|
||||||
|
|
||||||
async def batch_cache_write(self, key, value, **kwargs):
|
async def batch_cache_write(self, key, value, **kwargs):
|
||||||
print_verbose("in batch cache writing for redis")
|
print_verbose(
|
||||||
|
"in batch cache writing for redis buffer size=",
|
||||||
|
len(self.redis_batch_writing_buffer),
|
||||||
|
)
|
||||||
self.redis_batch_writing_buffer.append((key, value))
|
self.redis_batch_writing_buffer.append((key, value))
|
||||||
if len(self.redis_batch_writing_buffer) >= self.redis_flush_size:
|
if len(self.redis_batch_writing_buffer) >= self.redis_flush_size:
|
||||||
await self.flush_cache_buffer()
|
await self.flush_cache_buffer()
|
||||||
|
|
||||||
async def flush_cache_buffer(self):
|
async def flush_cache_buffer(self):
|
||||||
print_verbose(
|
print_verbose(
|
||||||
"flushing to redis....reached size of buffer",
|
f"flushing to redis....reached size of buffer {len(self.redis_batch_writing_buffer)}"
|
||||||
len(self.redis_batch_writing_buffer),
|
|
||||||
)
|
)
|
||||||
await self.async_set_cache_pipeline(self.redis_batch_writing_buffer)
|
await self.async_set_cache_pipeline(self.redis_batch_writing_buffer)
|
||||||
self.redis_batch_writing_buffer = []
|
self.redis_batch_writing_buffer = []
|
||||||
|
@ -932,7 +933,7 @@ class Cache:
|
||||||
s3_path: Optional[str] = None,
|
s3_path: Optional[str] = None,
|
||||||
redis_semantic_cache_use_async=False,
|
redis_semantic_cache_use_async=False,
|
||||||
redis_semantic_cache_embedding_model="text-embedding-ada-002",
|
redis_semantic_cache_embedding_model="text-embedding-ada-002",
|
||||||
redis_flush_size=100,
|
redis_flush_size=None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
|
@ -994,6 +995,7 @@ class Cache:
|
||||||
self.supported_call_types = supported_call_types # default to ["completion", "acompletion", "embedding", "aembedding"]
|
self.supported_call_types = supported_call_types # default to ["completion", "acompletion", "embedding", "aembedding"]
|
||||||
self.type = type
|
self.type = type
|
||||||
self.namespace = namespace
|
self.namespace = namespace
|
||||||
|
self.redis_flush_size = redis_flush_size
|
||||||
|
|
||||||
def get_cache_key(self, *args, **kwargs):
|
def get_cache_key(self, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
@ -1273,6 +1275,10 @@ class Cache:
|
||||||
Async implementation of add_cache
|
Async implementation of add_cache
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
if self.type == "redis" and self.redis_flush_size is not None:
|
||||||
|
# high traffic - fill in results in memory and then flush
|
||||||
|
await self.batch_cache_write(result, *args, **kwargs)
|
||||||
|
else:
|
||||||
cache_key, cached_data, kwargs = self._add_cache_logic(
|
cache_key, cached_data, kwargs = self._add_cache_logic(
|
||||||
result=result, *args, **kwargs
|
result=result, *args, **kwargs
|
||||||
)
|
)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue