diff --git a/README.md b/README.md
index 8db4580a2..4df4a5372 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,5 @@
 # Llama Stack
-meta-llama%2Fllama-stack | Trendshift
-
------
 
 [![PyPI version](https://img.shields.io/pypi/v/llama_stack.svg)](https://pypi.org/project/llama_stack/)
 [![PyPI - Downloads](https://img.shields.io/pypi/dm/llama-stack)](https://pypi.org/project/llama-stack/)
 [![License](https://img.shields.io/pypi/l/llama_stack.svg)](https://github.com/meta-llama/llama-stack/blob/main/LICENSE)
diff --git a/docs/_static/llama-stack-spec.html b/docs/_static/llama-stack-spec.html
index 25f916d87..0549dda21 100644
--- a/docs/_static/llama-stack-spec.html
+++ b/docs/_static/llama-stack-spec.html
@@ -8821,6 +8821,61 @@
       "title": "OpenAIResponseOutputMessageMCPListTools",
       "description": "MCP list tools output message containing available tools from an MCP server."
     },
+    "OpenAIResponseContentPart": {
+      "oneOf": [
+        {
+          "$ref": "#/components/schemas/OpenAIResponseContentPartOutputText"
+        },
+        {
+          "$ref": "#/components/schemas/OpenAIResponseContentPartRefusal"
+        }
+      ],
+      "discriminator": {
+        "propertyName": "type",
+        "mapping": {
+          "output_text": "#/components/schemas/OpenAIResponseContentPartOutputText",
+          "refusal": "#/components/schemas/OpenAIResponseContentPartRefusal"
+        }
+      }
+    },
+    "OpenAIResponseContentPartOutputText": {
+      "type": "object",
+      "properties": {
+        "type": {
+          "type": "string",
+          "const": "output_text",
+          "default": "output_text"
+        },
+        "text": {
+          "type": "string"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "type",
+        "text"
+      ],
+      "title": "OpenAIResponseContentPartOutputText"
+    },
+    "OpenAIResponseContentPartRefusal": {
+      "type": "object",
+      "properties": {
+        "type": {
+          "type": "string",
+          "const": "refusal",
+          "default": "refusal"
+        },
+        "refusal": {
+          "type": "string"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "type",
+        "refusal"
+      ],
+      "title": "OpenAIResponseContentPartRefusal"
+    },
     "OpenAIResponseObjectStream": {
       "oneOf": [
         {
@@ -8877,6 +8932,12 @@
         {
           "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted"
         },
+        {
+          "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded"
+        },
+        {
+          "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone"
+        },
         {
           "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseCompleted"
         }
@@ -8902,6 +8963,8 @@
           "response.mcp_call.in_progress": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallInProgress",
           "response.mcp_call.failed": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallFailed",
           "response.mcp_call.completed": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted",
+          "response.content_part.added": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded",
+          "response.content_part.done": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone",
           "response.completed": "#/components/schemas/OpenAIResponseObjectStreamResponseCompleted"
         }
       }
@@ -8928,6 +8991,80 @@
       "title": "OpenAIResponseObjectStreamResponseCompleted",
       "description": "Streaming event indicating a response has been completed."
     },
+    "OpenAIResponseObjectStreamResponseContentPartAdded": {
+      "type": "object",
+      "properties": {
+        "response_id": {
+          "type": "string",
+          "description": "Unique identifier of the response containing this content"
+        },
+        "item_id": {
+          "type": "string",
+          "description": "Unique identifier of the output item containing this content part"
+        },
+        "part": {
+          "$ref": "#/components/schemas/OpenAIResponseContentPart",
+          "description": "The content part that was added"
+        },
+        "sequence_number": {
+          "type": "integer",
+          "description": "Sequential number for ordering streaming events"
+        },
+        "type": {
+          "type": "string",
+          "const": "response.content_part.added",
+          "default": "response.content_part.added",
+          "description": "Event type identifier, always \"response.content_part.added\""
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "response_id",
+        "item_id",
+        "part",
+        "sequence_number",
+        "type"
+      ],
+      "title": "OpenAIResponseObjectStreamResponseContentPartAdded",
+      "description": "Streaming event emitted when a new content part is added to a response item."
+    },
+    "OpenAIResponseObjectStreamResponseContentPartDone": {
+      "type": "object",
+      "properties": {
+        "response_id": {
+          "type": "string",
+          "description": "Unique identifier of the response containing this content"
+        },
+        "item_id": {
+          "type": "string",
+          "description": "Unique identifier of the output item containing this content part"
+        },
+        "part": {
+          "$ref": "#/components/schemas/OpenAIResponseContentPart",
+          "description": "The completed content part"
+        },
+        "sequence_number": {
+          "type": "integer",
+          "description": "Sequential number for ordering streaming events"
+        },
+        "type": {
+          "type": "string",
+          "const": "response.content_part.done",
+          "default": "response.content_part.done",
+          "description": "Event type identifier, always \"response.content_part.done\""
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "response_id",
+        "item_id",
+        "part",
+        "sequence_number",
+        "type"
+      ],
+      "title": "OpenAIResponseObjectStreamResponseContentPartDone",
+      "description": "Streaming event emitted when a content part is completed."
+    },
     "OpenAIResponseObjectStreamResponseCreated": {
       "type": "object",
       "properties": {
diff --git a/docs/_static/llama-stack-spec.yaml b/docs/_static/llama-stack-spec.yaml
index 43e9fa95a..aa47cd58d 100644
--- a/docs/_static/llama-stack-spec.yaml
+++ b/docs/_static/llama-stack-spec.yaml
@@ -6441,6 +6441,43 @@ components:
       title: OpenAIResponseOutputMessageMCPListTools
       description: >-
         MCP list tools output message containing available tools from an MCP server.
+    OpenAIResponseContentPart:
+      oneOf:
+        - $ref: '#/components/schemas/OpenAIResponseContentPartOutputText'
+        - $ref: '#/components/schemas/OpenAIResponseContentPartRefusal'
+      discriminator:
+        propertyName: type
+        mapping:
+          output_text: '#/components/schemas/OpenAIResponseContentPartOutputText'
+          refusal: '#/components/schemas/OpenAIResponseContentPartRefusal'
+    OpenAIResponseContentPartOutputText:
+      type: object
+      properties:
+        type:
+          type: string
+          const: output_text
+          default: output_text
+        text:
+          type: string
+      additionalProperties: false
+      required:
+        - type
+        - text
+      title: OpenAIResponseContentPartOutputText
+    OpenAIResponseContentPartRefusal:
+      type: object
+      properties:
+        type:
+          type: string
+          const: refusal
+          default: refusal
+        refusal:
+          type: string
+      additionalProperties: false
+      required:
+        - type
+        - refusal
+      title: OpenAIResponseContentPartRefusal
     OpenAIResponseObjectStream:
       oneOf:
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseCreated'
@@ -6461,6 +6498,8 @@ components:
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallInProgress'
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallFailed'
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted'
+        - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded'
+        - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone'
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseCompleted'
       discriminator:
         propertyName: type
@@ -6483,6 +6522,8 @@ components:
           response.mcp_call.in_progress: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallInProgress'
           response.mcp_call.failed: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallFailed'
           response.mcp_call.completed: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted'
+          response.content_part.added: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded'
+          response.content_part.done: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone'
           response.completed: '#/components/schemas/OpenAIResponseObjectStreamResponseCompleted'
     "OpenAIResponseObjectStreamResponseCompleted":
       type: object
@@ -6504,6 +6545,76 @@ components:
         OpenAIResponseObjectStreamResponseCompleted
       description: >-
         Streaming event indicating a response has been completed.
+    "OpenAIResponseObjectStreamResponseContentPartAdded":
+      type: object
+      properties:
+        response_id:
+          type: string
+          description: >-
+            Unique identifier of the response containing this content
+        item_id:
+          type: string
+          description: >-
+            Unique identifier of the output item containing this content part
+        part:
+          $ref: '#/components/schemas/OpenAIResponseContentPart'
+          description: The content part that was added
+        sequence_number:
+          type: integer
+          description: >-
+            Sequential number for ordering streaming events
+        type:
+          type: string
+          const: response.content_part.added
+          default: response.content_part.added
+          description: >-
+            Event type identifier, always "response.content_part.added"
+      additionalProperties: false
+      required:
+        - response_id
+        - item_id
+        - part
+        - sequence_number
+        - type
+      title: >-
+        OpenAIResponseObjectStreamResponseContentPartAdded
+      description: >-
+        Streaming event emitted when a new content part is added to a response item.
+ "OpenAIResponseObjectStreamResponseContentPartDone": + type: object + properties: + response_id: + type: string + description: >- + Unique identifier of the response containing this content + item_id: + type: string + description: >- + Unique identifier of the output item containing this content part + part: + $ref: '#/components/schemas/OpenAIResponseContentPart' + description: The completed content part + sequence_number: + type: integer + description: >- + Sequential number for ordering streaming events + type: + type: string + const: response.content_part.done + default: response.content_part.done + description: >- + Event type identifier, always "response.content_part.done" + additionalProperties: false + required: + - response_id + - item_id + - part + - sequence_number + - type + title: >- + OpenAIResponseObjectStreamResponseContentPartDone + description: >- + Streaming event for when a content part is completed. "OpenAIResponseObjectStreamResponseCreated": type: object properties: diff --git a/llama_stack/apis/agents/openai_responses.py b/llama_stack/apis/agents/openai_responses.py index 8574104dc..591992479 100644 --- a/llama_stack/apis/agents/openai_responses.py +++ b/llama_stack/apis/agents/openai_responses.py @@ -623,6 +623,62 @@ class OpenAIResponseObjectStreamResponseMcpCallCompleted(BaseModel): type: Literal["response.mcp_call.completed"] = "response.mcp_call.completed" +@json_schema_type +class OpenAIResponseContentPartOutputText(BaseModel): + type: Literal["output_text"] = "output_text" + text: str + # TODO: add annotations, logprobs, etc. + + +@json_schema_type +class OpenAIResponseContentPartRefusal(BaseModel): + type: Literal["refusal"] = "refusal" + refusal: str + + +OpenAIResponseContentPart = Annotated[ + OpenAIResponseContentPartOutputText | OpenAIResponseContentPartRefusal, + Field(discriminator="type"), +] +register_schema(OpenAIResponseContentPart, name="OpenAIResponseContentPart") + + +@json_schema_type +class OpenAIResponseObjectStreamResponseContentPartAdded(BaseModel): + """Streaming event for when a new content part is added to a response item. + + :param response_id: Unique identifier of the response containing this content + :param item_id: Unique identifier of the output item containing this content part + :param part: The content part that was added + :param sequence_number: Sequential number for ordering streaming events + :param type: Event type identifier, always "response.content_part.added" + """ + + response_id: str + item_id: str + part: OpenAIResponseContentPart + sequence_number: int + type: Literal["response.content_part.added"] = "response.content_part.added" + + +@json_schema_type +class OpenAIResponseObjectStreamResponseContentPartDone(BaseModel): + """Streaming event for when a content part is completed. 
+
+    :param response_id: Unique identifier of the response containing this content
+    :param item_id: Unique identifier of the output item containing this content part
+    :param part: The completed content part
+    :param sequence_number: Sequential number for ordering streaming events
+    :param type: Event type identifier, always "response.content_part.done"
+    """
+
+    response_id: str
+    item_id: str
+    part: OpenAIResponseContentPart
+    sequence_number: int
+    type: Literal["response.content_part.done"] = "response.content_part.done"
+
+
 OpenAIResponseObjectStream = Annotated[
     OpenAIResponseObjectStreamResponseCreated
     | OpenAIResponseObjectStreamResponseOutputItemAdded
@@ -642,6 +698,8 @@ OpenAIResponseObjectStream = Annotated[
     | OpenAIResponseObjectStreamResponseMcpCallInProgress
     | OpenAIResponseObjectStreamResponseMcpCallFailed
     | OpenAIResponseObjectStreamResponseMcpCallCompleted
+    | OpenAIResponseObjectStreamResponseContentPartAdded
+    | OpenAIResponseObjectStreamResponseContentPartDone
     | OpenAIResponseObjectStreamResponseCompleted,
     Field(discriminator="type"),
 ]
diff --git a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
index 104f15010..6aca4d68e 100644
--- a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
+++ b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
@@ -20,6 +20,7 @@ from llama_stack.apis.agents.openai_responses import (
     ListOpenAIResponseInputItem,
     ListOpenAIResponseObject,
     OpenAIDeleteResponseObject,
+    OpenAIResponseContentPartOutputText,
     OpenAIResponseInput,
     OpenAIResponseInputFunctionToolCallOutput,
     OpenAIResponseInputMessageContent,
@@ -32,12 +33,22 @@ from llama_stack.apis.agents.openai_responses import (
     OpenAIResponseObject,
     OpenAIResponseObjectStream,
     OpenAIResponseObjectStreamResponseCompleted,
+    OpenAIResponseObjectStreamResponseContentPartAdded,
+    OpenAIResponseObjectStreamResponseContentPartDone,
     OpenAIResponseObjectStreamResponseCreated,
     OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta,
     OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone,
+    OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta,
+    OpenAIResponseObjectStreamResponseMcpCallArgumentsDone,
+    OpenAIResponseObjectStreamResponseMcpCallCompleted,
+    OpenAIResponseObjectStreamResponseMcpCallFailed,
+    OpenAIResponseObjectStreamResponseMcpCallInProgress,
     OpenAIResponseObjectStreamResponseOutputItemAdded,
     OpenAIResponseObjectStreamResponseOutputItemDone,
     OpenAIResponseObjectStreamResponseOutputTextDelta,
+    OpenAIResponseObjectStreamResponseWebSearchCallCompleted,
+    OpenAIResponseObjectStreamResponseWebSearchCallInProgress,
+    OpenAIResponseObjectStreamResponseWebSearchCallSearching,
     OpenAIResponseOutput,
     OpenAIResponseOutputMessageContent,
     OpenAIResponseOutputMessageContentOutputText,
@@ -87,6 +98,15 @@ logger = get_logger(name=__name__, category="openai_responses")
 
 OPENAI_RESPONSES_PREFIX = "openai_responses:"
 
 
+class ToolExecutionResult(BaseModel):
+    """Result of streaming tool execution."""
+
+    stream_event: OpenAIResponseObjectStream | None = None
+    sequence_number: int
+    final_output_message: OpenAIResponseOutput | None = None
+    final_input_message: OpenAIMessageParam | None = None
+
+
 async def _convert_response_content_to_chat_content(
     content: (str | list[OpenAIResponseInputMessageContent] | list[OpenAIResponseOutputMessageContent]),
 ) -> str | list[OpenAIChatCompletionContentPartParam]:
@@ -460,6 +480,8 @@ class OpenAIResponsesImpl:
             message_item_id = f"msg_{uuid.uuid4()}"
= f"msg_{uuid.uuid4()}" # Track tool call items for streaming events tool_call_item_ids: dict[int, str] = {} + # Track content parts for streaming events + content_part_emitted = False async for chunk in completion_result: chat_response_id = chunk.id @@ -468,6 +490,18 @@ class OpenAIResponsesImpl: for chunk_choice in chunk.choices: # Emit incremental text content as delta events if chunk_choice.delta.content: + # Emit content_part.added event for first text chunk + if not content_part_emitted: + content_part_emitted = True + sequence_number += 1 + yield OpenAIResponseObjectStreamResponseContentPartAdded( + response_id=response_id, + item_id=message_item_id, + part=OpenAIResponseContentPartOutputText( + text="", # Will be filled incrementally via text deltas + ), + sequence_number=sequence_number, + ) sequence_number += 1 yield OpenAIResponseObjectStreamResponseOutputTextDelta( content_index=0, @@ -514,16 +548,33 @@ class OpenAIResponsesImpl: sequence_number=sequence_number, ) - # Stream function call arguments as they arrive + # Stream tool call arguments as they arrive (differentiate between MCP and function calls) if tool_call.function and tool_call.function.arguments: tool_call_item_id = tool_call_item_ids[tool_call.index] sequence_number += 1 - yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta( - delta=tool_call.function.arguments, - item_id=tool_call_item_id, - output_index=len(output_messages), - sequence_number=sequence_number, + + # Check if this is an MCP tool call + is_mcp_tool = ( + ctx.mcp_tool_to_server + and tool_call.function.name + and tool_call.function.name in ctx.mcp_tool_to_server ) + if is_mcp_tool: + # Emit MCP-specific argument delta event + yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta( + delta=tool_call.function.arguments, + item_id=tool_call_item_id, + output_index=len(output_messages), + sequence_number=sequence_number, + ) + else: + # Emit function call argument delta event + yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta( + delta=tool_call.function.arguments, + item_id=tool_call_item_id, + output_index=len(output_messages), + sequence_number=sequence_number, + ) # Accumulate arguments for final response (only for subsequent chunks) if not is_new_tool_call: @@ -531,27 +582,55 @@ class OpenAIResponsesImpl: response_tool_call.function.arguments or "" ) + tool_call.function.arguments - # Emit function_call_arguments.done events for completed tool calls + # Emit arguments.done events for completed tool calls (differentiate between MCP and function calls) for tool_call_index in sorted(chat_response_tool_calls.keys()): tool_call_item_id = tool_call_item_ids[tool_call_index] final_arguments = chat_response_tool_calls[tool_call_index].function.arguments or "" + tool_call_name = chat_response_tool_calls[tool_call_index].function.name + + # Check if this is an MCP tool call + is_mcp_tool = ctx.mcp_tool_to_server and tool_call_name and tool_call_name in ctx.mcp_tool_to_server sequence_number += 1 - yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone( - arguments=final_arguments, - item_id=tool_call_item_id, - output_index=len(output_messages), - sequence_number=sequence_number, - ) + if is_mcp_tool: + # Emit MCP-specific argument done event + yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDone( + arguments=final_arguments, + item_id=tool_call_item_id, + output_index=len(output_messages), + sequence_number=sequence_number, + ) + else: + # Emit function call argument done event + yield 
+                        arguments=final_arguments,
+                        item_id=tool_call_item_id,
+                        output_index=len(output_messages),
+                        sequence_number=sequence_number,
+                    )
 
         # Convert collected chunks to complete response
         if chat_response_tool_calls:
             tool_calls = [chat_response_tool_calls[i] for i in sorted(chat_response_tool_calls.keys())]
-
-            # when there are tool calls, we need to clear the content
-            chat_response_content = []
         else:
             tool_calls = None
 
+        # Emit content_part.done event if text content was streamed (before content gets cleared)
+        if content_part_emitted:
+            final_text = "".join(chat_response_content)
+            sequence_number += 1
+            yield OpenAIResponseObjectStreamResponseContentPartDone(
+                response_id=response_id,
+                item_id=message_item_id,
+                part=OpenAIResponseContentPartOutputText(
+                    text=final_text,
+                ),
+                sequence_number=sequence_number,
+            )
+
+        # Clear content when there are tool calls (OpenAI spec behavior)
+        if chat_response_tool_calls:
+            chat_response_content = []
+
         assistant_message = OpenAIAssistantMessageParam(
             content="".join(chat_response_content),
             tool_calls=tool_calls,
@@ -587,19 +666,38 @@ class OpenAIResponsesImpl:
 
             # execute non-function tool calls
             for tool_call in non_function_tool_calls:
-                tool_call_log, tool_response_message = await self._execute_tool_call(tool_call, ctx)
+                # Find the item_id for this tool call
+                matching_item_id = None
+                for index, item_id in tool_call_item_ids.items():
+                    response_tool_call = chat_response_tool_calls.get(index)
+                    if response_tool_call and response_tool_call.id == tool_call.id:
+                        matching_item_id = item_id
+                        break
+
+                # Use a fallback item_id if not found
+                if not matching_item_id:
+                    matching_item_id = f"tc_{uuid.uuid4()}"
+
+                # Execute tool call with streaming
+                tool_call_log = None
+                tool_response_message = None
+                async for result in self._execute_tool_call(
+                    tool_call, ctx, sequence_number, response_id, len(output_messages), matching_item_id
+                ):
+                    if result.stream_event:
+                        # Forward streaming events
+                        sequence_number = result.sequence_number
+                        yield result.stream_event
+
+                    if result.final_output_message is not None:
+                        tool_call_log = result.final_output_message
+                        tool_response_message = result.final_input_message
+                        sequence_number = result.sequence_number
+
                 if tool_call_log:
                     output_messages.append(tool_call_log)
 
                     # Emit output_item.done event for completed non-function tool call
-                    # Find the item_id for this tool call
-                    matching_item_id = None
-                    for index, item_id in tool_call_item_ids.items():
-                        response_tool_call = chat_response_tool_calls.get(index)
-                        if response_tool_call and response_tool_call.id == tool_call.id:
-                            matching_item_id = item_id
-                            break
-
                     if matching_item_id:
                         sequence_number += 1
                         yield OpenAIResponseObjectStreamResponseOutputItemDone(
@@ -848,7 +946,11 @@ class OpenAIResponsesImpl:
         self,
         tool_call: OpenAIChatCompletionToolCall,
         ctx: ChatCompletionContext,
-    ) -> tuple[OpenAIResponseOutput | None, OpenAIMessageParam | None]:
+        sequence_number: int,
+        response_id: str,
+        output_index: int,
+        item_id: str,
+    ) -> AsyncIterator[ToolExecutionResult]:
         from llama_stack.providers.utils.inference.prompt_adapter import (
             interleaved_content_as_str,
         )
@@ -858,8 +960,41 @@ class OpenAIResponsesImpl:
         tool_kwargs = json.loads(function.arguments) if function.arguments else {}
 
         if not function or not tool_call_id or not function.name:
-            return None, None
+            yield ToolExecutionResult(sequence_number=sequence_number)
+            return
 
+        # Emit in_progress event based on tool type (only for tools with specific streaming events)
+        progress_event = None
+        if ctx.mcp_tool_to_server and function.name in ctx.mcp_tool_to_server:
+            sequence_number += 1
+            progress_event = OpenAIResponseObjectStreamResponseMcpCallInProgress(
+                item_id=item_id,
+                output_index=output_index,
+                sequence_number=sequence_number,
+            )
+        elif function.name == "web_search":
+            sequence_number += 1
+            progress_event = OpenAIResponseObjectStreamResponseWebSearchCallInProgress(
+                item_id=item_id,
+                output_index=output_index,
+                sequence_number=sequence_number,
+            )
+        # Note: knowledge_search and other custom tools don't have specific streaming events in OpenAI spec
+
+        if progress_event:
+            yield ToolExecutionResult(stream_event=progress_event, sequence_number=sequence_number)
+
+        # For web search, emit searching event
+        if function.name == "web_search":
+            sequence_number += 1
+            searching_event = OpenAIResponseObjectStreamResponseWebSearchCallSearching(
+                item_id=item_id,
+                output_index=output_index,
+                sequence_number=sequence_number,
+            )
+            yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number)
+
+        # Execute the actual tool call
         error_exc = None
         result = None
         try:
@@ -894,6 +1029,33 @@ class OpenAIResponsesImpl:
         except Exception as e:
             error_exc = e
 
+        # Emit completion or failure event based on result (only for tools with specific streaming events)
+        has_error = error_exc or (result and ((result.error_code and result.error_code > 0) or result.error_message))
+        completion_event = None
+
+        if ctx.mcp_tool_to_server and function.name in ctx.mcp_tool_to_server:
+            sequence_number += 1
+            if has_error:
+                completion_event = OpenAIResponseObjectStreamResponseMcpCallFailed(
+                    sequence_number=sequence_number,
+                )
+            else:
+                completion_event = OpenAIResponseObjectStreamResponseMcpCallCompleted(
+                    sequence_number=sequence_number,
+                )
+        elif function.name == "web_search":
+            sequence_number += 1
+            completion_event = OpenAIResponseObjectStreamResponseWebSearchCallCompleted(
+                item_id=item_id,
+                output_index=output_index,
+                sequence_number=sequence_number,
+            )
+        # Note: knowledge_search and other custom tools don't have specific completion events in OpenAI spec
+
+        if completion_event:
+            yield ToolExecutionResult(stream_event=completion_event, sequence_number=sequence_number)
+
+        # Build the result message and input message
         if function.name in ctx.mcp_tool_to_server:
             from llama_stack.apis.agents.openai_responses import (
                 OpenAIResponseOutputMessageMCPCall,
@@ -907,25 +1069,25 @@ class OpenAIResponsesImpl:
             )
             if error_exc:
                 message.error = str(error_exc)
-            elif (result.error_code and result.error_code > 0) or result.error_message:
+            elif (result and result.error_code and result.error_code > 0) or (result and result.error_message):
                 message.error = f"Error (code {result.error_code}): {result.error_message}"
-            elif result.content:
+            elif result and result.content:
                 message.output = interleaved_content_as_str(result.content)
         else:
             if function.name == "web_search":
                 message = OpenAIResponseOutputMessageWebSearchToolCall(
                     id=tool_call_id,
                     status="completed",
                 )
-                if error_exc or (result.error_code and result.error_code > 0) or result.error_message:
+                if has_error:
                     message.status = "failed"
             elif function.name == "knowledge_search":
                 message = OpenAIResponseOutputMessageFileSearchToolCall(
                     id=tool_call_id,
                     queries=[tool_kwargs.get("query", "")],
                     status="completed",
                 )
-                if "document_ids" in result.metadata:
+                if result and "document_ids" in result.metadata:
                     message.results = []
                     for i, doc_id in enumerate(result.metadata["document_ids"]):
                         text = result.metadata["chunks"][i] if "chunks" in result.metadata else None
@@ -939,7 +1101,7 @@ class OpenAIResponsesImpl:
                                 attributes={},
                             )
                         )
-                if error_exc or (result.error_code and result.error_code > 0) or result.error_message:
+                if has_error:
                     message.status = "failed"
             else:
                 raise ValueError(f"Unknown tool {function.name} called")
@@ -971,10 +1133,13 @@ class OpenAIResponsesImpl:
                 raise ValueError(f"Unknown result content type: {type(result.content)}")
             input_message = OpenAIToolMessageParam(content=content, tool_call_id=tool_call_id)
         else:
-            text = str(error_exc)
+            text = str(error_exc) if error_exc else "Tool execution failed"
             input_message = OpenAIToolMessageParam(content=text, tool_call_id=tool_call_id)
 
-        return message, input_message
+        # Yield the final result
+        yield ToolExecutionResult(
+            sequence_number=sequence_number, final_output_message=message, final_input_message=input_message
+        )
 
 
 def _is_function_tool_call(
diff --git a/tests/integration/non_ci/responses/test_responses.py b/tests/integration/non_ci/responses/test_responses.py
index 6092346b0..04266eec8 100644
--- a/tests/integration/non_ci/responses/test_responses.py
+++ b/tests/integration/non_ci/responses/test_responses.py
@@ -590,25 +590,59 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_
     # Verify tool call streaming events are present
     chunk_types = [chunk.type for chunk in chunks]
 
-    # Should have function call arguments delta events for tool calls
-    delta_events = [chunk for chunk in chunks if chunk.type == "response.function_call_arguments.delta"]
-    done_events = [chunk for chunk in chunks if chunk.type == "response.function_call_arguments.done"]
+    # Should have function call or MCP arguments delta/done events for tool calls
+    delta_events = [
+        chunk
+        for chunk in chunks
+        if chunk.type in ["response.function_call_arguments.delta", "response.mcp_call.arguments.delta"]
+    ]
+    done_events = [
+        chunk
+        for chunk in chunks
+        if chunk.type in ["response.function_call_arguments.done", "response.mcp_call.arguments.done"]
+    ]
 
     # Should have output item events for tool calls
     item_added_events = [chunk for chunk in chunks if chunk.type == "response.output_item.added"]
     item_done_events = [chunk for chunk in chunks if chunk.type == "response.output_item.done"]
 
+    # Should have tool execution progress events
+    mcp_in_progress_events = [chunk for chunk in chunks if chunk.type == "response.mcp_call.in_progress"]
+    mcp_completed_events = [chunk for chunk in chunks if chunk.type == "response.mcp_call.completed"]
+
     # Verify we have substantial streaming activity (not just batch events)
     assert len(chunks) > 10, f"Expected rich streaming with many events, got only {len(chunks)} chunks"
 
     # Since this test involves MCP tool calls, we should see streaming events
-    assert len(delta_events) > 0, f"Expected function_call_arguments.delta events, got chunk types: {chunk_types}"
-    assert len(done_events) > 0, f"Expected function_call_arguments.done events, got chunk types: {chunk_types}"
+    assert len(delta_events) > 0, (
+        f"Expected function_call_arguments.delta or mcp_call.arguments.delta events, got chunk types: {chunk_types}"
+    )
+    assert len(done_events) > 0, (
+        f"Expected function_call_arguments.done or mcp_call.arguments.done events, got chunk types: {chunk_types}"
+    )
 
     # Should have output item events for function calls
     assert len(item_added_events) > 0, f"Expected response.output_item.added events, got chunk types: {chunk_types}"
     assert len(item_done_events) > 0, f"Expected response.output_item.done events, got chunk types: {chunk_types}"
0, f"Expected response.output_item.done events, got chunk types: {chunk_types}" + # Should have tool execution progress events + assert len(mcp_in_progress_events) > 0, ( + f"Expected response.mcp_call.in_progress events, got chunk types: {chunk_types}" + ) + assert len(mcp_completed_events) > 0, ( + f"Expected response.mcp_call.completed events, got chunk types: {chunk_types}" + ) + # MCP failed events are optional (only if errors occur) + + # Verify progress events have proper structure + for progress_event in mcp_in_progress_events: + assert hasattr(progress_event, "item_id"), "Progress event should have 'item_id' field" + assert hasattr(progress_event, "output_index"), "Progress event should have 'output_index' field" + assert hasattr(progress_event, "sequence_number"), "Progress event should have 'sequence_number' field" + + for completed_event in mcp_completed_events: + assert hasattr(completed_event, "sequence_number"), "Completed event should have 'sequence_number' field" + # Verify delta events have proper structure for delta_event in delta_events: assert hasattr(delta_event, "delta"), "Delta event should have 'delta' field" @@ -648,22 +682,32 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_ assert isinstance(done_event.output_index, int), "Output index should be integer" assert done_event.output_index >= 0, "Output index should be non-negative" - # Group function call argument events by item_id (these should have proper tracking) - function_call_events_by_item_id = {} + # Group function call and MCP argument events by item_id (these should have proper tracking) + argument_events_by_item_id = {} for chunk in chunks: if hasattr(chunk, "item_id") and chunk.type in [ "response.function_call_arguments.delta", "response.function_call_arguments.done", + "response.mcp_call.arguments.delta", + "response.mcp_call.arguments.done", ]: item_id = chunk.item_id - if item_id not in function_call_events_by_item_id: - function_call_events_by_item_id[item_id] = [] - function_call_events_by_item_id[item_id].append(chunk) + if item_id not in argument_events_by_item_id: + argument_events_by_item_id[item_id] = [] + argument_events_by_item_id[item_id].append(chunk) - for item_id, related_events in function_call_events_by_item_id.items(): - # Should have at least one delta and one done event for a complete function call - delta_events = [e for e in related_events if e.type == "response.function_call_arguments.delta"] - done_events = [e for e in related_events if e.type == "response.function_call_arguments.done"] + for item_id, related_events in argument_events_by_item_id.items(): + # Should have at least one delta and one done event for a complete tool call + delta_events = [ + e + for e in related_events + if e.type in ["response.function_call_arguments.delta", "response.mcp_call.arguments.delta"] + ] + done_events = [ + e + for e in related_events + if e.type in ["response.function_call_arguments.done", "response.mcp_call.arguments.done"] + ] assert len(delta_events) > 0, f"Item {item_id} should have at least one delta event" assert len(done_events) == 1, f"Item {item_id} should have exactly one done event" @@ -672,6 +716,33 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_ for event in related_events: assert event.item_id == item_id, f"Event should have consistent item_id {item_id}, got {event.item_id}" + # Verify content part events if they exist (for text streaming) + content_part_added_events = [chunk for chunk in chunks if 
+    content_part_done_events = [chunk for chunk in chunks if chunk.type == "response.content_part.done"]
+
+    # Content part events should be paired (if any exist)
+    if len(content_part_added_events) > 0:
+        assert len(content_part_done_events) > 0, (
+            "Should have content_part.done events if content_part.added events exist"
+        )
+
+    # Verify content part event structure
+    for added_event in content_part_added_events:
+        assert hasattr(added_event, "response_id"), "Content part added event should have response_id"
+        assert hasattr(added_event, "item_id"), "Content part added event should have item_id"
+        assert hasattr(added_event, "part"), "Content part added event should have part"
+
+        # TODO: enable this after the client types are updated
+        # assert added_event.part.type == "output_text", "Content part should be an output_text"
+
+    for done_event in content_part_done_events:
+        assert hasattr(done_event, "response_id"), "Content part done event should have response_id"
+        assert hasattr(done_event, "item_id"), "Content part done event should have item_id"
+        assert hasattr(done_event, "part"), "Content part done event should have part"
+
+        # TODO: enable this after the client types are updated
+        # assert len(done_event.part.text) > 0, "Content part should have text when done"
+
     # Basic pairing check: each output_item.added should be followed by some activity
     # (but we can't enforce strict 1:1 pairing due to the complexity of multi-turn scenarios)
     assert len(item_added_events) > 0, "Should have at least one output_item.added event"
diff --git a/tests/unit/providers/agents/meta_reference/test_openai_responses.py b/tests/unit/providers/agents/meta_reference/test_openai_responses.py
index 855a525e9..4132a74a3 100644
--- a/tests/unit/providers/agents/meta_reference/test_openai_responses.py
+++ b/tests/unit/providers/agents/meta_reference/test_openai_responses.py
@@ -136,9 +136,12 @@ async def test_create_openai_response_with_string_input(openai_responses_impl, m
         input=input_text,
         model=model,
         temperature=0.1,
+        stream=True,  # Enable streaming to test content part events
     )
 
-    # Verify
+    # For streaming response, collect all chunks
+    chunks = [chunk async for chunk in result]
+
     mock_inference_api.openai_chat_completion.assert_called_once_with(
         model=model,
         messages=[OpenAIUserMessageParam(role="user", content="What is the capital of Ireland?", name=None)],
@@ -147,11 +150,32 @@ async def test_create_openai_response_with_string_input(openai_responses_impl, m
         stream=True,
         temperature=0.1,
     )
+
+    # Should have content part events for text streaming
+    # Expected: response.created, content_part.added, output_text.delta, content_part.done, response.completed
+    assert len(chunks) >= 4
+    assert chunks[0].type == "response.created"
+
+    # Check for content part events
+    content_part_added_events = [c for c in chunks if c.type == "response.content_part.added"]
+    content_part_done_events = [c for c in chunks if c.type == "response.content_part.done"]
+    text_delta_events = [c for c in chunks if c.type == "response.output_text.delta"]
+
+    assert len(content_part_added_events) >= 1, "Should have content_part.added event for text"
+    assert len(content_part_done_events) >= 1, "Should have content_part.done event for text"
+    assert len(text_delta_events) >= 1, "Should have text delta events"
+
+    # Verify final event is completion
+    assert chunks[-1].type == "response.completed"
+
+    # When streaming, the final response is in the last chunk
+    final_response = chunks[-1].response
+    assert final_response.model == model
+    assert len(final_response.output) == 1
+    assert isinstance(final_response.output[0], OpenAIResponseMessage)
+
     openai_responses_impl.responses_store.store_response_object.assert_called_once()
-    assert result.model == model
-    assert len(result.output) == 1
-    assert isinstance(result.output[0], OpenAIResponseMessage)
-    assert result.output[0].content[0].text == "Dublin"
+    assert final_response.output[0].content[0].text == "Dublin"
 
 
 async def test_create_openai_response_with_string_input_with_tools(openai_responses_impl, mock_inference_api):
@@ -272,6 +296,8 @@ async def test_create_openai_response_with_tool_call_type_none(openai_responses_
 
     # Check that we got the content from our mocked tool execution result
     chunks = [chunk async for chunk in result]
+
+    # Verify event types
     # Should have: response.created, output_item.added, function_call_arguments.delta,
     # function_call_arguments.done, output_item.done, response.completed
    assert len(chunks) == 6
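---

A note on how the new events compose on the wire. The sketch below is a minimal, hypothetical consumer loop — the `stream` argument, the `consume` function name, and the accumulation logic are illustrative, not part of this PR — showing how `response.content_part.added` and `response.content_part.done` bracket the existing `response.output_text.delta` events, and where the new MCP progress events surface:

```python
# Hypothetical consumer of the streaming Responses API (illustrative only);
# event shapes follow the schemas added in this PR.
async def consume(stream):
    deltas: dict[str, list[str]] = {}  # item_id -> accumulated text deltas

    async for event in stream:
        if event.type == "response.content_part.added":
            # Opens a content part on an output item; part.text starts empty
            # and is filled by the subsequent output_text.delta events.
            deltas.setdefault(event.item_id, [])
        elif event.type == "response.output_text.delta":
            deltas.setdefault(event.item_id, []).append(event.delta)
        elif event.type == "response.content_part.done":
            # The done event carries the complete part, so the accumulated
            # deltas can be cross-checked against event.part.text.
            assert "".join(deltas.get(event.item_id, [])) == event.part.text
        elif event.type == "response.mcp_call.in_progress":
            print(f"MCP call {event.item_id} in progress")
        elif event.type == "response.completed":
            return event.response
```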
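Relatedly, because `OpenAIResponseObjectStream` is a pydantic discriminated union keyed on `type`, a raw event payload resolves to the new event classes without any hand-written dispatch. A small sketch under the same hedging — the payload values below are made up:

```python
from pydantic import TypeAdapter

from llama_stack.apis.agents.openai_responses import OpenAIResponseObjectStream

# TypeAdapter (standard pydantic v2) validates against the Annotated union
# and selects the concrete class via the "type" discriminator.
adapter = TypeAdapter(OpenAIResponseObjectStream)
event = adapter.validate_python(
    {
        "type": "response.content_part.done",
        "response_id": "resp_123",  # made-up identifiers for illustration
        "item_id": "msg_456",
        "part": {"type": "output_text", "text": "Dublin"},
        "sequence_number": 7,
    }
)
assert type(event).__name__ == "OpenAIResponseObjectStreamResponseContentPartDone"
assert event.part.text == "Dublin"
```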