(refactor) caching - use _sync_set_cache (#6224)

* caching - use _sync_set_cache

* add sync _sync_add_streaming_response_to_cache

* use caching class for cache storage
Ishaan Jaff 2024-10-16 10:38:07 +05:30 committed by GitHub
parent a04fc1a921
commit 4eea0652eb
3 changed files with 89 additions and 28 deletions
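
The refactor factors the cache-write gating checks out of the async path into the shared helper _should_store_result_in_cache, so the new sync methods (_sync_set_cache, _sync_add_streaming_response_to_cache) reuse the same predicate instead of duplicating it. A minimal sketch of the pattern, with simplified names rather than the actual litellm code:

from typing import Any, Callable, Dict, Optional

class CachingHandler:
    # Toy handler: one predicate shared by the sync and async write paths.

    def __init__(self, cache: Optional[Any], original_function: Callable):
        self.cache = cache
        self.original_function = original_function

    def _should_store(self, original_function: Callable, kwargs: Dict[str, Any]) -> bool:
        # Store only when a cache exists, the call type is cacheable,
        # and the caller did not send cache={"no-store": True}.
        return (
            self.cache is not None
            and original_function.__name__ in (self.cache.supported_call_types or [])
            and kwargs.get("cache", {}).get("no-store", False) is not True
        )

    def _sync_set_cache(self, result: Any, kwargs: Dict[str, Any]) -> None:
        if self.cache is None:
            return
        if self._should_store(self.original_function, kwargs):
            self.cache.add_cache(result, **kwargs)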

@@ -67,6 +67,7 @@ class LLMCachingHandler:
         start_time: datetime.datetime,
     ):
         self.async_streaming_chunks: List[ModelResponse] = []
+        self.sync_streaming_chunks: List[ModelResponse] = []
         self.request_kwargs = request_kwargs
         self.original_function = original_function
         self.start_time = start_time
@@ -470,12 +471,11 @@ class LLMCachingHandler:
             None
         """
         args = args or ()
+        if litellm.cache is None:
+            return
         # [OPTIONAL] ADD TO CACHE
-        if (
-            (litellm.cache is not None)
-            and litellm.cache.supported_call_types is not None
-            and (str(original_function.__name__) in litellm.cache.supported_call_types)
-            and (kwargs.get("cache", {}).get("no-store", False) is not True)
+        if self._should_store_result_in_cache(
+            original_function=original_function, kwargs=kwargs
         ):
             if (
                 isinstance(result, litellm.ModelResponse)
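
Note that the extracted predicate keeps the per-request opt-out intact: a caller can still pass cache={"no-store": True} to keep a single response out of the cache while caching stays enabled globally. A usage sketch, assuming litellm's documented cache controls:

import litellm
from litellm.caching import Cache

litellm.cache = Cache()  # enable caching globally (in-memory by default)

# The response is computed and returned as usual, but the predicate above
# sees kwargs["cache"]["no-store"] is True and skips the cache write.
response = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hello"}],
    cache={"no-store": True},
)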
@@ -509,6 +509,42 @@ class LLMCachingHandler:
                 litellm.cache.async_add_cache(result, *args, **kwargs)
             )
+
+    def _sync_set_cache(
+        self,
+        result: Any,
+        kwargs: Dict[str, Any],
+        args: Optional[Tuple[Any, ...]] = None,
+    ):
+        """
+        Sync internal method to add the result to the cache
+        """
+        if litellm.cache is None:
+            return
+
+        args = args or ()
+        if self._should_store_result_in_cache(
+            original_function=self.original_function, kwargs=kwargs
+        ):
+            litellm.cache.add_cache(result, *args, **kwargs)
+
+        return
+
+    def _should_store_result_in_cache(
+        self, original_function: Callable, kwargs: Dict[str, Any]
+    ) -> bool:
+        """
+        Helper function to determine if the result should be stored in the cache.
+
+        Returns:
+            bool: True if the result should be stored in the cache, False otherwise.
+        """
+        return (
+            (litellm.cache is not None)
+            and litellm.cache.supported_call_types is not None
+            and (str(original_function.__name__) in litellm.cache.supported_call_types)
+            and (kwargs.get("cache", {}).get("no-store", False) is not True)
+        )
 
     async def _add_streaming_response_to_cache(self, processed_chunk: ModelResponse):
         """
         Internal method to add the streaming response to the cache
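
With _sync_set_cache in place, a synchronous completion path can persist a finished response in one call. An illustrative call site (the wrapper below is an assumption for illustration, not part of this diff):

# Hypothetical sync wrapper around a completion call:
caching_handler = LLMCachingHandler(
    original_function=original_function,
    request_kwargs=kwargs,
    start_time=datetime.datetime.now(),
)
result = original_function(*args, **kwargs)
# Writes to litellm.cache only if _should_store_result_in_cache passes.
caching_handler._sync_set_cache(result=result, kwargs=kwargs, args=args)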
@@ -536,3 +572,25 @@ class LLMCachingHandler:
                 original_function=self.original_function,
                 kwargs=self.request_kwargs,
             )
+
+    def _sync_add_streaming_response_to_cache(self, processed_chunk: ModelResponse):
+        """
+        Sync internal method to add the streaming response to the cache
+        """
+        complete_streaming_response: Optional[
+            Union[ModelResponse, TextCompletionResponse]
+        ] = _assemble_complete_response_from_streaming_chunks(
+            result=processed_chunk,
+            start_time=self.start_time,
+            end_time=datetime.datetime.now(),
+            request_kwargs=self.request_kwargs,
+            streaming_chunks=self.sync_streaming_chunks,
+            is_async=False,
+        )
+
+        # if a complete_streaming_response is assembled, add it to the cache
+        if complete_streaming_response is not None:
+            self._sync_set_cache(
+                result=complete_streaming_response,
+                kwargs=self.request_kwargs,
+            )
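
The streaming variant mirrors its async counterpart: each processed chunk is handed to _sync_add_streaming_response_to_cache, and _assemble_complete_response_from_streaming_chunks only yields a complete response once the stream has finished, so intermediate chunks leave the cache untouched. An illustrative driver loop (the iteration wrapper is an assumption):

# Hypothetical sync streaming consumer:
for processed_chunk in stream:  # each item is a ModelResponse chunk
    # No-op for intermediate chunks; on the final chunk the assembled
    # response is written through _sync_set_cache.
    caching_handler._sync_add_streaming_response_to_cache(processed_chunk)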