diff --git a/litellm/main.py b/litellm/main.py
index ed8dddf05b..1a6c8e178e 100644
--- a/litellm/main.py
+++ b/litellm/main.py
@@ -3678,6 +3678,7 @@ def stream_chunk_builder(
     response["usage"]["total_tokens"] = (
         response["usage"]["prompt_tokens"] + response["usage"]["completion_tokens"]
     )
+
     return convert_to_model_response_object(
         response_object=response,
         model_response_object=model_response,
diff --git a/litellm/tests/test_function_calling.py b/litellm/tests/test_function_calling.py
index 2fcbdc9460..ffef8f6594 100644
--- a/litellm/tests/test_function_calling.py
+++ b/litellm/tests/test_function_calling.py
@@ -124,11 +124,12 @@ def test_parallel_function_call():
         pytest.fail(f"Error occurred: {e}")
 
 
-test_parallel_function_call()
+# test_parallel_function_call()
 
 
 def test_parallel_function_call_stream():
     try:
+        litellm.set_verbose = True
         # Step 1: send the conversation and available functions to the model
         messages = [
             {
@@ -217,4 +218,4 @@ def test_parallel_function_call_stream():
         pytest.fail(f"Error occurred: {e}")
 
 
-test_parallel_function_call_stream()
+# test_parallel_function_call_stream()
diff --git a/litellm/utils.py b/litellm/utils.py
index 566c6f36df..c9618e8f38 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -276,15 +276,14 @@ class Delta(OpenAIObject):
         self.content = content
         self.role = role
         self.function_call = function_call
-        if tool_calls is not None:
-            if isinstance(tool_calls, dict):
-                self.tool_calls = []
-                for tool_call in tool_calls:
-                    if tool_call.get("index", None) is None:
-                        tool_call["index"] = 0
-                    self.tool_calls.append(ChatCompletionDeltaToolCall(**tool_call))
-            else:
-                self.tool_calls = tool_calls
+        if tool_calls is not None and isinstance(tool_calls, dict):
+            self.tool_calls = []
+            for tool_call in tool_calls:
+                if tool_call.get("index", None) is None:
+                    tool_call["index"] = 0
+                self.tool_calls.append(ChatCompletionDeltaToolCall(**tool_call))
+        else:
+            self.tool_calls = tool_calls
 
     def __contains__(self, key):
         # Define custom behavior for the 'in' operator
@@ -8722,6 +8721,9 @@ class CustomStreamWrapper:
                     ):
                         try:
                             delta = dict(original_chunk.choices[0].delta)
+                            model_response.system_fingerprint = (
+                                original_chunk.system_fingerprint
+                            )
                             ## AZURE - check if arguments is not None
                             if (
                                 original_chunk.choices[0].delta.function_call
@@ -8762,32 +8764,64 @@
                             delta = dict(original_chunk.choices[0].delta)
                             print_verbose(f"original delta: {delta}")
                             model_response.choices[0].delta = Delta(**delta)
+                            print_verbose(
+                                f"new delta: {model_response.choices[0].delta}"
+                            )
                         except Exception as e:
                             model_response.choices[0].delta = Delta()
                 else:
                     return
-                model_response.system_fingerprint = original_chunk.system_fingerprint
-                if self.sent_first_chunk == False:
-                    model_response.choices[0].delta["role"] = "assistant"
-                    self.sent_first_chunk = True
-
+            print_verbose(
+                f"model_response.choices[0].delta: {model_response.choices[0].delta}; completion_obj: {completion_obj}"
+            )
+            print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
             ## RETURN ARG
             if (
-                completion_obj["content"] is not None
-                or response_obj.get("original_chunk", None) is not None
-            ):
-                hold = False
-                if completion_obj["content"] is not None:
-                    hold, model_response_str = self.check_special_tokens(
-                        chunk=completion_obj["content"],
-                        finish_reason=model_response.choices[0].finish_reason,
-                    )  # filter out bos/eos tokens from openai-compatible hf endpoints
-                    print_verbose(
-                        f"hold - {hold}, model_response_str - {model_response_str}"
-                    )
+                "content" in completion_obj
+                and isinstance(completion_obj["content"], str)
+                and len(completion_obj["content"]) > 0
+            ):  # cannot set content of an OpenAI Object to be an empty string
+                hold, model_response_str = self.check_special_tokens(
+                    chunk=completion_obj["content"],
+                    finish_reason=model_response.choices[0].finish_reason,
+                )  # filter out bos/eos tokens from openai-compatible hf endpoints
+                print_verbose(
+                    f"hold - {hold}, model_response_str - {model_response_str}"
+                )
                 if hold is False:
+                    ## check if openai/azure chunk
                     original_chunk = response_obj.get("original_chunk", None)
-                    if original_chunk is None:
+                    if original_chunk:
+                        model_response.id = original_chunk.id
+                        if len(original_chunk.choices) > 0:
+                            try:
+                                delta = dict(original_chunk.choices[0].delta)
+                                print_verbose(f"original delta: {delta}")
+                                model_response.choices[0].delta = Delta(**delta)
+                            except Exception as e:
+                                model_response.choices[0].delta = Delta()
+                        else:
+                            return
+                        model_response.system_fingerprint = (
+                            original_chunk.system_fingerprint
+                        )
+                        print_verbose(f"self.sent_first_chunk: {self.sent_first_chunk}")
+                        if self.sent_first_chunk == False:
+                            model_response.choices[0].delta["role"] = "assistant"
+                            self.sent_first_chunk = True
+                        elif self.sent_first_chunk == True and hasattr(
+                            model_response.choices[0].delta, "role"
+                        ):
+                            _initial_delta = model_response.choices[
+                                0
+                            ].delta.model_dump()
+                            _initial_delta.pop("role", None)
+                            model_response.choices[0].delta = Delta(**_initial_delta)
+                        print_verbose(
+                            f"model_response.choices[0].delta: {model_response.choices[0].delta}"
+                        )
+                    else:
+                        ## else
                         completion_obj["content"] = model_response_str
                         if self.sent_first_chunk == False:
                             completion_obj["role"] = "assistant"
@@ -8812,6 +8846,14 @@
                     model_response.choices[0].finish_reason
                 )  # ensure consistent output to openai
                 return model_response
+            elif (
+                model_response.choices[0].delta.tool_calls is not None
+                or model_response.choices[0].delta.function_call is not None
+            ):
+                if self.sent_first_chunk == False:
+                    model_response.choices[0].delta["role"] = "assistant"
+                    self.sent_first_chunk = True
+                return model_response
             else:
                 return
         except StopIteration:
@@ -8860,7 +8902,14 @@
                     print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
                     response: Optional[ModelResponse] = self.chunk_creator(chunk=chunk)
                     print_verbose(f"PROCESSED CHUNK POST CHUNK CREATOR: {response}")
-                    if response is None:
+
+                    if response is None or (
+                        isinstance(response, ModelResponse)
+                        and isinstance(response.choices[0], StreamingChoices)
+                        and response.choices[0].delta.content is None
+                        and response.choices[0].delta.function_call is None
+                        and response.choices[0].delta.tool_calls is None
+                    ):
                         continue
                     ## LOGGING
                     threading.Thread(
@@ -8904,7 +8953,11 @@
                     print_verbose(f"value of async chunk: {chunk}")
                     if chunk == "None" or chunk is None:
                         raise Exception
-                    elif self.custom_llm_provider == "gemini" and len(chunk.parts) == 0:
+                    elif (
+                        self.custom_llm_provider == "gemini"
+                        and hasattr(chunk, "parts")
+                        and len(chunk.parts) == 0
+                    ):
                         continue
                     # chunk_creator() does logging/stream chunk building. We need to let it know its being called in_async_func, so we don't double add chunks.
                     # __anext__ also calls async_success_handler, which does logging
@@ -8933,6 +8986,7 @@
                     self.rules.post_call_rules(
                         input=self.response_uptil_now, model=self.model
                     )
+                    print_verbose(f"final returned processed chunk: {processed_chunk}")
                     return processed_chunk
                 raise StopAsyncIteration
             else:  # temporary patch for non-aiohttp async calls
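# ---------------------------------------------------------------------------
# Usage sketch (not part of the patch): a minimal example of the code path
# this diff fixes, roughly what test_parallel_function_call_stream exercises.
# It streams a parallel tool call and rebuilds the full response with
# litellm.stream_chunk_builder(). Assumes litellm is installed and
# OPENAI_API_KEY is set; the get_current_weather tool schema is illustrative.
import litellm

messages = [{"role": "user", "content": "What's the weather like in Boston?"}]
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"],
            },
        },
    }
]

response = litellm.completion(
    model="gpt-3.5-turbo-1106",
    messages=messages,
    tools=tools,
    tool_choice="auto",
    stream=True,
)

chunks = []
for chunk in response:
    # With this patch, chunks whose delta carries only tool_calls are emitted
    # (role set on the first chunk, then stripped from later ones) instead of
    # being dropped, and system_fingerprint is copied onto each streamed chunk.
    chunks.append(chunk)

# stream_chunk_builder (patched above to return through
# convert_to_model_response_object) reassembles the chunks into a single
# ModelResponse, including the aggregated tool call and usage totals.
rebuilt = litellm.stream_chunk_builder(chunks, messages=messages)
print(rebuilt.choices[0].message.tool_calls)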