From e1e161553c323e2477f24c7091a74cb51f18ef78 Mon Sep 17 00:00:00 2001
From: Ashwin Bharambe
Date: Wed, 13 Aug 2025 16:34:26 -0700
Subject: [PATCH] feat(responses): add MCP argument streaming and content part events (#3136)

# What does this PR do?

Adds content part streaming events to the OpenAI-compatible Responses API to
support more granular streaming of response content. This introduces:

1. New schema types for content parts: `OpenAIResponseContentPart` with
   variants for text output and refusals
2. New streaming event types:
   - `OpenAIResponseObjectStreamResponseContentPartAdded` for when a content
     part begins
   - `OpenAIResponseObjectStreamResponseContentPartDone` for when a content
     part completes
3. An implementation in the reference provider that emits these events
   during streaming responses

It also streams MCP tool-call arguments through dedicated
`response.mcp_call.arguments.delta` / `response.mcp_call.arguments.done`
events, mirroring the existing function-call argument events.

## Test Plan

Updated existing streaming tests to verify that content part events are
properly emitted.
---
 docs/_static/llama-stack-spec.html            | 137 ++++++++++++++++++
 docs/_static/llama-stack-spec.yaml            | 111 ++++++++++++++
 llama_stack/apis/agents/openai_responses.py   |  58 ++++++++
 .../agents/meta_reference/openai_responses.py |  96 ++++++++++--
 .../non_ci/responses/test_responses.py        |  77 ++++++++--
 .../meta_reference/test_openai_responses.py   |  36 ++++-
 6 files changed, 480 insertions(+), 35 deletions(-)

diff --git a/docs/_static/llama-stack-spec.html b/docs/_static/llama-stack-spec.html
index 25f916d87..0549dda21 100644
--- a/docs/_static/llama-stack-spec.html
+++ b/docs/_static/llama-stack-spec.html
@@ -8821,6 +8821,61 @@
       "title": "OpenAIResponseOutputMessageMCPListTools",
       "description": "MCP list tools output message containing available tools from an MCP server."
     },
+    "OpenAIResponseContentPart": {
+      "oneOf": [
+        {
+          "$ref": "#/components/schemas/OpenAIResponseContentPartOutputText"
+        },
+        {
+          "$ref": "#/components/schemas/OpenAIResponseContentPartRefusal"
+        }
+      ],
+      "discriminator": {
+        "propertyName": "type",
+        "mapping": {
+          "output_text": "#/components/schemas/OpenAIResponseContentPartOutputText",
+          "refusal": "#/components/schemas/OpenAIResponseContentPartRefusal"
+        }
+      }
+    },
+    "OpenAIResponseContentPartOutputText": {
+      "type": "object",
+      "properties": {
+        "type": {
+          "type": "string",
+          "const": "output_text",
+          "default": "output_text"
+        },
+        "text": {
+          "type": "string"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "type",
+        "text"
+      ],
+      "title": "OpenAIResponseContentPartOutputText"
+    },
+    "OpenAIResponseContentPartRefusal": {
+      "type": "object",
+      "properties": {
+        "type": {
+          "type": "string",
+          "const": "refusal",
+          "default": "refusal"
+        },
+        "refusal": {
+          "type": "string"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "type",
+        "refusal"
+      ],
+      "title": "OpenAIResponseContentPartRefusal"
+    },
     "OpenAIResponseObjectStream": {
       "oneOf": [
         {
           "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseCreated"
         },
@@ -8877,6 +8932,12 @@
         {
           "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted"
         },
+        {
+          "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded"
+        },
+        {
+          "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone"
+        },
         {
           "$ref": "#/components/schemas/OpenAIResponseObjectStreamResponseCompleted"
         }
@@ -8902,6 +8963,8 @@
           "response.mcp_call.in_progress": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallInProgress",
           "response.mcp_call.failed": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallFailed",
           "response.mcp_call.completed": "#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted",
"#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted", + "response.content_part.added": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded", + "response.content_part.done": "#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone", "response.completed": "#/components/schemas/OpenAIResponseObjectStreamResponseCompleted" } } @@ -8928,6 +8991,80 @@ "title": "OpenAIResponseObjectStreamResponseCompleted", "description": "Streaming event indicating a response has been completed." }, + "OpenAIResponseObjectStreamResponseContentPartAdded": { + "type": "object", + "properties": { + "response_id": { + "type": "string", + "description": "Unique identifier of the response containing this content" + }, + "item_id": { + "type": "string", + "description": "Unique identifier of the output item containing this content part" + }, + "part": { + "$ref": "#/components/schemas/OpenAIResponseContentPart", + "description": "The content part that was added" + }, + "sequence_number": { + "type": "integer", + "description": "Sequential number for ordering streaming events" + }, + "type": { + "type": "string", + "const": "response.content_part.added", + "default": "response.content_part.added", + "description": "Event type identifier, always \"response.content_part.added\"" + } + }, + "additionalProperties": false, + "required": [ + "response_id", + "item_id", + "part", + "sequence_number", + "type" + ], + "title": "OpenAIResponseObjectStreamResponseContentPartAdded", + "description": "Streaming event for when a new content part is added to a response item." + }, + "OpenAIResponseObjectStreamResponseContentPartDone": { + "type": "object", + "properties": { + "response_id": { + "type": "string", + "description": "Unique identifier of the response containing this content" + }, + "item_id": { + "type": "string", + "description": "Unique identifier of the output item containing this content part" + }, + "part": { + "$ref": "#/components/schemas/OpenAIResponseContentPart", + "description": "The completed content part" + }, + "sequence_number": { + "type": "integer", + "description": "Sequential number for ordering streaming events" + }, + "type": { + "type": "string", + "const": "response.content_part.done", + "default": "response.content_part.done", + "description": "Event type identifier, always \"response.content_part.done\"" + } + }, + "additionalProperties": false, + "required": [ + "response_id", + "item_id", + "part", + "sequence_number", + "type" + ], + "title": "OpenAIResponseObjectStreamResponseContentPartDone", + "description": "Streaming event for when a content part is completed." + }, "OpenAIResponseObjectStreamResponseCreated": { "type": "object", "properties": { diff --git a/docs/_static/llama-stack-spec.yaml b/docs/_static/llama-stack-spec.yaml index 43e9fa95a..aa47cd58d 100644 --- a/docs/_static/llama-stack-spec.yaml +++ b/docs/_static/llama-stack-spec.yaml @@ -6441,6 +6441,43 @@ components: title: OpenAIResponseOutputMessageMCPListTools description: >- MCP list tools output message containing available tools from an MCP server. 
+    OpenAIResponseContentPart:
+      oneOf:
+        - $ref: '#/components/schemas/OpenAIResponseContentPartOutputText'
+        - $ref: '#/components/schemas/OpenAIResponseContentPartRefusal'
+      discriminator:
+        propertyName: type
+        mapping:
+          output_text: '#/components/schemas/OpenAIResponseContentPartOutputText'
+          refusal: '#/components/schemas/OpenAIResponseContentPartRefusal'
+    OpenAIResponseContentPartOutputText:
+      type: object
+      properties:
+        type:
+          type: string
+          const: output_text
+          default: output_text
+        text:
+          type: string
+      additionalProperties: false
+      required:
+        - type
+        - text
+      title: OpenAIResponseContentPartOutputText
+    OpenAIResponseContentPartRefusal:
+      type: object
+      properties:
+        type:
+          type: string
+          const: refusal
+          default: refusal
+        refusal:
+          type: string
+      additionalProperties: false
+      required:
+        - type
+        - refusal
+      title: OpenAIResponseContentPartRefusal
     OpenAIResponseObjectStream:
       oneOf:
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseCreated'
@@ -6461,6 +6498,8 @@ components:
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallInProgress'
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallFailed'
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted'
+        - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded'
+        - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone'
         - $ref: '#/components/schemas/OpenAIResponseObjectStreamResponseCompleted'
       discriminator:
         propertyName: type
@@ -6483,6 +6522,8 @@ components:
           response.mcp_call.in_progress: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallInProgress'
           response.mcp_call.failed: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallFailed'
          response.mcp_call.completed: '#/components/schemas/OpenAIResponseObjectStreamResponseMcpCallCompleted'
+          response.content_part.added: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartAdded'
+          response.content_part.done: '#/components/schemas/OpenAIResponseObjectStreamResponseContentPartDone'
           response.completed: '#/components/schemas/OpenAIResponseObjectStreamResponseCompleted'
     "OpenAIResponseObjectStreamResponseCompleted":
       type: object
@@ -6504,6 +6545,76 @@ components:
       title: >-
         OpenAIResponseObjectStreamResponseCompleted
       description: >-
         Streaming event indicating a response has been completed.
+    "OpenAIResponseObjectStreamResponseContentPartAdded":
+      type: object
+      properties:
+        response_id:
+          type: string
+          description: >-
+            Unique identifier of the response containing this content
+        item_id:
+          type: string
+          description: >-
+            Unique identifier of the output item containing this content part
+        part:
+          $ref: '#/components/schemas/OpenAIResponseContentPart'
+          description: The content part that was added
+        sequence_number:
+          type: integer
+          description: >-
+            Sequential number for ordering streaming events
+        type:
+          type: string
+          const: response.content_part.added
+          default: response.content_part.added
+          description: >-
+            Event type identifier, always "response.content_part.added"
+      additionalProperties: false
+      required:
+        - response_id
+        - item_id
+        - part
+        - sequence_number
+        - type
+      title: >-
+        OpenAIResponseObjectStreamResponseContentPartAdded
+      description: >-
+        Streaming event for when a new content part is added to a response item.
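+    # Illustrative event instance conforming to the schema above; the
+    # identifiers and sequence number are hypothetical, and the empty text
+    # reflects that part text arrives via subsequent output_text deltas:
+    #
+    #   type: response.content_part.added
+    #   response_id: resp_abc123
+    #   item_id: msg_def456
+    #   part:
+    #     type: output_text
+    #     text: ''
+    #   sequence_number: 7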
+ "OpenAIResponseObjectStreamResponseContentPartDone": + type: object + properties: + response_id: + type: string + description: >- + Unique identifier of the response containing this content + item_id: + type: string + description: >- + Unique identifier of the output item containing this content part + part: + $ref: '#/components/schemas/OpenAIResponseContentPart' + description: The completed content part + sequence_number: + type: integer + description: >- + Sequential number for ordering streaming events + type: + type: string + const: response.content_part.done + default: response.content_part.done + description: >- + Event type identifier, always "response.content_part.done" + additionalProperties: false + required: + - response_id + - item_id + - part + - sequence_number + - type + title: >- + OpenAIResponseObjectStreamResponseContentPartDone + description: >- + Streaming event for when a content part is completed. "OpenAIResponseObjectStreamResponseCreated": type: object properties: diff --git a/llama_stack/apis/agents/openai_responses.py b/llama_stack/apis/agents/openai_responses.py index 8574104dc..591992479 100644 --- a/llama_stack/apis/agents/openai_responses.py +++ b/llama_stack/apis/agents/openai_responses.py @@ -623,6 +623,62 @@ class OpenAIResponseObjectStreamResponseMcpCallCompleted(BaseModel): type: Literal["response.mcp_call.completed"] = "response.mcp_call.completed" +@json_schema_type +class OpenAIResponseContentPartOutputText(BaseModel): + type: Literal["output_text"] = "output_text" + text: str + # TODO: add annotations, logprobs, etc. + + +@json_schema_type +class OpenAIResponseContentPartRefusal(BaseModel): + type: Literal["refusal"] = "refusal" + refusal: str + + +OpenAIResponseContentPart = Annotated[ + OpenAIResponseContentPartOutputText | OpenAIResponseContentPartRefusal, + Field(discriminator="type"), +] +register_schema(OpenAIResponseContentPart, name="OpenAIResponseContentPart") + + +@json_schema_type +class OpenAIResponseObjectStreamResponseContentPartAdded(BaseModel): + """Streaming event for when a new content part is added to a response item. + + :param response_id: Unique identifier of the response containing this content + :param item_id: Unique identifier of the output item containing this content part + :param part: The content part that was added + :param sequence_number: Sequential number for ordering streaming events + :param type: Event type identifier, always "response.content_part.added" + """ + + response_id: str + item_id: str + part: OpenAIResponseContentPart + sequence_number: int + type: Literal["response.content_part.added"] = "response.content_part.added" + + +@json_schema_type +class OpenAIResponseObjectStreamResponseContentPartDone(BaseModel): + """Streaming event for when a content part is completed. 
+
+    :param response_id: Unique identifier of the response containing this content
+    :param item_id: Unique identifier of the output item containing this content part
+    :param part: The completed content part
+    :param sequence_number: Sequential number for ordering streaming events
+    :param type: Event type identifier, always "response.content_part.done"
+    """
+
+    response_id: str
+    item_id: str
+    part: OpenAIResponseContentPart
+    sequence_number: int
+    type: Literal["response.content_part.done"] = "response.content_part.done"
+
+
 OpenAIResponseObjectStream = Annotated[
     OpenAIResponseObjectStreamResponseCreated
     | OpenAIResponseObjectStreamResponseOutputItemAdded
@@ -642,6 +698,8 @@ OpenAIResponseObjectStream = Annotated[
     | OpenAIResponseObjectStreamResponseMcpCallInProgress
     | OpenAIResponseObjectStreamResponseMcpCallFailed
     | OpenAIResponseObjectStreamResponseMcpCallCompleted
+    | OpenAIResponseObjectStreamResponseContentPartAdded
+    | OpenAIResponseObjectStreamResponseContentPartDone
     | OpenAIResponseObjectStreamResponseCompleted,
     Field(discriminator="type"),
 ]
diff --git a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
index fbb5a608a..6aca4d68e 100644
--- a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
+++ b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
@@ -20,6 +20,7 @@ from llama_stack.apis.agents.openai_responses import (
     ListOpenAIResponseInputItem,
     ListOpenAIResponseObject,
     OpenAIDeleteResponseObject,
+    OpenAIResponseContentPartOutputText,
     OpenAIResponseInput,
     OpenAIResponseInputFunctionToolCallOutput,
     OpenAIResponseInputMessageContent,
@@ -32,9 +33,13 @@ from llama_stack.apis.agents.openai_responses import (
     OpenAIResponseObject,
     OpenAIResponseObjectStream,
     OpenAIResponseObjectStreamResponseCompleted,
+    OpenAIResponseObjectStreamResponseContentPartAdded,
+    OpenAIResponseObjectStreamResponseContentPartDone,
     OpenAIResponseObjectStreamResponseCreated,
     OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta,
     OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone,
+    OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta,
+    OpenAIResponseObjectStreamResponseMcpCallArgumentsDone,
     OpenAIResponseObjectStreamResponseMcpCallCompleted,
     OpenAIResponseObjectStreamResponseMcpCallFailed,
     OpenAIResponseObjectStreamResponseMcpCallInProgress,
@@ -475,6 +480,8 @@ class OpenAIResponsesImpl:
         message_item_id = f"msg_{uuid.uuid4()}"
         # Track tool call items for streaming events
         tool_call_item_ids: dict[int, str] = {}
+        # Track content parts for streaming events
+        content_part_emitted = False
 
         async for chunk in completion_result:
             chat_response_id = chunk.id
@@ -483,6 +490,18 @@ class OpenAIResponsesImpl:
             for chunk_choice in chunk.choices:
                 # Emit incremental text content as delta events
                 if chunk_choice.delta.content:
+                    # Emit content_part.added event for first text chunk
+                    if not content_part_emitted:
+                        content_part_emitted = True
+                        sequence_number += 1
+                        yield OpenAIResponseObjectStreamResponseContentPartAdded(
+                            response_id=response_id,
+                            item_id=message_item_id,
+                            part=OpenAIResponseContentPartOutputText(
+                                text="",  # Will be filled incrementally via text deltas
+                            ),
+                            sequence_number=sequence_number,
+                        )
                     sequence_number += 1
                     yield OpenAIResponseObjectStreamResponseOutputTextDelta(
                         content_index=0,
@@ -529,16 +548,33 @@ class OpenAIResponsesImpl:
                                 sequence_number=sequence_number,
                             )
 
-                        # Stream function call arguments as they arrive
+                        # Stream tool call arguments as they arrive (differentiate between MCP and function calls)
                         if tool_call.function and tool_call.function.arguments:
                             tool_call_item_id = tool_call_item_ids[tool_call.index]
                             sequence_number += 1
-                            yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta(
-                                delta=tool_call.function.arguments,
-                                item_id=tool_call_item_id,
-                                output_index=len(output_messages),
-                                sequence_number=sequence_number,
+
+                            # Check if this is an MCP tool call; fall back to the accumulated
+                            # tool-call name, since streamed deltas after the first chunk may
+                            # omit the function name
+                            tool_call_name = tool_call.function.name or (
+                                chat_response_tool_calls[tool_call.index].function.name
+                                if tool_call.index in chat_response_tool_calls
+                                else None
+                            )
+                            is_mcp_tool = (
+                                ctx.mcp_tool_to_server and tool_call_name and tool_call_name in ctx.mcp_tool_to_server
                             )
+                            if is_mcp_tool:
+                                # Emit MCP-specific argument delta event
+                                yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta(
+                                    delta=tool_call.function.arguments,
+                                    item_id=tool_call_item_id,
+                                    output_index=len(output_messages),
+                                    sequence_number=sequence_number,
+                                )
+                            else:
+                                # Emit function call argument delta event
+                                yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta(
+                                    delta=tool_call.function.arguments,
+                                    item_id=tool_call_item_id,
+                                    output_index=len(output_messages),
+                                    sequence_number=sequence_number,
+                                )
 
                         # Accumulate arguments for final response (only for subsequent chunks)
                         if not is_new_tool_call:
@@ -546,27 +582,55 @@ class OpenAIResponsesImpl:
                                 response_tool_call.function.arguments or ""
                             ) + tool_call.function.arguments
 
-        # Emit function_call_arguments.done events for completed tool calls
+        # Emit arguments.done events for completed tool calls (differentiate between MCP and function calls)
         for tool_call_index in sorted(chat_response_tool_calls.keys()):
             tool_call_item_id = tool_call_item_ids[tool_call_index]
             final_arguments = chat_response_tool_calls[tool_call_index].function.arguments or ""
+            tool_call_name = chat_response_tool_calls[tool_call_index].function.name
+
+            # Check if this is an MCP tool call
+            is_mcp_tool = ctx.mcp_tool_to_server and tool_call_name and tool_call_name in ctx.mcp_tool_to_server
             sequence_number += 1
-            yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone(
-                arguments=final_arguments,
-                item_id=tool_call_item_id,
-                output_index=len(output_messages),
-                sequence_number=sequence_number,
-            )
+            if is_mcp_tool:
+                # Emit MCP-specific argument done event
+                yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDone(
+                    arguments=final_arguments,
+                    item_id=tool_call_item_id,
+                    output_index=len(output_messages),
+                    sequence_number=sequence_number,
+                )
+            else:
+                # Emit function call argument done event
+                yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone(
+                    arguments=final_arguments,
+                    item_id=tool_call_item_id,
+                    output_index=len(output_messages),
+                    sequence_number=sequence_number,
+                )
 
         # Convert collected chunks to complete response
         if chat_response_tool_calls:
             tool_calls = [chat_response_tool_calls[i] for i in sorted(chat_response_tool_calls.keys())]
-
-            # when there are tool calls, we need to clear the content
-            chat_response_content = []
         else:
             tool_calls = None
 
+        # Emit content_part.done event if text content was streamed (before content gets cleared)
+        if content_part_emitted:
+            final_text = "".join(chat_response_content)
+            sequence_number += 1
+            yield OpenAIResponseObjectStreamResponseContentPartDone(
+                response_id=response_id,
+                item_id=message_item_id,
+                part=OpenAIResponseContentPartOutputText(
+                    text=final_text,
+                ),
+                sequence_number=sequence_number,
+            )
+
+        # Clear content when there are tool calls (OpenAI spec behavior)
+        if chat_response_tool_calls:
+            chat_response_content = []
+
         assistant_message = OpenAIAssistantMessageParam(
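+            # Note: chat_response_content was cleared just above when tool calls
+            # are present, so this assistant message carries "" in that case; the
+            # full streamed text has already been delivered via content_part.done.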
content="".join(chat_response_content), tool_calls=tool_calls, diff --git a/tests/integration/non_ci/responses/test_responses.py b/tests/integration/non_ci/responses/test_responses.py index 776e3cf30..04266eec8 100644 --- a/tests/integration/non_ci/responses/test_responses.py +++ b/tests/integration/non_ci/responses/test_responses.py @@ -590,9 +590,17 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_ # Verify tool call streaming events are present chunk_types = [chunk.type for chunk in chunks] - # Should have function call arguments delta events for tool calls - delta_events = [chunk for chunk in chunks if chunk.type == "response.function_call_arguments.delta"] - done_events = [chunk for chunk in chunks if chunk.type == "response.function_call_arguments.done"] + # Should have function call or MCP arguments delta/done events for tool calls + delta_events = [ + chunk + for chunk in chunks + if chunk.type in ["response.function_call_arguments.delta", "response.mcp_call.arguments.delta"] + ] + done_events = [ + chunk + for chunk in chunks + if chunk.type in ["response.function_call_arguments.done", "response.mcp_call.arguments.done"] + ] # Should have output item events for tool calls item_added_events = [chunk for chunk in chunks if chunk.type == "response.output_item.added"] @@ -606,8 +614,12 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_ assert len(chunks) > 10, f"Expected rich streaming with many events, got only {len(chunks)} chunks" # Since this test involves MCP tool calls, we should see streaming events - assert len(delta_events) > 0, f"Expected function_call_arguments.delta events, got chunk types: {chunk_types}" - assert len(done_events) > 0, f"Expected function_call_arguments.done events, got chunk types: {chunk_types}" + assert len(delta_events) > 0, ( + f"Expected function_call_arguments.delta or mcp_call.arguments.delta events, got chunk types: {chunk_types}" + ) + assert len(done_events) > 0, ( + f"Expected function_call_arguments.done or mcp_call.arguments.done events, got chunk types: {chunk_types}" + ) # Should have output item events for function calls assert len(item_added_events) > 0, f"Expected response.output_item.added events, got chunk types: {chunk_types}" @@ -670,22 +682,32 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_ assert isinstance(done_event.output_index, int), "Output index should be integer" assert done_event.output_index >= 0, "Output index should be non-negative" - # Group function call argument events by item_id (these should have proper tracking) - function_call_events_by_item_id = {} + # Group function call and MCP argument events by item_id (these should have proper tracking) + argument_events_by_item_id = {} for chunk in chunks: if hasattr(chunk, "item_id") and chunk.type in [ "response.function_call_arguments.delta", "response.function_call_arguments.done", + "response.mcp_call.arguments.delta", + "response.mcp_call.arguments.done", ]: item_id = chunk.item_id - if item_id not in function_call_events_by_item_id: - function_call_events_by_item_id[item_id] = [] - function_call_events_by_item_id[item_id].append(chunk) + if item_id not in argument_events_by_item_id: + argument_events_by_item_id[item_id] = [] + argument_events_by_item_id[item_id].append(chunk) - for item_id, related_events in function_call_events_by_item_id.items(): - # Should have at least one delta and one done event for a complete function call - delta_events = [e for e in 
related_events if e.type == "response.function_call_arguments.delta"] - done_events = [e for e in related_events if e.type == "response.function_call_arguments.done"] + for item_id, related_events in argument_events_by_item_id.items(): + # Should have at least one delta and one done event for a complete tool call + delta_events = [ + e + for e in related_events + if e.type in ["response.function_call_arguments.delta", "response.mcp_call.arguments.delta"] + ] + done_events = [ + e + for e in related_events + if e.type in ["response.function_call_arguments.done", "response.mcp_call.arguments.done"] + ] assert len(delta_events) > 0, f"Item {item_id} should have at least one delta event" assert len(done_events) == 1, f"Item {item_id} should have exactly one done event" @@ -694,6 +716,33 @@ def test_response_streaming_multi_turn_tool_execution(compat_client, text_model_ for event in related_events: assert event.item_id == item_id, f"Event should have consistent item_id {item_id}, got {event.item_id}" + # Verify content part events if they exist (for text streaming) + content_part_added_events = [chunk for chunk in chunks if chunk.type == "response.content_part.added"] + content_part_done_events = [chunk for chunk in chunks if chunk.type == "response.content_part.done"] + + # Content part events should be paired (if any exist) + if len(content_part_added_events) > 0: + assert len(content_part_done_events) > 0, ( + "Should have content_part.done events if content_part.added events exist" + ) + + # Verify content part event structure + for added_event in content_part_added_events: + assert hasattr(added_event, "response_id"), "Content part added event should have response_id" + assert hasattr(added_event, "item_id"), "Content part added event should have item_id" + assert hasattr(added_event, "part"), "Content part added event should have part" + + # TODO: enable this after the client types are updated + # assert added_event.part.type == "output_text", "Content part should be an output_text" + + for done_event in content_part_done_events: + assert hasattr(done_event, "response_id"), "Content part done event should have response_id" + assert hasattr(done_event, "item_id"), "Content part done event should have item_id" + assert hasattr(done_event, "part"), "Content part done event should have part" + + # TODO: enable this after the client types are updated + # assert len(done_event.part.text) > 0, "Content part should have text when done" + # Basic pairing check: each output_item.added should be followed by some activity # (but we can't enforce strict 1:1 pairing due to the complexity of multi-turn scenarios) assert len(item_added_events) > 0, "Should have at least one output_item.added event" diff --git a/tests/unit/providers/agents/meta_reference/test_openai_responses.py b/tests/unit/providers/agents/meta_reference/test_openai_responses.py index 855a525e9..4132a74a3 100644 --- a/tests/unit/providers/agents/meta_reference/test_openai_responses.py +++ b/tests/unit/providers/agents/meta_reference/test_openai_responses.py @@ -136,9 +136,12 @@ async def test_create_openai_response_with_string_input(openai_responses_impl, m input=input_text, model=model, temperature=0.1, + stream=True, # Enable streaming to test content part events ) - # Verify + # For streaming response, collect all chunks + chunks = [chunk async for chunk in result] + mock_inference_api.openai_chat_completion.assert_called_once_with( model=model, messages=[OpenAIUserMessageParam(role="user", content="What is the capital of Ireland?", 
@@ -147,11 +150,32 @@ async def test_create_openai_response_with_string_input(openai_responses_impl, m
         stream=True,
         temperature=0.1,
     )
+
+    # Should have content part events for text streaming
+    # Expected: response.created, content_part.added, output_text.delta, content_part.done, response.completed
+    assert len(chunks) >= 5
+    assert chunks[0].type == "response.created"
+
+    # Check for content part events
+    content_part_added_events = [c for c in chunks if c.type == "response.content_part.added"]
+    content_part_done_events = [c for c in chunks if c.type == "response.content_part.done"]
+    text_delta_events = [c for c in chunks if c.type == "response.output_text.delta"]
+
+    assert len(content_part_added_events) >= 1, "Should have content_part.added event for text"
+    assert len(content_part_done_events) >= 1, "Should have content_part.done event for text"
+    assert len(text_delta_events) >= 1, "Should have text delta events"
+
+    # Verify final event is completion
+    assert chunks[-1].type == "response.completed"
+
+    # When streaming, the final response is in the last chunk
+    final_response = chunks[-1].response
+    assert final_response.model == model
+    assert len(final_response.output) == 1
+    assert isinstance(final_response.output[0], OpenAIResponseMessage)
+
     openai_responses_impl.responses_store.store_response_object.assert_called_once()
-    assert result.model == model
-    assert len(result.output) == 1
-    assert isinstance(result.output[0], OpenAIResponseMessage)
-    assert result.output[0].content[0].text == "Dublin"
+    assert final_response.output[0].content[0].text == "Dublin"
 
 
 async def test_create_openai_response_with_string_input_with_tools(openai_responses_impl, mock_inference_api):
@@ -272,6 +296,8 @@ async def test_create_openai_response_with_tool_call_type_none(openai_responses_
 
     # Check that we got the content from our mocked tool execution result
     chunks = [chunk async for chunk in result]
+
+    # Verify event types
     # Should have: response.created, output_item.added, function_call_arguments.delta,
    # function_call_arguments.done, output_item.done, response.completed
     assert len(chunks) == 6
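
---

For reviewers, a minimal sketch of consuming the new events end-to-end. This is not part of the patch: the base URL, API key, and model id are hypothetical, and it assumes an OpenAI-compatible client (like the `compat_client` used in the integration tests) whose streaming `responses.create` yields typed chunks carrying the event `type` strings introduced above.

```python
from openai import OpenAI

# Hypothetical llama-stack endpoint; adjust base_url/api_key/model for your setup.
client = OpenAI(base_url="http://localhost:8321/v1/openai/v1", api_key="none")

stream = client.responses.create(
    model="meta-llama/Llama-3.1-8B-Instruct",  # hypothetical model id
    input="What is the capital of Ireland?",
    stream=True,
)

text_by_item: dict[str, str] = {}  # item_id -> accumulated output text
args_by_item: dict[str, str] = {}  # item_id -> accumulated tool-call arguments

for chunk in stream:
    if chunk.type == "response.content_part.added":
        # A content part opened; its text arrives via output_text deltas.
        text_by_item.setdefault(chunk.item_id, "")
    elif chunk.type == "response.output_text.delta":
        text_by_item[chunk.item_id] = text_by_item.get(chunk.item_id, "") + chunk.delta
    elif chunk.type in ("response.mcp_call.arguments.delta", "response.function_call_arguments.delta"):
        args_by_item[chunk.item_id] = args_by_item.get(chunk.item_id, "") + chunk.delta
    elif chunk.type in ("response.mcp_call.arguments.done", "response.function_call_arguments.done"):
        print(f"tool call {chunk.item_id} arguments: {chunk.arguments}")
    elif chunk.type == "response.content_part.done":
        # The done event carries the complete part, so no reassembly is required.
        print(f"completed part on {chunk.item_id}: {chunk.part.text}")
    elif chunk.type == "response.completed":
        final_response = chunk.response  # terminal chunk carries the full response object
```

Note the design this mirrors: `content_part.added` arrives with empty text before the first `output_text.delta`, and `content_part.done` repeats the fully accumulated text, so a consumer can either accumulate deltas or simply take the final part.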