fix(utils.py): fix parallel tool calling when streaming

This commit is contained in:
Krrish Dholakia 2023-11-29 10:56:21 -08:00
parent 9024a47dc2
commit b6bc75e27a
3 changed files with 74 additions and 51 deletions

View file

@ -2125,6 +2125,11 @@ def stream_chunk_builder(chunks: list, messages: Optional[list]=None):
id = None id = None
name = None name = None
type = None type = None
tool_calls_list = []
prev_index = 0
prev_id = None
curr_id = None
curr_index = 0
for chunk in chunks: for chunk in chunks:
choices = chunk["choices"] choices = chunk["choices"]
for choice in choices: for choice in choices:
@ -2134,6 +2139,11 @@ def stream_chunk_builder(chunks: list, messages: Optional[list]=None):
if tool_calls and tool_calls[0].function is not None: if tool_calls and tool_calls[0].function is not None:
if tool_calls[0].id: if tool_calls[0].id:
id = tool_calls[0].id id = tool_calls[0].id
curr_id = id
if prev_id is None:
prev_id = curr_id
if tool_calls[0].index:
curr_index = tool_calls[0].index
if tool_calls[0].function.arguments: if tool_calls[0].function.arguments:
# Now, tool_calls is expected to be a dictionary # Now, tool_calls is expected to be a dictionary
arguments = tool_calls[0].function.arguments arguments = tool_calls[0].function.arguments
@ -2142,10 +2152,17 @@ def stream_chunk_builder(chunks: list, messages: Optional[list]=None):
name = tool_calls[0].function.name name = tool_calls[0].function.name
if tool_calls[0].type: if tool_calls[0].type:
type = tool_calls[0].type type = tool_calls[0].type
if curr_index != prev_index: # new tool call
combined_arguments = "".join(argument_list)
tool_calls_list.append({"id": prev_id, "index": prev_index, "function": {"arguments": combined_arguments, "name": name}, "type": type})
argument_list = [] # reset
prev_index = curr_index
prev_id = curr_id
combined_arguments = "".join(argument_list) combined_arguments = "".join(argument_list)
response["choices"][0]["message"]["content"] = None tool_calls_list.append({"id": id, "function": {"arguments": combined_arguments, "name": name}, "type": type})
response["choices"][0]["message"]["tool_calls"] = [{"id": id, "function": {"arguments": combined_arguments, "name": name}, "type": type}] response["choices"][0]["message"]["content"] = None
response["choices"][0]["message"]["tool_calls"] = tool_calls_list
elif "function_call" in chunks[0]["choices"][0]["delta"] and chunks[0]["choices"][0]["delta"]["function_call"] is not None: elif "function_call" in chunks[0]["choices"][0]["delta"] and chunks[0]["choices"][0]["delta"]["function_call"] is not None:
argument_list = [] argument_list = []
delta = chunks[0]["choices"][0]["delta"] delta = chunks[0]["choices"][0]["delta"]

View file

@ -13,7 +13,7 @@ import litellm
from litellm import embedding, completion, completion_cost, Timeout from litellm import embedding, completion, completion_cost, Timeout
from litellm import RateLimitError from litellm import RateLimitError
import pytest import pytest
litellm.num_retries = 3 litellm.num_retries = 0
litellm.cache = None litellm.cache = None
# litellm.set_verbose=True # litellm.set_verbose=True
import json import json
@ -97,6 +97,7 @@ def test_parallel_function_call():
"content": function_response, "content": function_response,
} }
) # extend conversation with function response ) # extend conversation with function response
print(f"messages: {messages}")
second_response = litellm.completion( second_response = litellm.completion(
model="gpt-3.5-turbo-1106", model="gpt-3.5-turbo-1106",
messages=messages, messages=messages,
@ -108,7 +109,7 @@ def test_parallel_function_call():
except Exception as e: except Exception as e:
pytest.fail(f"Error occurred: {e}") pytest.fail(f"Error occurred: {e}")
# test_parallel_function_call() test_parallel_function_call()
@ -143,51 +144,53 @@ def test_parallel_function_call_stream():
tools=tools, tools=tools,
stream=True, stream=True,
tool_choice="auto", # auto is default, but we'll be explicit tool_choice="auto", # auto is default, but we'll be explicit
complete_response = True
) )
print("Response\n", response) print("Response\n", response)
for chunk in response: # for chunk in response:
print(chunk) # print(chunk)
# response_message = response.choices[0].message response_message = response.choices[0].message
# tool_calls = response_message.tool_calls tool_calls = response_message.tool_calls
# print("length of tool calls", len(tool_calls)) print("length of tool calls", len(tool_calls))
# print("Expecting there to be 3 tool calls") print("Expecting there to be 3 tool calls")
# assert len(tool_calls) > 1 # this has to call the function for SF, Tokyo and parise assert len(tool_calls) > 1 # this has to call the function for SF, Tokyo and parise
# # Step 2: check if the model wanted to call a function # Step 2: check if the model wanted to call a function
# if tool_calls: if tool_calls:
# # Step 3: call the function # Step 3: call the function
# # Note: the JSON response may not always be valid; be sure to handle errors # Note: the JSON response may not always be valid; be sure to handle errors
# available_functions = { available_functions = {
# "get_current_weather": get_current_weather, "get_current_weather": get_current_weather,
# } # only one function in this example, but you can have multiple } # only one function in this example, but you can have multiple
# messages.append(response_message) # extend conversation with assistant's reply messages.append(response_message) # extend conversation with assistant's reply
# print("Response message\n", response_message) print("Response message\n", response_message)
# # Step 4: send the info for each function call and function response to the model # Step 4: send the info for each function call and function response to the model
# for tool_call in tool_calls: for tool_call in tool_calls:
# function_name = tool_call.function.name function_name = tool_call.function.name
# function_to_call = available_functions[function_name] function_to_call = available_functions[function_name]
# function_args = json.loads(tool_call.function.arguments) function_args = json.loads(tool_call.function.arguments)
# function_response = function_to_call( function_response = function_to_call(
# location=function_args.get("location"), location=function_args.get("location"),
# unit=function_args.get("unit"), unit=function_args.get("unit"),
# ) )
# messages.append( messages.append(
# { {
# "tool_call_id": tool_call.id, "tool_call_id": tool_call.id,
# "role": "tool", "role": "tool",
# "name": function_name, "name": function_name,
# "content": function_response, "content": function_response,
# } }
# ) # extend conversation with function response ) # extend conversation with function response
# second_response = litellm.completion( print(f"messages: {messages}")
# model="gpt-3.5-turbo-1106", second_response = litellm.completion(
# messages=messages, model="gpt-3.5-turbo-1106",
# temperature=0.2, messages=messages,
# seed=22 temperature=0.2,
# ) # get a new response from the model where it can see the function response seed=22
# print("second response\n", second_response) ) # get a new response from the model where it can see the function response
# return second_response print("second response\n", second_response)
return second_response
except Exception as e: except Exception as e:
pytest.fail(f"Error occurred: {e}") pytest.fail(f"Error occurred: {e}")

View file

@ -5247,11 +5247,14 @@ class CustomStreamWrapper:
original_chunk = response_obj.get("original_chunk", None) original_chunk = response_obj.get("original_chunk", None)
model_response.id = original_chunk.id model_response.id = original_chunk.id
if len(original_chunk.choices) > 0: if len(original_chunk.choices) > 0:
try: if original_chunk.choices[0].delta.function_call is not None or original_chunk.choices[0].delta.tool_calls is not None:
delta = dict(original_chunk.choices[0].delta) try:
model_response.choices[0].delta = Delta(**delta) delta = dict(original_chunk.choices[0].delta)
except Exception as e: model_response.choices[0].delta = Delta(**delta)
model_response.choices[0].delta = Delta() except Exception as e:
model_response.choices[0].delta = Delta()
else:
return
else: else:
return return
model_response.system_fingerprint = original_chunk.system_fingerprint model_response.system_fingerprint = original_chunk.system_fingerprint
@ -5284,7 +5287,7 @@ class CustomStreamWrapper:
chunk = self.completion_stream chunk = self.completion_stream
else: else:
chunk = next(self.completion_stream) chunk = next(self.completion_stream)
if chunk is not None and chunk != b'': if chunk is not None and chunk != b'':
response = self.chunk_creator(chunk=chunk) response = self.chunk_creator(chunk=chunk)
if response is not None: if response is not None: