feat(responses): implement full multi-turn support (#2295)

I think the implementation needs more simplification. Spent way too much
time trying to get the tests pass with models not co-operating :(
Finally had to switch claude-sonnet to get things to pass reliably.

### Test Plan

```
export TAVILY_SEARCH_API_KEY=...
export OPENAI_API_KEY=...

uv run pytest -p no:warnings \
   -s -v tests/verifications/openai_api/test_responses.py \
 --provider=stack:starter \
  --model openai/gpt-4o
```
This commit is contained in:
Ashwin Bharambe 2025-06-02 15:35:49 -07:00 committed by GitHub
parent cac7d404a2
commit dbe4e84aca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 593 additions and 136 deletions

View file

@ -604,6 +604,7 @@ class Agents(Protocol):
stream: bool | None = False,
temperature: float | None = None,
tools: list[OpenAIResponseInputTool] | None = None,
max_infer_iters: int | None = 10, # this is an extension to the OpenAI API
) -> OpenAIResponseObject | AsyncIterator[OpenAIResponseObjectStream]:
"""Create a new OpenAI response.

View file

@ -325,9 +325,10 @@ class MetaReferenceAgentsImpl(Agents):
stream: bool | None = False,
temperature: float | None = None,
tools: list[OpenAIResponseInputTool] | None = None,
max_infer_iters: int | None = 10,
) -> OpenAIResponseObject:
return await self.openai_responses_impl.create_openai_response(
input, model, instructions, previous_response_id, store, stream, temperature, tools
input, model, instructions, previous_response_id, store, stream, temperature, tools, max_infer_iters
)
async def list_openai_responses(

View file

@ -258,6 +258,18 @@ class OpenAIResponsesImpl:
"""
return await self.responses_store.list_response_input_items(response_id, after, before, include, limit, order)
def _is_function_tool_call(
self,
tool_call: OpenAIChatCompletionToolCall,
tools: list[OpenAIResponseInputTool],
) -> bool:
if not tool_call.function:
return False
for t in tools:
if t.type == "function" and t.name == tool_call.function.name:
return True
return False
async def _process_response_choices(
self,
chat_response: OpenAIChatCompletion,
@ -270,7 +282,7 @@ class OpenAIResponsesImpl:
for choice in chat_response.choices:
if choice.message.tool_calls and tools:
# Assume if the first tool is a function, all tools are functions
if tools[0].type == "function":
if self._is_function_tool_call(choice.message.tool_calls[0], tools):
for tool_call in choice.message.tool_calls:
output_messages.append(
OpenAIResponseOutputMessageFunctionToolCall(
@ -332,6 +344,7 @@ class OpenAIResponsesImpl:
stream: bool | None = False,
temperature: float | None = None,
tools: list[OpenAIResponseInputTool] | None = None,
max_infer_iters: int | None = 10,
):
stream = False if stream is None else stream
@ -358,58 +371,100 @@ class OpenAIResponsesImpl:
temperature=temperature,
)
inference_result = await self.inference_api.openai_chat_completion(
model=model,
messages=messages,
tools=chat_tools,
stream=stream,
temperature=temperature,
)
# Fork to streaming vs non-streaming - let each handle ALL inference rounds
if stream:
return self._create_streaming_response(
inference_result=inference_result,
ctx=ctx,
output_messages=output_messages,
input=input,
model=model,
store=store,
tools=tools,
max_infer_iters=max_infer_iters,
)
else:
return await self._create_non_streaming_response(
inference_result=inference_result,
ctx=ctx,
output_messages=output_messages,
input=input,
model=model,
store=store,
tools=tools,
max_infer_iters=max_infer_iters,
)
async def _create_non_streaming_response(
self,
inference_result: Any,
ctx: ChatCompletionContext,
output_messages: list[OpenAIResponseOutput],
input: str | list[OpenAIResponseInput],
model: str,
store: bool | None,
tools: list[OpenAIResponseInputTool] | None,
max_infer_iters: int | None,
) -> OpenAIResponseObject:
chat_response = OpenAIChatCompletion(**inference_result.model_dump())
# Implement tool execution loop - handle ALL inference rounds including the first
n_iter = 0
messages = ctx.messages.copy()
current_response = None
# Process response choices (tool execution and message creation)
output_messages.extend(
await self._process_response_choices(
chat_response=chat_response,
ctx=ctx,
tools=tools,
while True:
# Do inference (including the first one)
inference_result = await self.inference_api.openai_chat_completion(
model=ctx.model,
messages=messages,
tools=ctx.tools,
stream=False,
temperature=ctx.temperature,
)
)
current_response = OpenAIChatCompletion(**inference_result.model_dump())
# Separate function vs non-function tool calls
function_tool_calls = []
non_function_tool_calls = []
for choice in current_response.choices:
if choice.message.tool_calls and tools:
for tool_call in choice.message.tool_calls:
if self._is_function_tool_call(tool_call, tools):
function_tool_calls.append(tool_call)
else:
non_function_tool_calls.append(tool_call)
# Process response choices based on tool call types
if function_tool_calls:
# For function tool calls, use existing logic and return immediately
current_output_messages = await self._process_response_choices(
chat_response=current_response,
ctx=ctx,
tools=tools,
)
output_messages.extend(current_output_messages)
break
elif non_function_tool_calls:
# For non-function tool calls, execute them and continue loop
for choice in current_response.choices:
tool_outputs, tool_response_messages = await self._execute_tool_calls_only(choice, ctx)
output_messages.extend(tool_outputs)
# Add assistant message and tool responses to messages for next iteration
messages.append(choice.message)
messages.extend(tool_response_messages)
n_iter += 1
if n_iter >= (max_infer_iters or 10):
break
# Continue with next iteration of the loop
continue
else:
# No tool calls - convert response to message and we're done
for choice in current_response.choices:
output_messages.append(await _convert_chat_choice_to_response_message(choice))
break
response = OpenAIResponseObject(
created_at=chat_response.created,
created_at=current_response.created,
id=f"resp-{uuid.uuid4()}",
model=model,
object="response",
@ -429,13 +484,13 @@ class OpenAIResponsesImpl:
async def _create_streaming_response(
self,
inference_result: Any,
ctx: ChatCompletionContext,
output_messages: list[OpenAIResponseOutput],
input: str | list[OpenAIResponseInput],
model: str,
store: bool | None,
tools: list[OpenAIResponseInputTool] | None,
max_infer_iters: int | None,
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Create initial response and emit response.created immediately
response_id = f"resp-{uuid.uuid4()}"
@ -453,87 +508,135 @@ class OpenAIResponsesImpl:
# Emit response.created immediately
yield OpenAIResponseObjectStreamResponseCreated(response=initial_response)
# For streaming, inference_result is an async iterator of chunks
# Stream chunks and emit delta events as they arrive
chat_response_id = ""
chat_response_content = []
chat_response_tool_calls: dict[int, OpenAIChatCompletionToolCall] = {}
chunk_created = 0
chunk_model = ""
chunk_finish_reason = ""
sequence_number = 0
# Implement tool execution loop for streaming - handle ALL inference rounds including the first
n_iter = 0
messages = ctx.messages.copy()
# Create a placeholder message item for delta events
message_item_id = f"msg_{uuid.uuid4()}"
async for chunk in inference_result:
chat_response_id = chunk.id
chunk_created = chunk.created
chunk_model = chunk.model
for chunk_choice in chunk.choices:
# Emit incremental text content as delta events
if chunk_choice.delta.content:
sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputTextDelta(
content_index=0,
delta=chunk_choice.delta.content,
item_id=message_item_id,
output_index=0,
sequence_number=sequence_number,
)
# Collect content for final response
chat_response_content.append(chunk_choice.delta.content or "")
if chunk_choice.finish_reason:
chunk_finish_reason = chunk_choice.finish_reason
# Aggregate tool call arguments across chunks, using their index as the aggregation key
if chunk_choice.delta.tool_calls:
for tool_call in chunk_choice.delta.tool_calls:
response_tool_call = chat_response_tool_calls.get(tool_call.index, None)
if response_tool_call:
# Don't attempt to concatenate arguments if we don't have any new arguments
if tool_call.function.arguments:
# Guard against an initial None argument before we concatenate
response_tool_call.function.arguments = (
response_tool_call.function.arguments or ""
) + tool_call.function.arguments
else:
tool_call_dict: dict[str, Any] = tool_call.model_dump()
tool_call_dict.pop("type", None)
response_tool_call = OpenAIChatCompletionToolCall(**tool_call_dict)
chat_response_tool_calls[tool_call.index] = response_tool_call
# Convert collected chunks to complete response
if chat_response_tool_calls:
tool_calls = [chat_response_tool_calls[i] for i in sorted(chat_response_tool_calls.keys())]
else:
tool_calls = None
assistant_message = OpenAIAssistantMessageParam(
content="".join(chat_response_content),
tool_calls=tool_calls,
)
chat_response_obj = OpenAIChatCompletion(
id=chat_response_id,
choices=[
OpenAIChoice(
message=assistant_message,
finish_reason=chunk_finish_reason,
index=0,
)
],
created=chunk_created,
model=chunk_model,
)
# Process response choices (tool execution and message creation)
output_messages.extend(
await self._process_response_choices(
chat_response=chat_response_obj,
ctx=ctx,
tools=tools,
while True:
# Do inference (including the first one) - streaming
current_inference_result = await self.inference_api.openai_chat_completion(
model=ctx.model,
messages=messages,
tools=ctx.tools,
stream=True,
temperature=ctx.temperature,
)
)
# Process streaming chunks and build complete response
chat_response_id = ""
chat_response_content = []
chat_response_tool_calls: dict[int, OpenAIChatCompletionToolCall] = {}
chunk_created = 0
chunk_model = ""
chunk_finish_reason = ""
sequence_number = 0
# Create a placeholder message item for delta events
message_item_id = f"msg_{uuid.uuid4()}"
async for chunk in current_inference_result:
chat_response_id = chunk.id
chunk_created = chunk.created
chunk_model = chunk.model
for chunk_choice in chunk.choices:
# Emit incremental text content as delta events
if chunk_choice.delta.content:
sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputTextDelta(
content_index=0,
delta=chunk_choice.delta.content,
item_id=message_item_id,
output_index=0,
sequence_number=sequence_number,
)
# Collect content for final response
chat_response_content.append(chunk_choice.delta.content or "")
if chunk_choice.finish_reason:
chunk_finish_reason = chunk_choice.finish_reason
# Aggregate tool call arguments across chunks
if chunk_choice.delta.tool_calls:
for tool_call in chunk_choice.delta.tool_calls:
response_tool_call = chat_response_tool_calls.get(tool_call.index, None)
if response_tool_call:
# Don't attempt to concatenate arguments if we don't have any new argumentsAdd commentMore actions
if tool_call.function.arguments:
# Guard against an initial None argument before we concatenate
response_tool_call.function.arguments = (
response_tool_call.function.arguments or ""
) + tool_call.function.arguments
else:
tool_call_dict: dict[str, Any] = tool_call.model_dump()
tool_call_dict.pop("type", None)
response_tool_call = OpenAIChatCompletionToolCall(**tool_call_dict)
chat_response_tool_calls[tool_call.index] = response_tool_call
# Convert collected chunks to complete response
if chat_response_tool_calls:
tool_calls = [chat_response_tool_calls[i] for i in sorted(chat_response_tool_calls.keys())]
else:
tool_calls = None
assistant_message = OpenAIAssistantMessageParam(
content="".join(chat_response_content),
tool_calls=tool_calls,
)
current_response = OpenAIChatCompletion(
id=chat_response_id,
choices=[
OpenAIChoice(
message=assistant_message,
finish_reason=chunk_finish_reason,
index=0,
)
],
created=chunk_created,
model=chunk_model,
)
# Separate function vs non-function tool calls
function_tool_calls = []
non_function_tool_calls = []
for choice in current_response.choices:
if choice.message.tool_calls and tools:
for tool_call in choice.message.tool_calls:
if self._is_function_tool_call(tool_call, tools):
function_tool_calls.append(tool_call)
else:
non_function_tool_calls.append(tool_call)
# Process response choices based on tool call types
if function_tool_calls:
# For function tool calls, use existing logic and break
current_output_messages = await self._process_response_choices(
chat_response=current_response,
ctx=ctx,
tools=tools,
)
output_messages.extend(current_output_messages)
break
elif non_function_tool_calls:
# For non-function tool calls, execute them and continue loop
for choice in current_response.choices:
tool_outputs, tool_response_messages = await self._execute_tool_calls_only(choice, ctx)
output_messages.extend(tool_outputs)
# Add assistant message and tool responses to messages for next iteration
messages.append(choice.message)
messages.extend(tool_response_messages)
n_iter += 1
if n_iter >= (max_infer_iters or 10):
break
# Continue with next iteration of the loop
continue
else:
# No tool calls - convert response to message and we're done
for choice in current_response.choices:
output_messages.append(await _convert_chat_choice_to_response_message(choice))
break
# Create final response
final_response = OpenAIResponseObject(
@ -646,6 +749,30 @@ class OpenAIResponsesImpl:
raise ValueError(f"Llama Stack OpenAI Responses does not yet support tool type: {input_tool.type}")
return chat_tools, mcp_tool_to_server, mcp_list_message
async def _execute_tool_calls_only(
self,
choice: OpenAIChoice,
ctx: ChatCompletionContext,
) -> tuple[list[OpenAIResponseOutput], list[OpenAIMessageParam]]:
"""Execute tool calls and return output messages and tool response messages for next inference."""
output_messages: list[OpenAIResponseOutput] = []
tool_response_messages: list[OpenAIMessageParam] = []
if not isinstance(choice.message, OpenAIAssistantMessageParam):
return output_messages, tool_response_messages
if not choice.message.tool_calls:
return output_messages, tool_response_messages
for tool_call in choice.message.tool_calls:
tool_call_log, further_input = await self._execute_tool_call(tool_call, ctx)
if tool_call_log:
output_messages.append(tool_call_log)
if further_input:
tool_response_messages.append(further_input)
return output_messages, tool_response_messages
async def _execute_tool_and_return_final_output(
self,
choice: OpenAIChoice,
@ -772,5 +899,8 @@ class OpenAIResponsesImpl:
else:
raise ValueError(f"Unknown result content type: {type(result.content)}")
input_message = OpenAIToolMessageParam(content=content, tool_call_id=tool_call_id)
else:
text = str(error_exc)
input_message = OpenAIToolMessageParam(content=text, tool_call_id=tool_call_id)
return message, input_message