feat(responses)!: add in_progress, failed, content part events (#3765)

## Summary
- add schema + runtime support for response.in_progress /
response.failed / response.incomplete
- stream content parts with proper indexes and reasoning slots
- align tests + docs with the richer event payloads

## Testing
- uv run pytest
tests/unit/providers/agents/meta_reference/test_openai_responses.py::test_create_openai_response_with_string_input
- uv run pytest
tests/unit/providers/agents/meta_reference/test_response_conversion_utils.py
This commit is contained in:
Ashwin Bharambe 2025-10-10 07:27:34 -07:00 committed by GitHub
parent a548169b99
commit e039b61d26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1431 additions and 221 deletions

View file

@ -438,7 +438,7 @@ class OpenAIDeleteResponseObject(BaseModel):
class OpenAIResponseObjectStreamResponseCreated(BaseModel):
"""Streaming event indicating a new response has been created.
:param response: The newly created response object
:param response: The response object that was created
:param type: Event type identifier, always "response.created"
"""
@ -446,11 +446,25 @@ class OpenAIResponseObjectStreamResponseCreated(BaseModel):
type: Literal["response.created"] = "response.created"
@json_schema_type
class OpenAIResponseObjectStreamResponseInProgress(BaseModel):
    """Streaming event indicating the response remains in progress.

    :param response: Current response state while in progress
    :param sequence_number: Sequential number for ordering streaming events
    :param type: Event type identifier, always "response.in_progress"
    """

    # Snapshot of the response object at the moment this event is emitted.
    response: OpenAIResponseObject
    # Monotonically increasing ordering key shared across all streamed events.
    sequence_number: int
    # Discriminator value for the OpenAIResponseObjectStream union.
    type: Literal["response.in_progress"] = "response.in_progress"
@json_schema_type
class OpenAIResponseObjectStreamResponseCompleted(BaseModel):
"""Streaming event indicating a response has been completed.
:param response: The completed response object
:param response: Completed response object
:param type: Event type identifier, always "response.completed"
"""
@ -458,6 +472,34 @@ class OpenAIResponseObjectStreamResponseCompleted(BaseModel):
type: Literal["response.completed"] = "response.completed"
@json_schema_type
class OpenAIResponseObjectStreamResponseIncomplete(BaseModel):
    """Streaming event emitted when a response ends in an incomplete state.

    :param response: Response object describing the incomplete state
    :param sequence_number: Sequential number for ordering streaming events
    :param type: Event type identifier, always "response.incomplete"
    """

    # Terminal snapshot of the response with status "incomplete".
    response: OpenAIResponseObject
    # Monotonically increasing ordering key shared across all streamed events.
    sequence_number: int
    # Discriminator value for the OpenAIResponseObjectStream union.
    type: Literal["response.incomplete"] = "response.incomplete"
@json_schema_type
class OpenAIResponseObjectStreamResponseFailed(BaseModel):
    """Streaming event emitted when a response fails.

    :param response: Response object describing the failure
    :param sequence_number: Sequential number for ordering streaming events
    :param type: Event type identifier, always "response.failed"
    """

    # Terminal snapshot of the response; carries the error details for the failure.
    response: OpenAIResponseObject
    # Monotonically increasing ordering key shared across all streamed events.
    sequence_number: int
    # Discriminator value for the OpenAIResponseObjectStream union.
    type: Literal["response.failed"] = "response.failed"
@json_schema_type
class OpenAIResponseObjectStreamResponseOutputItemAdded(BaseModel):
"""Streaming event for when a new output item is added to the response.
@ -688,19 +730,46 @@ class OpenAIResponseObjectStreamResponseMcpCallCompleted(BaseModel):
@json_schema_type
class OpenAIResponseContentPartOutputText(BaseModel):
    """Text content within a streamed response part.

    :param type: Content part type identifier, always "output_text"
    :param text: Text emitted for this content part
    :param annotations: Structured annotations associated with the text
    :param logprobs: (Optional) Token log probability details
    """

    # Discriminator value for the OpenAIResponseContentPart union.
    type: Literal["output_text"] = "output_text"
    text: str
    # default_factory keeps the annotations list present (and per-instance) even
    # when no annotations were produced.
    annotations: list[OpenAIResponseAnnotations] = Field(default_factory=list)
    # None when the provider supplies no log probabilities for this part.
    logprobs: list[dict[str, Any]] | None = None
@json_schema_type
class OpenAIResponseContentPartRefusal(BaseModel):
    """Refusal content within a streamed response part.

    :param type: Content part type identifier, always "refusal"
    :param refusal: Refusal text supplied by the model
    """

    # Discriminator value for the OpenAIResponseContentPart union.
    type: Literal["refusal"] = "refusal"
    refusal: str
@json_schema_type
class OpenAIResponseContentPartReasoningText(BaseModel):
    """Reasoning text emitted as part of a streamed response.

    :param type: Content part type identifier, always "reasoning_text"
    :param text: Reasoning text supplied by the model
    """

    # Discriminator value for the OpenAIResponseContentPart union.
    type: Literal["reasoning_text"] = "reasoning_text"
    text: str
# Discriminated union of every streamable content-part model; Pydantic selects
# the concrete class from the "type" field during (de)serialization.
OpenAIResponseContentPart = Annotated[
    OpenAIResponseContentPartOutputText | OpenAIResponseContentPartRefusal | OpenAIResponseContentPartReasoningText,
    Field(discriminator="type"),
]
register_schema(OpenAIResponseContentPart, name="OpenAIResponseContentPart")
@ -710,15 +779,19 @@ register_schema(OpenAIResponseContentPart, name="OpenAIResponseContentPart")
class OpenAIResponseObjectStreamResponseContentPartAdded(BaseModel):
    """Streaming event for when a new content part is added to a response item.

    :param content_index: Index position of the part within the content array
    :param response_id: Unique identifier of the response containing this content
    :param item_id: Unique identifier of the output item containing this content part
    :param output_index: Index position of the output item in the response
    :param part: The content part that was added
    :param sequence_number: Sequential number for ordering streaming events
    :param type: Event type identifier, always "response.content_part.added"
    """

    content_index: int
    response_id: str
    item_id: str
    output_index: int
    # One of: output_text | refusal | reasoning_text (discriminated on "type").
    part: OpenAIResponseContentPart
    # Monotonically increasing ordering key shared across all streamed events.
    sequence_number: int
    # Discriminator value for the OpenAIResponseObjectStream union.
    type: Literal["response.content_part.added"] = "response.content_part.added"
@ -728,15 +801,19 @@ class OpenAIResponseObjectStreamResponseContentPartAdded(BaseModel):
class OpenAIResponseObjectStreamResponseContentPartDone(BaseModel):
    """Streaming event for when a content part is completed.

    :param content_index: Index position of the part within the content array
    :param response_id: Unique identifier of the response containing this content
    :param item_id: Unique identifier of the output item containing this content part
    :param output_index: Index position of the output item in the response
    :param part: The completed content part
    :param sequence_number: Sequential number for ordering streaming events
    :param type: Event type identifier, always "response.content_part.done"
    """

    content_index: int
    response_id: str
    item_id: str
    output_index: int
    # Final state of the part (e.g. output_text with the full accumulated text).
    part: OpenAIResponseContentPart
    # Monotonically increasing ordering key shared across all streamed events.
    sequence_number: int
    # Discriminator value for the OpenAIResponseObjectStream union.
    type: Literal["response.content_part.done"] = "response.content_part.done"
@ -744,6 +821,7 @@ class OpenAIResponseObjectStreamResponseContentPartDone(BaseModel):
OpenAIResponseObjectStream = Annotated[
OpenAIResponseObjectStreamResponseCreated
| OpenAIResponseObjectStreamResponseInProgress
| OpenAIResponseObjectStreamResponseOutputItemAdded
| OpenAIResponseObjectStreamResponseOutputItemDone
| OpenAIResponseObjectStreamResponseOutputTextDelta
@ -763,6 +841,8 @@ OpenAIResponseObjectStream = Annotated[
| OpenAIResponseObjectStreamResponseMcpCallCompleted
| OpenAIResponseObjectStreamResponseContentPartAdded
| OpenAIResponseObjectStreamResponseContentPartDone
| OpenAIResponseObjectStreamResponseIncomplete
| OpenAIResponseObjectStreamResponseFailed
| OpenAIResponseObjectStreamResponseCompleted,
Field(discriminator="type"),
]

View file

@ -232,17 +232,33 @@ class OpenAIResponsesImpl:
if stream:
return stream_gen
else:
response = None
async for stream_chunk in stream_gen:
if stream_chunk.type == "response.completed":
if response is not None:
raise ValueError("The response stream completed multiple times! Earlier response: {response}")
response = stream_chunk.response
# don't leave the generator half complete!
final_response = None
final_event_type = None
failed_response = None
if response is None:
raise ValueError("The response stream never completed")
return response
async for stream_chunk in stream_gen:
if stream_chunk.type in {"response.completed", "response.incomplete"}:
if final_response is not None:
raise ValueError(
"The response stream produced multiple terminal responses! "
f"Earlier response from {final_event_type}"
)
final_response = stream_chunk.response
final_event_type = stream_chunk.type
elif stream_chunk.type == "response.failed":
failed_response = stream_chunk.response
if failed_response is not None:
error_message = (
failed_response.error.message
if failed_response and failed_response.error
else "Response stream failed without error details"
)
raise RuntimeError(f"OpenAI response failed: {error_message}")
if final_response is None:
raise ValueError("The response stream never reached a terminal state")
return final_response
async def _create_streaming_response(
self,
@ -288,13 +304,16 @@ class OpenAIResponsesImpl:
# Stream the response
final_response = None
failed_response = None
async for stream_chunk in orchestrator.create_response():
if stream_chunk.type == "response.completed":
if stream_chunk.type in {"response.completed", "response.incomplete"}:
final_response = stream_chunk.response
elif stream_chunk.type == "response.failed":
failed_response = stream_chunk.response
yield stream_chunk
# Store the response if requested
if store and final_response:
if store and final_response and failed_response is None:
await self._store_response(
response=final_response,
input=all_input,

View file

@ -13,6 +13,7 @@ from llama_stack.apis.agents.openai_responses import (
ApprovalFilter,
MCPListToolsTool,
OpenAIResponseContentPartOutputText,
OpenAIResponseError,
OpenAIResponseInputTool,
OpenAIResponseInputToolMCP,
OpenAIResponseMCPApprovalRequest,
@ -22,8 +23,11 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectStreamResponseContentPartAdded,
OpenAIResponseObjectStreamResponseContentPartDone,
OpenAIResponseObjectStreamResponseCreated,
OpenAIResponseObjectStreamResponseFailed,
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta,
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone,
OpenAIResponseObjectStreamResponseIncomplete,
OpenAIResponseObjectStreamResponseInProgress,
OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta,
OpenAIResponseObjectStreamResponseMcpCallArgumentsDone,
OpenAIResponseObjectStreamResponseMcpListToolsCompleted,
@ -101,21 +105,46 @@ class StreamingResponseOrchestrator:
# mapping for annotations
self.citation_files: dict[str, str] = {}
async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]:
# Initialize output messages
output_messages: list[OpenAIResponseOutput] = []
# Create initial response and emit response.created immediately
initial_response = OpenAIResponseObject(
def _clone_outputs(self, outputs: list[OpenAIResponseOutput]) -> list[OpenAIResponseOutput]:
cloned: list[OpenAIResponseOutput] = []
for item in outputs:
if hasattr(item, "model_copy"):
cloned.append(item.model_copy(deep=True))
else:
cloned.append(item)
return cloned
def _snapshot_response(
    self,
    status: str,
    outputs: list[OpenAIResponseOutput],
    *,
    error: OpenAIResponseError | None = None,
) -> OpenAIResponseObject:
    """Build a point-in-time OpenAIResponseObject for a streaming event.

    :param status: Response status to report (e.g. "in_progress", "completed",
        "incomplete", "failed").
    :param outputs: Output items accumulated so far; cloned via _clone_outputs
        so the snapshot is immune to later mutation of the live list.
    :param error: Error details; only populated for failure snapshots.
    :returns: A response object reflecting the current streaming state.
    """
    return OpenAIResponseObject(
        created_at=self.created_at,
        id=self.response_id,
        model=self.ctx.model,
        object="response",
        status=status,
        output=self._clone_outputs(outputs),
        text=self.text,
        error=error,
    )
yield OpenAIResponseObjectStreamResponseCreated(response=initial_response)
async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]:
output_messages: list[OpenAIResponseOutput] = []
# Emit response.created followed by response.in_progress to align with OpenAI streaming
yield OpenAIResponseObjectStreamResponseCreated(
response=self._snapshot_response("in_progress", output_messages)
)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseInProgress(
response=self._snapshot_response("in_progress", output_messages),
sequence_number=self.sequence_number,
)
# Process all tools (including MCP tools) and emit streaming events
if self.ctx.response_tools:
@ -124,87 +153,114 @@ class StreamingResponseOrchestrator:
n_iter = 0
messages = self.ctx.messages.copy()
final_status = "completed"
last_completion_result: ChatCompletionResult | None = None
while True:
# Text is the default response format for chat completion so don't need to pass it
# (some providers don't support non-empty response_format when tools are present)
response_format = None if self.ctx.response_format.type == "text" else self.ctx.response_format
logger.debug(f"calling openai_chat_completion with tools: {self.ctx.chat_tools}")
completion_result = await self.inference_api.openai_chat_completion(
model=self.ctx.model,
messages=messages,
tools=self.ctx.chat_tools,
stream=True,
temperature=self.ctx.temperature,
response_format=response_format,
)
try:
while True:
# Text is the default response format for chat completion so don't need to pass it
# (some providers don't support non-empty response_format when tools are present)
response_format = None if self.ctx.response_format.type == "text" else self.ctx.response_format
logger.debug(f"calling openai_chat_completion with tools: {self.ctx.chat_tools}")
completion_result = await self.inference_api.openai_chat_completion(
model=self.ctx.model,
messages=messages,
tools=self.ctx.chat_tools,
stream=True,
temperature=self.ctx.temperature,
response_format=response_format,
)
# Process streaming chunks and build complete response
completion_result_data = None
async for stream_event_or_result in self._process_streaming_chunks(completion_result, output_messages):
if isinstance(stream_event_or_result, ChatCompletionResult):
completion_result_data = stream_event_or_result
else:
yield stream_event_or_result
if not completion_result_data:
raise ValueError("Streaming chunk processor failed to return completion data")
current_response = self._build_chat_completion(completion_result_data)
# Process streaming chunks and build complete response
completion_result_data = None
async for stream_event_or_result in self._process_streaming_chunks(completion_result, output_messages):
if isinstance(stream_event_or_result, ChatCompletionResult):
completion_result_data = stream_event_or_result
else:
yield stream_event_or_result
if not completion_result_data:
raise ValueError("Streaming chunk processor failed to return completion data")
last_completion_result = completion_result_data
current_response = self._build_chat_completion(completion_result_data)
function_tool_calls, non_function_tool_calls, approvals, next_turn_messages = self._separate_tool_calls(
current_response, messages
)
(
function_tool_calls,
non_function_tool_calls,
approvals,
next_turn_messages,
) = self._separate_tool_calls(current_response, messages)
# add any approval requests required
for tool_call in approvals:
async for evt in self._add_mcp_approval_request(
tool_call.function.name, tool_call.function.arguments, output_messages
# add any approval requests required
for tool_call in approvals:
async for evt in self._add_mcp_approval_request(
tool_call.function.name, tool_call.function.arguments, output_messages
):
yield evt
# Handle choices with no tool calls
for choice in current_response.choices:
if not (choice.message.tool_calls and self.ctx.response_tools):
output_messages.append(
await convert_chat_choice_to_response_message(
choice,
self.citation_files,
message_id=completion_result_data.message_item_id,
)
)
# Execute tool calls and coordinate results
async for stream_event in self._coordinate_tool_execution(
function_tool_calls,
non_function_tool_calls,
completion_result_data,
output_messages,
next_turn_messages,
):
yield evt
yield stream_event
# Handle choices with no tool calls
for choice in current_response.choices:
if not (choice.message.tool_calls and self.ctx.response_tools):
output_messages.append(await convert_chat_choice_to_response_message(choice, self.citation_files))
messages = next_turn_messages
# Execute tool calls and coordinate results
async for stream_event in self._coordinate_tool_execution(
function_tool_calls,
non_function_tool_calls,
completion_result_data,
output_messages,
next_turn_messages,
):
yield stream_event
if not function_tool_calls and not non_function_tool_calls:
break
messages = next_turn_messages
if function_tool_calls:
logger.info("Exiting inference loop since there is a function (client-side) tool call")
break
if not function_tool_calls and not non_function_tool_calls:
break
n_iter += 1
if n_iter >= self.max_infer_iters:
logger.info(
f"Exiting inference loop since iteration count({n_iter}) exceeds {self.max_infer_iters=}"
)
final_status = "incomplete"
break
if function_tool_calls:
logger.info("Exiting inference loop since there is a function (client-side) tool call")
break
if last_completion_result and last_completion_result.finish_reason == "length":
final_status = "incomplete"
n_iter += 1
if n_iter >= self.max_infer_iters:
logger.info(f"Exiting inference loop since iteration count({n_iter}) exceeds {self.max_infer_iters=}")
break
except Exception as exc: # noqa: BLE001
self.final_messages = messages.copy()
self.sequence_number += 1
error = OpenAIResponseError(code="internal_error", message=str(exc))
failure_response = self._snapshot_response("failed", output_messages, error=error)
yield OpenAIResponseObjectStreamResponseFailed(
response=failure_response,
sequence_number=self.sequence_number,
)
return
self.final_messages = messages.copy()
# Create final response
final_response = OpenAIResponseObject(
created_at=self.created_at,
id=self.response_id,
model=self.ctx.model,
object="response",
status="completed",
text=self.text,
output=output_messages,
)
# Emit response.completed
yield OpenAIResponseObjectStreamResponseCompleted(response=final_response)
if final_status == "incomplete":
self.sequence_number += 1
final_response = self._snapshot_response("incomplete", output_messages)
yield OpenAIResponseObjectStreamResponseIncomplete(
response=final_response,
sequence_number=self.sequence_number,
)
else:
final_response = self._snapshot_response("completed", output_messages)
yield OpenAIResponseObjectStreamResponseCompleted(response=final_response)
def _separate_tool_calls(self, current_response, messages) -> tuple[list, list, list, list]:
"""Separate tool calls into function and non-function categories."""
@ -261,6 +317,8 @@ class StreamingResponseOrchestrator:
tool_call_item_ids: dict[int, str] = {}
# Track content parts for streaming events
content_part_emitted = False
content_index = 0
message_output_index = len(output_messages)
async for chunk in completion_result:
chat_response_id = chunk.id
@ -274,8 +332,10 @@ class StreamingResponseOrchestrator:
content_part_emitted = True
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartOutputText(
text="", # Will be filled incrementally via text deltas
),
@ -283,10 +343,10 @@ class StreamingResponseOrchestrator:
)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputTextDelta(
content_index=0,
content_index=content_index,
delta=chunk_choice.delta.content,
item_id=message_item_id,
output_index=0,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
@ -386,8 +446,10 @@ class StreamingResponseOrchestrator:
final_text = "".join(chat_response_content)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartDone(
content_index=content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartOutputText(
text=final_text,
),

View file

@ -48,7 +48,10 @@ from llama_stack.apis.inference import (
async def convert_chat_choice_to_response_message(
choice: OpenAIChoice, citation_files: dict[str, str] | None = None
choice: OpenAIChoice,
citation_files: dict[str, str] | None = None,
*,
message_id: str | None = None,
) -> OpenAIResponseMessage:
"""Convert an OpenAI Chat Completion choice into an OpenAI Response output message."""
output_content = ""
@ -64,7 +67,7 @@ async def convert_chat_choice_to_response_message(
annotations, clean_text = _extract_citations_from_text(output_content, citation_files or {})
return OpenAIResponseMessage(
id=f"msg_{uuid.uuid4()}",
id=message_id or f"msg_{uuid.uuid4()}",
content=[OpenAIResponseOutputMessageContentOutputText(text=clean_text, annotations=annotations)],
status="completed",
role="assistant",