fix(utils): adds complete streaming response to success handler

Krrish Dholakia 2023-10-07 15:37:31 -07:00
parent f941975a78
commit 9cda24e1b2
7 changed files with 161 additions and 97 deletions
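
In effect: once a stream is fully consumed, the success handler attaches the reassembled response to its kwargs, so a registered success callback can read it. A minimal sketch of that pattern, mirroring the test changes below (the model name and message content are placeholders):

import litellm
from litellm import completion

def log_complete_response(kwargs, completion_response, start_time, end_time):
    # "complete_streaming_response" is only present once the final chunk arrives
    if "complete_streaming_response" in kwargs:
        print(f"Complete Streaming Response: {kwargs['complete_streaming_response']}")

litellm.success_callback = [log_complete_response]

response = completion(
    model="gpt-3.5-turbo",  # placeholder model
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
    stream=True,
)
for chunk in response:  # the callback fires in a background thread after the last chunk
    pass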


@@ -1478,12 +1478,13 @@ def config_completion(**kwargs):
         )
 
 def stream_chunk_builder(chunks: list):
+    print(f"chunk 0: {chunks[0]}")
     id = chunks[0]["id"]
     object = chunks[0]["object"]
     created = chunks[0]["created"]
     model = chunks[0]["model"]
     role = chunks[0]["choices"][0]["delta"]["role"]
-    finnish_reason = chunks[-1]["choices"][0]["finish_reason"]
+    finish_reason = chunks[-1]["choices"][0]["finish_reason"]
 
     # Initialize the response dictionary
     response = {
@@ -1498,7 +1499,7 @@ def stream_chunk_builder(chunks: list):
                 "role": role,
                 "content": ""
             },
-            "finish_reason": finnish_reason,
+            "finish_reason": finish_reason,
         }
     ],
     # "usage": {


@@ -41,118 +41,163 @@ messages = [{"content": user_message, "role": "user"}]
 # 1. On Call Success
 # normal completion
 ## test on openai completion call
-try:
-    # Redirect stdout
-    old_stdout = sys.stdout
-    sys.stdout = new_stdout = io.StringIO()
-
-    response = completion(model="gpt-3.5-turbo", messages=messages)
-
-    # Restore stdout
-    sys.stdout = old_stdout
-    output = new_stdout.getvalue().strip()
-
-    if "Logging Details Pre-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details Post-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details LiteLLM-Success Call" not in output:
-        raise Exception("Required log message not found!")
-    score += 1
-except Exception as e:
-    pytest.fail(f"Error occurred: {e}")
-    pass
+def test_logging_success_completion():
+    global score
+    try:
+        # Redirect stdout
+        old_stdout = sys.stdout
+        sys.stdout = new_stdout = io.StringIO()
+
+        response = completion(model="gpt-3.5-turbo", messages=messages)
+
+        # Restore stdout
+        sys.stdout = old_stdout
+        output = new_stdout.getvalue().strip()
+
+        if "Logging Details Pre-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details Post-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details LiteLLM-Success Call" not in output:
+            raise Exception("Required log message not found!")
+        score += 1
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+        pass
 
 ## test on non-openai completion call
-try:
-    # Redirect stdout
-    old_stdout = sys.stdout
-    sys.stdout = new_stdout = io.StringIO()
-
-    response = completion(model="claude-instant-1", messages=messages)
-
-    # Restore stdout
-    sys.stdout = old_stdout
-    output = new_stdout.getvalue().strip()
-
-    if "Logging Details Pre-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details Post-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details LiteLLM-Success Call" not in output:
-        raise Exception("Required log message not found!")
-    score += 1
-except Exception as e:
-    pytest.fail(f"Error occurred: {e}")
-    pass
+def test_logging_success_completion_non_openai():
+    global score
+    try:
+        # Redirect stdout
+        old_stdout = sys.stdout
+        sys.stdout = new_stdout = io.StringIO()
+
+        response = completion(model="claude-instant-1", messages=messages)
+
+        # Restore stdout
+        sys.stdout = old_stdout
+        output = new_stdout.getvalue().strip()
+
+        if "Logging Details Pre-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details Post-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details LiteLLM-Success Call" not in output:
+            raise Exception("Required log message not found!")
+        score += 1
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+        pass
 
 # streaming completion
 ## test on openai completion call
-try:
-    # Redirect stdout
-    old_stdout = sys.stdout
-    sys.stdout = new_stdout = io.StringIO()
-
-    response = completion(model="gpt-3.5-turbo", messages=messages)
-
-    # Restore stdout
-    sys.stdout = old_stdout
-    output = new_stdout.getvalue().strip()
-
-    if "Logging Details Pre-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details Post-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details LiteLLM-Success Call" not in output:
-        raise Exception("Required log message not found!")
-    score += 1
-except Exception as e:
-    pytest.fail(f"Error occurred: {e}")
-    pass
+def test_logging_success_streaming_openai():
+    global score
+    try:
+        # litellm.set_verbose = False
+        def custom_callback(
+            kwargs,                 # kwargs to completion
+            completion_response,    # response from completion
+            start_time, end_time    # start/end time
+        ):
+            if "complete_streaming_response" in kwargs:
+                print(f"Complete Streaming Response: {kwargs['complete_streaming_response']}")
+
+        # Assign the custom callback function
+        litellm.success_callback = [custom_callback]
+
+        # Redirect stdout
+        old_stdout = sys.stdout
+        sys.stdout = new_stdout = io.StringIO()
+
+        response = completion(model="gpt-3.5-turbo", messages=messages, stream=True)
+        for chunk in response:
+            pass
+
+        # Restore stdout
+        sys.stdout = old_stdout
+        output = new_stdout.getvalue().strip()
+
+        if "Logging Details Pre-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details Post-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details LiteLLM-Success Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Complete Streaming Response:" not in output:
+            raise Exception("Required log message not found!")
+        score += 1
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+        pass
+
+# test_logging_success_streaming_openai()
 
 ## test on non-openai completion call
-try:
-    # Redirect stdout
-    old_stdout = sys.stdout
-    sys.stdout = new_stdout = io.StringIO()
-
-    response = completion(model="claude-instant-1", messages=messages)
-
-    # Restore stdout
-    sys.stdout = old_stdout
-    output = new_stdout.getvalue().strip()
-
-    if "Logging Details Pre-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details Post-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details LiteLLM-Success Call" not in output:
-        raise Exception("Required log message not found!")
-    score += 1
-except Exception as e:
-    pytest.fail(f"Error occurred: {e}")
-    pass
+def test_logging_success_streaming_non_openai():
+    global score
+    try:
+        # litellm.set_verbose = False
+        def custom_callback(
+            kwargs,                 # kwargs to completion
+            completion_response,    # response from completion
+            start_time, end_time    # start/end time
+        ):
+            # print(f"streaming response: {completion_response}")
+            if "complete_streaming_response" in kwargs:
+                print(f"Complete Streaming Response: {kwargs['complete_streaming_response']}")
+
+        # Assign the custom callback function
+        litellm.success_callback = [custom_callback]
+
+        # Redirect stdout
+        old_stdout = sys.stdout
+        sys.stdout = new_stdout = io.StringIO()
+
+        response = completion(model="claude-instant-1", messages=messages, stream=True)
+        for idx, chunk in enumerate(response):
+            pass
+
+        # Restore stdout
+        sys.stdout = old_stdout
+        output = new_stdout.getvalue().strip()
+
+        if "Logging Details Pre-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details Post-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details LiteLLM-Success Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Complete Streaming Response:" not in output:
+            raise Exception("Required log message not found!")
+        score += 1
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
+        pass
+
+test_logging_success_streaming_non_openai()
 
 # embedding
-try:
-    # Redirect stdout
-    old_stdout = sys.stdout
-    sys.stdout = new_stdout = io.StringIO()
-
-    response = embedding(model="text-embedding-ada-002", input=["good morning from litellm"])
-
-    # Restore stdout
-    sys.stdout = old_stdout
-    output = new_stdout.getvalue().strip()
-
-    if "Logging Details Pre-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details Post-API Call" not in output:
-        raise Exception("Required log message not found!")
-    elif "Logging Details LiteLLM-Success Call" not in output:
-        raise Exception("Required log message not found!")
-except Exception as e:
-    pytest.fail(f"Error occurred: {e}")
+def test_logging_success_embedding_openai():
+    try:
+        # Redirect stdout
+        old_stdout = sys.stdout
+        sys.stdout = new_stdout = io.StringIO()
+
+        response = embedding(model="text-embedding-ada-002", input=["good morning from litellm"])
+
+        # Restore stdout
+        sys.stdout = old_stdout
+        output = new_stdout.getvalue().strip()
+
+        if "Logging Details Pre-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details Post-API Call" not in output:
+            raise Exception("Required log message not found!")
+        elif "Logging Details LiteLLM-Success Call" not in output:
+            raise Exception("Required log message not found!")
+    except Exception as e:
+        pytest.fail(f"Error occurred: {e}")
 
 # ## 2. On LiteLLM Call failure
 # ## TEST BAD KEY


@@ -54,4 +54,5 @@ def test_stream_chunk_builder():
         finnish_reason = choices["finish_reason"]
     except:
         raise Exception("stream_chunk_builder failed to rebuild response")
-test_stream_chunk_builder()
+
+# test_stream_chunk_builder()


@@ -228,6 +228,7 @@ class Logging:
         self.call_type = call_type
         self.litellm_call_id = litellm_call_id
         self.function_id = function_id
+        self.streaming_chunks = [] # for generating complete stream response
 
     def update_environment_variables(self, model, user, optional_params, litellm_params):
         self.optional_params = optional_params
@@ -394,7 +395,7 @@ class Logging:
             pass
 
-    def success_handler(self, result, start_time=None, end_time=None):
+    def success_handler(self, result=None, start_time=None, end_time=None, **kwargs):
         print_verbose(
             f"Logging Details LiteLLM-Success Call"
         )
@@ -403,6 +404,20 @@ class Logging:
             start_time = self.start_time
         if end_time is None:
             end_time = datetime.datetime.now()
+
+        complete_streaming_response = None
+        ## BUILD COMPLETE STREAMED RESPONSE
+        if self.stream:
+            if result.choices[0].finish_reason: # if it's the last chunk
+                self.streaming_chunks.append(result)
+                complete_streaming_response = litellm.stream_chunk_builder(self.streaming_chunks)
+            else:
+                self.streaming_chunks.append(result)
+
+        if complete_streaming_response:
+            self.model_call_details["complete_streaming_response"] = complete_streaming_response
+
         print_verbose(f"success callbacks: {litellm.success_callback}")
 
         if litellm.max_budget and self.stream:
@@ -3328,20 +3343,22 @@ class CustomStreamWrapper:
                 chunk = next(self.completion_stream)
                 model_response = chunk
                 # LOGGING
-                threading.Thread(target=self.logging_obj.success_handler, args=(completion_obj,)).start()
+                threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                 return model_response
 
-            # LOGGING
-            threading.Thread(target=self.logging_obj.success_handler, args=(completion_obj,)).start()
-
             model_response.model = self.model
             if len(completion_obj["content"]) > 0: # cannot set content of an OpenAI Object to be an empty string
                 if self.sent_first_chunk == False:
                     completion_obj["role"] = "assistant"
                     self.sent_first_chunk = True
                 model_response.choices[0].delta = Delta(**completion_obj)
+                # LOGGING
+                threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                 return model_response
             elif model_response.choices[0].finish_reason:
                 model_response.choices[0].finish_reason = map_finish_reason(model_response.choices[0].finish_reason) # ensure consistent output to openai
+                # LOGGING
+                threading.Thread(target=self.logging_obj.success_handler, args=(model_response,)).start()
                 return model_response
         except StopIteration:
             raise StopIteration
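
Taken together: success_handler buffers each chunk it is handed and only builds the complete response when a chunk carries a finish_reason, while CustomStreamWrapper now logs the enriched model_response after the delta is populated, rather than the bare completion_obj. A standalone sketch of that accumulate-until-finished pattern (illustrative names, not litellm's internal API):

class StreamAccumulator:
    # Illustrative only: buffer streamed chunks until the final one arrives,
    # then hand the full list to a builder such as litellm.stream_chunk_builder.
    def __init__(self, build):
        self.chunks = []
        self.build = build

    def on_chunk(self, chunk):
        self.chunks.append(chunk)
        if chunk["choices"][0].get("finish_reason"):  # last chunk signals completion
            return self.build(self.chunks)  # the complete streaming response
        return None  # still mid-stream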