diff --git a/litellm/caching.py b/litellm/caching.py
index 923a3031b..7c04bab72 100644
--- a/litellm/caching.py
+++ b/litellm/caching.py
@@ -1,4 +1,5 @@
 import litellm
+import time
 def get_prompt(*args, **kwargs):
     # make this safe checks, it should not throw any exceptions
     if len(args) > 1:
@@ -30,9 +31,9 @@ class InMemoryCache():
         self.cache_dict = {}

     def set_cache(self, key, value):
-        print("in set cache for inmem")
+        #print("in set cache for inmem")
         self.cache_dict[key] = value
-        print(self.cache_dict)
+        #print(self.cache_dict)

     def get_cache(self, key):
         #print("in get cache for inmem")
@@ -65,11 +66,23 @@ class Cache():
                 return None
         return cache_key

+    def generate_streaming_content(self, content):
+        chunk_size = 5 # Adjust the chunk size as needed
+        for i in range(0, len(content), chunk_size):
+            yield {'choices': [{'delta': {'role': 'assistant', 'content': content[i:i+chunk_size]}}]}
+            time.sleep(0.02)
+
     def get_cache(self, *args, **kwargs):
         try: # never block execution
             cache_key = self.get_cache_key(*args, **kwargs)
             if cache_key is not None:
-                return self.cache.get_cache(cache_key)
+                cached_result = self.cache.get_cache(cache_key)
+                if cached_result != None and 'stream' in kwargs and kwargs['stream'] == True:
+                    # if streaming is true and we got a cache hit, return a generator
+                    #print("cache hit and stream=True")
+                    #print(cached_result)
+                    return self.generate_streaming_content(cached_result["choices"][0]['message']['content'])
+                return cached_result
         except:
             return None

diff --git a/litellm/tests/test_caching.py b/litellm/tests/test_caching.py
index 949ae0c26..580c6d86b 100644
--- a/litellm/tests/test_caching.py
+++ b/litellm/tests/test_caching.py
@@ -152,35 +152,32 @@ def test_embedding_caching():
 # test_embedding_caching()


-# # test caching with streaming
-# messages = [{"role": "user", "content": "hello gm who are u"}]
-# def test_caching_v2_stream():
-#     try:
-#         litellm.cache = Cache()
-#         # litellm.token="ishaan@berri.ai"
-#         response1 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
-#         for chunk in response1:
-#             #
-#             pass
-#         # print("chunk")
-#         pass
-#         # response1_id = chunk['id']
+# test caching with streaming
+messages = [{"role": "user", "content": "tell me a story in 2 sentences"}]
+def test_caching_v2_stream():
+    try:
+        litellm.cache = Cache()
+        # litellm.token="ishaan@berri.ai"
+        response1 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
+        result_string = ""
+        for chunk in response1:
+            print(chunk)
+            result_string+=chunk['choices'][0]['delta']['content']
+        # response1_id = chunk['id']

-# # response2 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
-# # for chunk in response2:
-# #     #print(chunk)
-# #     response2_id = chunk['id']
-
-# # print(f"response1: {response1}")
-# # print(f"response2: {response2}")
-# # litellm.cache = None # disable cache
-# # if response2_id != response1_id:
-# #     print(f"response1: {response1_id}")
-# #     print(f"response2: {response2_id}")
-# #     pytest.fail(f"Error occurred: {e}")
-# except Exception as e:
-#     print(f"error occurred: {traceback.format_exc()}")
-#     pytest.fail(f"Error occurred: {e}")
+        result2_string=""
+        response2 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
+        for chunk in response2:
+            print(chunk)
+            result2_string+=chunk['choices'][0]['delta']['content']
+        if result_string != result2_string:
+            print(result_string)
+            print(result2_string)
+            pytest.fail(f"Error occurred: Caching with streaming failed, strings diff")
+
+    except Exception as e:
+        print(f"error occurred: {traceback.format_exc()}")
+        pytest.fail(f"Error occurred: {e}")
 # test_caching_v2_stream()


diff --git a/litellm/utils.py b/litellm/utils.py
index 26b37b694..c41610f5c 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -70,7 +70,7 @@ last_fetched_at_keys = None

 class Message(OpenAIObject):
-    def __init__(self, content=" ", role="assistant", **params):
+    def __init__(self, content="default", role="assistant", **params):
         super(Message, self).__init__(**params)
         self.content = content
         self.role = role

@@ -287,24 +287,18 @@ class Logging:
                         )
                    if callback == "cache":
                        try:
-                            #print("in cache callback2", self.stream)
-                            #print(original_response)
-                            #print(self.model_call_details)
-
-                            if litellm.cache != None:
+                            if litellm.cache != None and self.model_call_details.get('optional_params', {}).get('stream', False) == True:
                                if self.litellm_params["stream_response"] == None:
                                    self.litellm_params["stream_response"] = ModelResponse()
                                else:
                                    #self.litellm_call_id["stream_response"]["id"] = self.litellm_params["litellm_call_id"]
-                                    self.litellm_params["stream_response"]["choices"][0]["message"]["content"] += original_response
-                                    #print("cache is not none")
-                                    # convert original_response to format of Model Object
-                                    # Set the model
+                                    if self.litellm_params["stream_response"]["choices"][0]["message"]["content"] == "default":
+                                        self.litellm_params["stream_response"]["choices"][0]["message"]["content"] = original_response # handle first try
+                                    else:
+                                        self.litellm_params["stream_response"]["choices"][0]["message"]["content"] += original_response
                                litellm.cache.add_cache(self.litellm_params["stream_response"], **self.model_call_details)
-                                #print(self.litellm_params["stream_response"])
                        except Exception as e:
-                            print("got exception")
-                            print(e)
+                            pass
                except:
                    print_verbose(
                        f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {traceback.format_exc()}"
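
For reference, a minimal standalone sketch of the replay behaviour this patch adds: on a cache hit with stream=True, the stored completion is re-emitted as OpenAI-style delta chunks, so concatenating the deltas reproduces the cached message. The generator mirrors Cache.generate_streaming_content from the diff (lifted out of the class for illustration); cached_text is a hypothetical stand-in for the stored cached_result["choices"][0]["message"]["content"].

import time

def generate_streaming_content(content):
    # re-emit a cached completion as OpenAI-style streaming chunks
    chunk_size = 5  # same fixed chunk size as in the patch
    for i in range(0, len(content), chunk_size):
        yield {"choices": [{"delta": {"role": "assistant", "content": content[i:i + chunk_size]}}]}
        time.sleep(0.02)  # small delay so the replay resembles a live stream

# hypothetical cached message content, standing in for what add_cache stored
cached_text = "Once upon a time, a cache answered the same prompt twice. Both answers matched."
rebuilt = ""
for chunk in generate_streaming_content(cached_text):
    rebuilt += chunk["choices"][0]["delta"]["content"]

assert rebuilt == cached_text  # concatenating the deltas reproduces the cached text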