mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 02:34:29 +00:00
add streaming_caching support
This commit is contained in:
parent
8af6d967eb
commit
fbef73d043
3 changed files with 48 additions and 44 deletions
|
@ -1,4 +1,5 @@
|
|||
import litellm
|
||||
import time
|
||||
def get_prompt(*args, **kwargs):
|
||||
# make this safe checks, it should not throw any exceptions
|
||||
if len(args) > 1:
|
||||
|
@ -30,9 +31,9 @@ class InMemoryCache():
|
|||
self.cache_dict = {}
|
||||
|
||||
def set_cache(self, key, value):
|
||||
print("in set cache for inmem")
|
||||
#print("in set cache for inmem")
|
||||
self.cache_dict[key] = value
|
||||
print(self.cache_dict)
|
||||
#print(self.cache_dict)
|
||||
|
||||
def get_cache(self, key):
|
||||
#print("in get cache for inmem")
|
||||
|
@ -65,11 +66,23 @@ class Cache():
|
|||
return None
|
||||
return cache_key
|
||||
|
||||
def generate_streaming_content(self, content):
|
||||
chunk_size = 5 # Adjust the chunk size as needed
|
||||
for i in range(0, len(content), chunk_size):
|
||||
yield {'choices': [{'delta': {'role': 'assistant', 'content': content[i:i+chunk_size]}}]}
|
||||
time.sleep(0.02)
|
||||
|
||||
def get_cache(self, *args, **kwargs):
|
||||
try: # never block execution
|
||||
cache_key = self.get_cache_key(*args, **kwargs)
|
||||
if cache_key is not None:
|
||||
return self.cache.get_cache(cache_key)
|
||||
cached_result = self.cache.get_cache(cache_key)
|
||||
if cached_result != None and 'stream' in kwargs and kwargs['stream'] == True:
|
||||
# if streaming is true and we got a cache hit, return a generator
|
||||
#print("cache hit and stream=True")
|
||||
#print(cached_result)
|
||||
return self.generate_streaming_content(cached_result["choices"][0]['message']['content'])
|
||||
return cached_result
|
||||
except:
|
||||
return None
|
||||
|
||||
|
|
|
@ -152,35 +152,32 @@ def test_embedding_caching():
|
|||
# test_embedding_caching()
|
||||
|
||||
|
||||
# # test caching with streaming
|
||||
# messages = [{"role": "user", "content": "hello gm who are u"}]
|
||||
# def test_caching_v2_stream():
|
||||
# try:
|
||||
# litellm.cache = Cache()
|
||||
# # litellm.token="ishaan@berri.ai"
|
||||
# response1 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
|
||||
# for chunk in response1:
|
||||
# #
|
||||
# pass
|
||||
# # print("chunk")
|
||||
# pass
|
||||
# # response1_id = chunk['id']
|
||||
# test caching with streaming
|
||||
messages = [{"role": "user", "content": "tell me a story in 2 sentences"}]
|
||||
def test_caching_v2_stream():
|
||||
try:
|
||||
litellm.cache = Cache()
|
||||
# litellm.token="ishaan@berri.ai"
|
||||
response1 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
|
||||
result_string = ""
|
||||
for chunk in response1:
|
||||
print(chunk)
|
||||
result_string+=chunk['choices'][0]['delta']['content']
|
||||
# response1_id = chunk['id']
|
||||
|
||||
# # response2 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
|
||||
# # for chunk in response2:
|
||||
# # #print(chunk)
|
||||
# # response2_id = chunk['id']
|
||||
|
||||
# # print(f"response1: {response1}")
|
||||
# # print(f"response2: {response2}")
|
||||
# # litellm.cache = None # disable cache
|
||||
# # if response2_id != response1_id:
|
||||
# # print(f"response1: {response1_id}")
|
||||
# # print(f"response2: {response2_id}")
|
||||
# # pytest.fail(f"Error occurred: {e}")
|
||||
# except Exception as e:
|
||||
# print(f"error occurred: {traceback.format_exc()}")
|
||||
# pytest.fail(f"Error occurred: {e}")
|
||||
result2_string=""
|
||||
response2 = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
|
||||
for chunk in response2:
|
||||
print(chunk)
|
||||
result2_string+=chunk['choices'][0]['delta']['content']
|
||||
if result_string != result2_string:
|
||||
print(result_string)
|
||||
print(result2_string)
|
||||
pytest.fail(f"Error occurred: Caching with streaming failed, strings diff")
|
||||
|
||||
except Exception as e:
|
||||
print(f"error occurred: {traceback.format_exc()}")
|
||||
pytest.fail(f"Error occurred: {e}")
|
||||
|
||||
# test_caching_v2_stream()
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ last_fetched_at_keys = None
|
|||
|
||||
|
||||
class Message(OpenAIObject):
|
||||
def __init__(self, content=" ", role="assistant", **params):
|
||||
def __init__(self, content="default", role="assistant", **params):
|
||||
super(Message, self).__init__(**params)
|
||||
self.content = content
|
||||
self.role = role
|
||||
|
@ -287,24 +287,18 @@ class Logging:
|
|||
)
|
||||
if callback == "cache":
|
||||
try:
|
||||
#print("in cache callback2", self.stream)
|
||||
#print(original_response)
|
||||
#print(self.model_call_details)
|
||||
|
||||
if litellm.cache != None:
|
||||
if litellm.cache != None and self.model_call_details.get('optional_params', {}).get('stream', False) == True:
|
||||
if self.litellm_params["stream_response"] == None:
|
||||
self.litellm_params["stream_response"] = ModelResponse()
|
||||
else:
|
||||
#self.litellm_call_id["stream_response"]["id"] = self.litellm_params["litellm_call_id"]
|
||||
self.litellm_params["stream_response"]["choices"][0]["message"]["content"] += original_response
|
||||
#print("cache is not none")
|
||||
# convert original_response to format of Model Object
|
||||
# Set the model
|
||||
if self.litellm_params["stream_response"]["choices"][0]["message"]["content"] == "default":
|
||||
self.litellm_params["stream_response"]["choices"][0]["message"]["content"] = original_response # handle first try
|
||||
else:
|
||||
self.litellm_params["stream_response"]["choices"][0]["message"]["content"] += original_response
|
||||
litellm.cache.add_cache(self.litellm_params["stream_response"], **self.model_call_details)
|
||||
#print(self.litellm_params["stream_response"])
|
||||
except Exception as e:
|
||||
print("got exception")
|
||||
print(e)
|
||||
pass
|
||||
except:
|
||||
print_verbose(
|
||||
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {traceback.format_exc()}"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue