fix(utils.py): fix streaming logic

commit 4ba18f9932
parent 4e608c86c1
Author: Krrish Dholakia
Date: 2024-02-26 14:26:58 -08:00

3 changed files with 87 additions and 31 deletions

@@ -3678,6 +3678,7 @@ def stream_chunk_builder(
response["usage"]["total_tokens"] = (
response["usage"]["prompt_tokens"] + response["usage"]["completion_tokens"]
)
return convert_to_model_response_object(
response_object=response,
model_response_object=model_response,

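Context for the hunk above: `stream_chunk_builder` assembles a full response from streamed chunks, and `total_tokens` is derived rather than streamed, so it is recomputed before the dict is converted back into a model response object. A minimal sketch of that bookkeeping, using an illustrative OpenAI-style `usage` dict rather than litellm's actual response object:

```python
# Sketch of the usage arithmetic in stream_chunk_builder (illustrative shapes).
response = {"usage": {"prompt_tokens": 25, "completion_tokens": 17}}

# total_tokens is derived, not streamed, so it is recomputed from the parts.
response["usage"]["total_tokens"] = (
    response["usage"]["prompt_tokens"] + response["usage"]["completion_tokens"]
)

assert response["usage"]["total_tokens"] == 42
```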
@@ -124,11 +124,12 @@ def test_parallel_function_call():
pytest.fail(f"Error occurred: {e}")
test_parallel_function_call()
# test_parallel_function_call()
def test_parallel_function_call_stream():
try:
litellm.set_verbose = True
# Step 1: send the conversation and available functions to the model
messages = [
{
@@ -217,4 +218,4 @@ def test_parallel_function_call_stream():
pytest.fail(f"Error occurred: {e}")
test_parallel_function_call_stream()
# test_parallel_function_call_stream()

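Both test hunks comment out the module-level invocations (`test_parallel_function_call()`, `test_parallel_function_call_stream()`), so the tests run only when pytest collects them instead of firing on every import. If a manual entry point is still wanted, one common pattern, sketched here as an assumption rather than what the repo does, is an `if __name__ == "__main__":` guard:

```python
# Sketch: keep a manual entry point without running the test at import time.
def test_parallel_function_call_stream():
    ...  # test body elided

if __name__ == "__main__":
    # Executes only when this file is run directly, not when pytest imports it.
    test_parallel_function_call_stream()
```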
@@ -276,8 +276,7 @@ class Delta(OpenAIObject):
self.content = content
self.role = role
self.function_call = function_call
if tool_calls is not None:
if isinstance(tool_calls, dict):
if tool_calls is not None and isinstance(tool_calls, dict):
self.tool_calls = []
for tool_call in tool_calls:
if tool_call.get("index", None) is None:
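The `Delta` hunk collapses the nested `if tool_calls is not None:` / `if isinstance(tool_calls, dict):` into one short-circuiting condition; `isinstance` is only evaluated when `tool_calls` is non-`None`, so behavior is unchanged. A self-contained check of that equivalence (the `normalize` helper is hypothetical):

```python
def normalize(tool_calls):
    # Original nested form.
    nested = None
    if tool_calls is not None:
        if isinstance(tool_calls, dict):
            nested = list(tool_calls)

    # Collapsed form from the diff; short-circuits identically.
    flat = None
    if tool_calls is not None and isinstance(tool_calls, dict):
        flat = list(tool_calls)

    return nested == flat

# Both forms agree for None, dicts, and non-dict values alike.
assert all(normalize(v) for v in (None, {}, {"index": 0}, ["x"], "y"))
```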
@@ -8722,6 +8721,9 @@ class CustomStreamWrapper:
):
try:
delta = dict(original_chunk.choices[0].delta)
model_response.system_fingerprint = (
original_chunk.system_fingerprint
)
## AZURE - check if arguments is not None
if (
original_chunk.choices[0].delta.function_call
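This hunk moves the `system_fingerprint` copy inside the `try`, right after the delta dict is built, so the fingerprint is propagated before any delta rebuilding that might raise. A sketch with stand-in objects (`SimpleNamespace` here, not litellm's actual chunk types):

```python
from types import SimpleNamespace

# Stand-ins for the provider's original chunk and the wrapped response.
original_chunk = SimpleNamespace(system_fingerprint="fp_abc123")
model_response = SimpleNamespace(system_fingerprint=None)

# Copy the fingerprint across early, before delta rebuilding can fail.
model_response.system_fingerprint = original_chunk.system_fingerprint
assert model_response.system_fingerprint == "fp_abc123"
```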
@@ -8762,22 +8764,23 @@
delta = dict(original_chunk.choices[0].delta)
print_verbose(f"original delta: {delta}")
model_response.choices[0].delta = Delta(**delta)
print_verbose(
f"new delta: {model_response.choices[0].delta}"
)
except Exception as e:
model_response.choices[0].delta = Delta()
else:
return
model_response.system_fingerprint = original_chunk.system_fingerprint
if self.sent_first_chunk == False:
model_response.choices[0].delta["role"] = "assistant"
self.sent_first_chunk = True
print_verbose(
f"model_response.choices[0].delta: {model_response.choices[0].delta}; completion_obj: {completion_obj}"
)
print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
## RETURN ARG
if (
completion_obj["content"] is not None
or response_obj.get("original_chunk", None) is not None
):
hold = False
if completion_obj["content"] is not None:
"content" in completion_obj
and isinstance(completion_obj["content"], str)
and len(completion_obj["content"]) > 0
): # cannot set content of an OpenAI Object to be an empty string
hold, model_response_str = self.check_special_tokens(
chunk=completion_obj["content"],
finish_reason=model_response.choices[0].finish_reason,
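The rewritten return guard is stricter than the old `is not None` check: content must be present, be a string, and be non-empty, per the inline comment that an OpenAI object's content cannot be set to an empty string. A hypothetical predicate expressing the same test:

```python
def has_streamable_content(completion_obj: dict) -> bool:
    # Mirrors the new guard: key present, value a str, and non-empty.
    return (
        "content" in completion_obj
        and isinstance(completion_obj["content"], str)
        and len(completion_obj["content"]) > 0
    )

assert has_streamable_content({"content": "hi"})
assert not has_streamable_content({"content": ""})    # empty string filtered
assert not has_streamable_content({"content": None})  # None filtered
assert not has_streamable_content({})                 # missing key filtered
```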
@@ -8786,8 +8789,39 @@
f"hold - {hold}, model_response_str - {model_response_str}"
)
if hold is False:
## check if openai/azure chunk
original_chunk = response_obj.get("original_chunk", None)
if original_chunk is None:
if original_chunk:
model_response.id = original_chunk.id
if len(original_chunk.choices) > 0:
try:
delta = dict(original_chunk.choices[0].delta)
print_verbose(f"original delta: {delta}")
model_response.choices[0].delta = Delta(**delta)
except Exception as e:
model_response.choices[0].delta = Delta()
else:
return
model_response.system_fingerprint = (
original_chunk.system_fingerprint
)
print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
if self.sent_first_chunk == False:
model_response.choices[0].delta["role"] = "assistant"
self.sent_first_chunk = True
elif self.sent_first_chunk == True and hasattr(
model_response.choices[0].delta, "role"
):
_initial_delta = model_response.choices[
0
].delta.model_dump()
_initial_delta.pop("role", None)
model_response.choices[0].delta = Delta(**_initial_delta)
print_verbose(
f"model_response.choices[0].delta: {model_response.choices[0].delta}"
)
else:
## else
completion_obj["content"] = model_response_str
if self.sent_first_chunk == False:
completion_obj["role"] = "assistant"
@@ -8812,6 +8846,14 @@
model_response.choices[0].finish_reason
) # ensure consistent output to openai
return model_response
elif (
model_response.choices[0].delta.tool_calls is not None
or model_response.choices[0].delta.function_call is not None
):
if self.sent_first_chunk == False:
model_response.choices[0].delta["role"] = "assistant"
self.sent_first_chunk = True
return model_response
else:
return
except StopIteration:
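The new `elif` makes the wrapper return chunks whose delta carries `tool_calls` or `function_call` even when there is no text content, stamping the assistant role on the first such chunk. A small sketch of the return decision (dict shapes are illustrative):

```python
def should_return(delta: dict) -> bool:
    # Text content, a tool call, or a legacy function call all warrant a chunk.
    return (
        delta.get("content") is not None
        or delta.get("tool_calls") is not None
        or delta.get("function_call") is not None
    )

assert should_return({"content": "hi"})
assert should_return({"content": None, "tool_calls": [{"id": "call_1"}]})
assert not should_return({"content": None})
```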
@@ -8860,7 +8902,14 @@
print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
response: Optional[ModelResponse] = self.chunk_creator(chunk=chunk)
print_verbose(f"PROCESSED CHUNK POST CHUNK CREATOR: {response}")
if response is None:
if response is None or (
isinstance(response, ModelResponse)
and isinstance(response.choices[0], StreamingChoices)
and response.choices[0].delta.content is None
and response.choices[0].delta.function_call is None
and response.choices[0].delta.tool_calls is None
):
continue
## LOGGING
threading.Thread(
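`__next__` now also skips `ModelResponse` chunks whose delta carries no content, no function call, and no tool calls, rather than yielding empty chunks to the caller. A generator sketch of the same filter over illustrative chunk dicts:

```python
def drop_empty_chunks(chunks):
    # Skip chunks whose delta carries nothing a consumer could use.
    for chunk in chunks:
        delta = chunk["choices"][0]["delta"]
        if (
            delta.get("content") is None
            and delta.get("function_call") is None
            and delta.get("tool_calls") is None
        ):
            continue
        yield chunk

stream = [
    {"choices": [{"delta": {"content": None}}]},     # dropped
    {"choices": [{"delta": {"content": "hello"}}]},  # kept
]
assert len(list(drop_empty_chunks(stream))) == 1
```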
@@ -8904,7 +8953,11 @@
print_verbose(f"value of async chunk: {chunk}")
if chunk == "None" or chunk is None:
raise Exception
elif self.custom_llm_provider == "gemini" and len(chunk.parts) == 0:
elif (
self.custom_llm_provider == "gemini"
and hasattr(chunk, "parts")
and len(chunk.parts) == 0
):
continue
# chunk_creator() does logging/stream chunk building. We need to let it know it's being called in_async_func, so we don't double-add chunks.
# __anext__ also calls async_success_handler, which does logging
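The Gemini branch now checks `hasattr(chunk, "parts")` before reading `len(chunk.parts)`, so chunk shapes without a `parts` attribute no longer raise `AttributeError`. A sketch of the defensive check with stand-in chunks:

```python
from types import SimpleNamespace

def is_empty_gemini_chunk(chunk) -> bool:
    # Only inspect parts when the attribute actually exists.
    return hasattr(chunk, "parts") and len(chunk.parts) == 0

assert is_empty_gemini_chunk(SimpleNamespace(parts=[]))          # skipped
assert not is_empty_gemini_chunk(SimpleNamespace(parts=["hi"]))  # processed
assert not is_empty_gemini_chunk(SimpleNamespace())              # no attr: processed
```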
@@ -8933,6 +8986,7 @@
self.rules.post_call_rules(
input=self.response_uptil_now, model=self.model
)
print_verbose(f"final returned processed chunk: {processed_chunk}")
return processed_chunk
raise StopAsyncIteration
else: # temporary patch for non-aiohttp async calls
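The final hunk adds a `print_verbose` trace of the processed chunk just before the async iterator returns it. In litellm, `print_verbose` only prints when verbose mode is enabled; a minimal sketch of that gating pattern (names are stand-ins):

```python
set_verbose = False  # stand-in for litellm.set_verbose

def print_verbose(print_statement: str) -> None:
    # Emit debug output only when verbose mode is switched on.
    if set_verbose:
        print(print_statement)

print_verbose("final returned processed chunk: <chunk>")  # silent by default
```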