feat(responses)!: add reasoning and annotation added events (#3793)

Implements missing streaming events from the OpenAI Responses API spec:
 - reasoning text/summary events for o1/o3 models
 - refusal events for safety moderation
 - annotation events for citations
 - file search streaming events
 
Added an optional `reasoning_content` field to chat completion chunks to
support non-standard provider extensions.

**NOTE:** OpenAI does _not_ populate `reasoning_content` when users call the
chat completions API. This means there is no way for us to implement
Responses (with reasoning) on top of OpenAI chat completions! We'd need
to transparently punt to OpenAI's Responses endpoint if we wish to do
that. For other providers (vLLM, etc.), though, we can use it.
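
For illustration, here is a minimal sketch of consuming that extension from a
vLLM-served model through an OpenAI-compatible client. The `base_url`, model
name, and API key are placeholders, not values from this PR:

```python
# Hypothetical sketch: reading the non-standard reasoning_content field from
# a vLLM-served model via an OpenAI-compatible client.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="none")  # placeholders

stream = client.chat.completions.create(
    model="my-reasoning-model",  # placeholder; must emit reasoning_content
    messages=[{"role": "user", "content": "Why is the sky blue?"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta
    # reasoning_content is a provider extension; guard with getattr, just as
    # the orchestrator below guards with hasattr.
    if reasoning := getattr(delta, "reasoning_content", None):
        print("[reasoning]", reasoning, end="")
    if delta.content:
        print(delta.content, end="")
```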

## Test Plan

File search streaming test passes:
```
./scripts/integration-tests.sh --stack-config server:ci-tests \
   --suite responses --setup gpt --inference-mode replay --pattern test_response_file_search_streaming_events
```

Reasoning tests need a more complex setup and validation (a vLLM-powered
OSS model, perhaps gpt-oss, that can return `reasoning_content`). I will
do that in a follow-up PR.
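
A rough sketch of what that follow-up test might assert, assuming a client
fixture like the existing responses suite uses (fixture names and the model
are hypothetical, not part of this PR):

```python
# Hypothetical follow-up test; openai_client and text_model_id are assumed
# fixtures, and the served model must emit reasoning_content.
def test_response_reasoning_streaming_events(openai_client, text_model_id):
    stream = openai_client.responses.create(
        model=text_model_id,
        input="Think step by step: what is 17 * 23?",
        stream=True,
    )
    event_types = [event.type for event in stream]
    assert "response.reasoning_text.delta" in event_types
    assert "response.reasoning_text.done" in event_types
    # deltas must precede the terminal done event
    assert event_types.index("response.reasoning_text.delta") < event_types.index(
        "response.reasoning_text.done"
    )
```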
commit 7c63aebd64 (parent f365961731)
Ashwin Bharambe, 2025-10-11 16:47:14 -07:00, committed by GitHub
25 files changed, 23530 insertions(+), 2 deletions(-)

@@ -953,6 +953,248 @@ class OpenAIResponseObjectStreamResponseContentPartDone(BaseModel):
type: Literal["response.content_part.done"] = "response.content_part.done"
@json_schema_type
class OpenAIResponseObjectStreamResponseReasoningTextDelta(BaseModel):
"""Streaming event for incremental reasoning text updates.
:param content_index: Index position of the reasoning content part
:param delta: Incremental reasoning text being added
:param item_id: Unique identifier of the output item being updated
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.reasoning_text.delta"
"""
content_index: int
delta: str
item_id: str
output_index: int
sequence_number: int
type: Literal["response.reasoning_text.delta"] = "response.reasoning_text.delta"
@json_schema_type
class OpenAIResponseObjectStreamResponseReasoningTextDone(BaseModel):
"""Streaming event for when reasoning text is completed.
:param content_index: Index position of the reasoning content part
:param text: Final complete reasoning text
:param item_id: Unique identifier of the completed output item
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.reasoning_text.done"
"""
content_index: int
text: str
item_id: str
output_index: int
sequence_number: int
type: Literal["response.reasoning_text.done"] = "response.reasoning_text.done"
@json_schema_type
class OpenAIResponseContentPartReasoningSummary(BaseModel):
"""Reasoning summary part in a streamed response.
:param type: Content part type identifier, always "summary_text"
:param text: Summary text
"""
type: Literal["summary_text"] = "summary_text"
text: str
@json_schema_type
class OpenAIResponseObjectStreamResponseReasoningSummaryPartAdded(BaseModel):
"""Streaming event for when a new reasoning summary part is added.
:param item_id: Unique identifier of the output item
:param output_index: Index position of the output item
:param part: The summary part that was added
:param sequence_number: Sequential number for ordering streaming events
:param summary_index: Index of the summary part within the reasoning summary
:param type: Event type identifier, always "response.reasoning_summary_part.added"
"""
item_id: str
output_index: int
part: OpenAIResponseContentPartReasoningSummary
sequence_number: int
summary_index: int
type: Literal["response.reasoning_summary_part.added"] = "response.reasoning_summary_part.added"
@json_schema_type
class OpenAIResponseObjectStreamResponseReasoningSummaryPartDone(BaseModel):
"""Streaming event for when a reasoning summary part is completed.
:param item_id: Unique identifier of the output item
:param output_index: Index position of the output item
:param part: The completed summary part
:param sequence_number: Sequential number for ordering streaming events
:param summary_index: Index of the summary part within the reasoning summary
:param type: Event type identifier, always "response.reasoning_summary_part.done"
"""
item_id: str
output_index: int
part: OpenAIResponseContentPartReasoningSummary
sequence_number: int
summary_index: int
type: Literal["response.reasoning_summary_part.done"] = "response.reasoning_summary_part.done"
@json_schema_type
class OpenAIResponseObjectStreamResponseReasoningSummaryTextDelta(BaseModel):
"""Streaming event for incremental reasoning summary text updates.
:param delta: Incremental summary text being added
:param item_id: Unique identifier of the output item
:param output_index: Index position of the output item
:param sequence_number: Sequential number for ordering streaming events
:param summary_index: Index of the summary part within the reasoning summary
:param type: Event type identifier, always "response.reasoning_summary_text.delta"
"""
delta: str
item_id: str
output_index: int
sequence_number: int
summary_index: int
type: Literal["response.reasoning_summary_text.delta"] = "response.reasoning_summary_text.delta"
@json_schema_type
class OpenAIResponseObjectStreamResponseReasoningSummaryTextDone(BaseModel):
"""Streaming event for when reasoning summary text is completed.
:param text: Final complete summary text
:param item_id: Unique identifier of the output item
:param output_index: Index position of the output item
:param sequence_number: Sequential number for ordering streaming events
:param summary_index: Index of the summary part within the reasoning summary
:param type: Event type identifier, always "response.reasoning_summary_text.done"
"""
text: str
item_id: str
output_index: int
sequence_number: int
summary_index: int
type: Literal["response.reasoning_summary_text.done"] = "response.reasoning_summary_text.done"
@json_schema_type
class OpenAIResponseObjectStreamResponseRefusalDelta(BaseModel):
"""Streaming event for incremental refusal text updates.
:param content_index: Index position of the content part
:param delta: Incremental refusal text being added
:param item_id: Unique identifier of the output item
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.refusal.delta"
"""
content_index: int
delta: str
item_id: str
output_index: int
sequence_number: int
type: Literal["response.refusal.delta"] = "response.refusal.delta"
@json_schema_type
class OpenAIResponseObjectStreamResponseRefusalDone(BaseModel):
"""Streaming event for when refusal text is completed.
:param content_index: Index position of the content part
:param refusal: Final complete refusal text
:param item_id: Unique identifier of the output item
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.refusal.done"
"""
content_index: int
refusal: str
item_id: str
output_index: int
sequence_number: int
type: Literal["response.refusal.done"] = "response.refusal.done"
@json_schema_type
class OpenAIResponseObjectStreamResponseOutputTextAnnotationAdded(BaseModel):
"""Streaming event for when an annotation is added to output text.
:param item_id: Unique identifier of the item to which the annotation is being added
:param output_index: Index position of the output item in the response's output array
:param content_index: Index position of the content part within the output item
:param annotation_index: Index of the annotation within the content part
:param annotation: The annotation object being added
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.output_text.annotation.added"
"""
item_id: str
output_index: int
content_index: int
annotation_index: int
annotation: OpenAIResponseAnnotations
sequence_number: int
type: Literal["response.output_text.annotation.added"] = "response.output_text.annotation.added"
@json_schema_type
class OpenAIResponseObjectStreamResponseFileSearchCallInProgress(BaseModel):
"""Streaming event for file search calls in progress.
:param item_id: Unique identifier of the file search call
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.file_search_call.in_progress"
"""
item_id: str
output_index: int
sequence_number: int
type: Literal["response.file_search_call.in_progress"] = "response.file_search_call.in_progress"
@json_schema_type
class OpenAIResponseObjectStreamResponseFileSearchCallSearching(BaseModel):
"""Streaming event for file search currently searching.
:param item_id: Unique identifier of the file search call
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.file_search_call.searching"
"""
item_id: str
output_index: int
sequence_number: int
type: Literal["response.file_search_call.searching"] = "response.file_search_call.searching"
@json_schema_type
class OpenAIResponseObjectStreamResponseFileSearchCallCompleted(BaseModel):
"""Streaming event for completed file search calls.
:param item_id: Unique identifier of the completed file search call
:param output_index: Index position of the item in the output list
:param sequence_number: Sequential number for ordering streaming events
:param type: Event type identifier, always "response.file_search_call.completed"
"""
item_id: str
output_index: int
sequence_number: int
type: Literal["response.file_search_call.completed"] = "response.file_search_call.completed"
OpenAIResponseObjectStream = Annotated[
OpenAIResponseObjectStreamResponseCreated
| OpenAIResponseObjectStreamResponseInProgress
@@ -975,6 +1217,18 @@ OpenAIResponseObjectStream = Annotated[
| OpenAIResponseObjectStreamResponseMcpCallCompleted
| OpenAIResponseObjectStreamResponseContentPartAdded
| OpenAIResponseObjectStreamResponseContentPartDone
| OpenAIResponseObjectStreamResponseReasoningTextDelta
| OpenAIResponseObjectStreamResponseReasoningTextDone
| OpenAIResponseObjectStreamResponseReasoningSummaryPartAdded
| OpenAIResponseObjectStreamResponseReasoningSummaryPartDone
| OpenAIResponseObjectStreamResponseReasoningSummaryTextDelta
| OpenAIResponseObjectStreamResponseReasoningSummaryTextDone
| OpenAIResponseObjectStreamResponseRefusalDelta
| OpenAIResponseObjectStreamResponseRefusalDone
| OpenAIResponseObjectStreamResponseOutputTextAnnotationAdded
| OpenAIResponseObjectStreamResponseFileSearchCallInProgress
| OpenAIResponseObjectStreamResponseFileSearchCallSearching
| OpenAIResponseObjectStreamResponseFileSearchCallCompleted
| OpenAIResponseObjectStreamResponseIncomplete
| OpenAIResponseObjectStreamResponseFailed
| OpenAIResponseObjectStreamResponseCompleted,

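Taken together, the new variants slot into the `OpenAIResponseObjectStream`
union, so a consumer can dispatch on `event.type`. A minimal sketch, where
`stream` is assumed to be an async iterator of these events produced by the
orchestrator below:

```python
# Minimal sketch: dispatching on the new event types in an
# OpenAIResponseObjectStream. Only the fields defined above are used.
async def print_stream(stream):
    async for event in stream:
        match event.type:
            case "response.reasoning_text.delta":
                print("[reasoning]", event.delta, end="")
            case "response.refusal.delta":
                print("[refusal]", event.delta, end="")
            case "response.output_text.annotation.added":
                print(f"\n[citation #{event.annotation_index}]", event.annotation)
            case "response.file_search_call.in_progress":
                print("\n[file search: in progress]")
            case "response.file_search_call.searching":
                print("\n[file search: searching]")
            case "response.file_search_call.completed":
                print("\n[file search: completed]")
```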
@@ -777,12 +777,14 @@ class OpenAIChoiceDelta(BaseModel):
:param refusal: (Optional) The refusal of the delta
:param role: (Optional) The role of the delta
:param tool_calls: (Optional) The tool calls of the delta
:param reasoning_content: (Optional) The reasoning content from the model (non-standard, for o1/o3 models)
"""
content: str | None = None
refusal: str | None = None
role: str | None = None
tool_calls: list[OpenAIChatCompletionToolCall] | None = None
reasoning_content: str | None = None
@json_schema_type

@@ -13,6 +13,8 @@ from llama_stack.apis.agents.openai_responses import (
ApprovalFilter,
MCPListToolsTool,
OpenAIResponseContentPartOutputText,
OpenAIResponseContentPartReasoningText,
OpenAIResponseContentPartRefusal,
OpenAIResponseError,
OpenAIResponseInputTool,
OpenAIResponseInputToolMCP,
@@ -35,6 +37,10 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectStreamResponseOutputItemAdded,
OpenAIResponseObjectStreamResponseOutputItemDone,
OpenAIResponseObjectStreamResponseOutputTextDelta,
OpenAIResponseObjectStreamResponseReasoningTextDelta,
OpenAIResponseObjectStreamResponseReasoningTextDone,
OpenAIResponseObjectStreamResponseRefusalDelta,
OpenAIResponseObjectStreamResponseRefusalDone,
OpenAIResponseOutput,
OpenAIResponseOutputMessageFunctionToolCall,
OpenAIResponseOutputMessageMCPListTools,
@@ -355,6 +361,128 @@ class StreamingResponseOrchestrator:
),
)
async def _handle_reasoning_content_chunk(
self,
reasoning_content: str,
reasoning_part_emitted: bool,
reasoning_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Emit content_part.added event for first reasoning chunk
if not reasoning_part_emitted:
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=reasoning_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartReasoningText(
text="", # Will be filled incrementally via reasoning deltas
),
sequence_number=self.sequence_number,
)
# Emit reasoning_text.delta event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseReasoningTextDelta(
content_index=reasoning_content_index,
delta=reasoning_content,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
async def _handle_refusal_content_chunk(
self,
refusal_content: str,
refusal_part_emitted: bool,
refusal_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Emit content_part.added event for first refusal chunk
if not refusal_part_emitted:
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=refusal_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartRefusal(
refusal="", # Will be filled incrementally via refusal deltas
),
sequence_number=self.sequence_number,
)
# Emit refusal.delta event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseRefusalDelta(
content_index=refusal_content_index,
delta=refusal_content,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
async def _emit_reasoning_done_events(
self,
reasoning_text_accumulated: list[str],
reasoning_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
final_reasoning_text = "".join(reasoning_text_accumulated)
# Emit reasoning_text.done event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseReasoningTextDone(
content_index=reasoning_content_index,
text=final_reasoning_text,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
# Emit content_part.done for reasoning
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartDone(
content_index=reasoning_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartReasoningText(
text=final_reasoning_text,
),
sequence_number=self.sequence_number,
)
async def _emit_refusal_done_events(
self,
refusal_text_accumulated: list[str],
refusal_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
final_refusal_text = "".join(refusal_text_accumulated)
# Emit refusal.done event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseRefusalDone(
content_index=refusal_content_index,
refusal=final_refusal_text,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
# Emit content_part.done for refusal
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartDone(
content_index=refusal_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartRefusal(
refusal=final_refusal_text,
),
sequence_number=self.sequence_number,
)
async def _process_streaming_chunks(
self, completion_result, output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream | ChatCompletionResult]:
@@ -373,8 +501,14 @@ class StreamingResponseOrchestrator:
tool_call_item_ids: dict[int, str] = {}
# Track content parts for streaming events
content_part_emitted = False
reasoning_part_emitted = False
refusal_part_emitted = False
content_index = 0
reasoning_content_index = 1 # reasoning is a separate content part
refusal_content_index = 2 # refusal is a separate content part
message_output_index = len(output_messages)
reasoning_text_accumulated = []
refusal_text_accumulated = []
async for chunk in completion_result:
chat_response_id = chunk.id
@@ -415,6 +549,32 @@ class StreamingResponseOrchestrator:
if chunk_choice.finish_reason:
chunk_finish_reason = chunk_choice.finish_reason
# Handle reasoning content if present (non-standard field for o1/o3 models)
if hasattr(chunk_choice.delta, "reasoning_content") and chunk_choice.delta.reasoning_content:
async for event in self._handle_reasoning_content_chunk(
reasoning_content=chunk_choice.delta.reasoning_content,
reasoning_part_emitted=reasoning_part_emitted,
reasoning_content_index=reasoning_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
reasoning_part_emitted = True
reasoning_text_accumulated.append(chunk_choice.delta.reasoning_content)
# Handle refusal content if present
if chunk_choice.delta.refusal:
async for event in self._handle_refusal_content_chunk(
refusal_content=chunk_choice.delta.refusal,
refusal_part_emitted=refusal_part_emitted,
refusal_content_index=refusal_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
refusal_part_emitted = True
refusal_text_accumulated.append(chunk_choice.delta.refusal)
# Aggregate tool call arguments across chunks
if chunk_choice.delta.tool_calls:
for tool_call in chunk_choice.delta.tool_calls:
@@ -516,6 +676,26 @@ class StreamingResponseOrchestrator:
sequence_number=self.sequence_number,
)
# Emit reasoning done events if reasoning content was streamed
if reasoning_part_emitted:
async for event in self._emit_reasoning_done_events(
reasoning_text_accumulated=reasoning_text_accumulated,
reasoning_content_index=reasoning_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
# Emit refusal done events if refusal content was streamed
if refusal_part_emitted:
async for event in self._emit_refusal_done_events(
refusal_text_accumulated=refusal_text_accumulated,
refusal_content_index=refusal_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
# Clear content when there are tool calls (OpenAI spec behavior)
if chat_response_tool_calls:
chat_response_content = []

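A sketch of the reasoning-related event order the orchestrator above emits
for a single message (refusal follows the same shape at `content_index=2`;
regular text events at index 0 are interleaved and omitted here):

```python
# Sketch of the per-message reasoning event order (not exhaustive; ordinary
# output_text events at content_index=0 interleave with these).
REASONING_EVENT_ORDER = [
    "response.content_part.added",    # reasoning part, content_index=1, on first chunk
    "response.reasoning_text.delta",  # one per chunk carrying reasoning_content
    "response.reasoning_text.done",   # joined text from reasoning_text_accumulated
    "response.content_part.done",     # reasoning part with the final text
]
```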
@@ -11,6 +11,9 @@ from collections.abc import AsyncIterator
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseInputToolFileSearch,
OpenAIResponseInputToolMCP,
OpenAIResponseObjectStreamResponseFileSearchCallCompleted,
OpenAIResponseObjectStreamResponseFileSearchCallInProgress,
OpenAIResponseObjectStreamResponseFileSearchCallSearching,
OpenAIResponseObjectStreamResponseMcpCallCompleted,
OpenAIResponseObjectStreamResponseMcpCallFailed,
OpenAIResponseObjectStreamResponseMcpCallInProgress,
@@ -221,7 +224,13 @@ class ToolExecutor:
output_index=output_index,
sequence_number=sequence_number,
)
# Note: knowledge_search maps to file_search events below; other custom tools don't have specific streaming events in the OpenAI spec
elif function_name == "knowledge_search":
sequence_number += 1
progress_event = OpenAIResponseObjectStreamResponseFileSearchCallInProgress(
item_id=item_id,
output_index=output_index,
sequence_number=sequence_number,
)
if progress_event:
yield ToolExecutionResult(stream_event=progress_event, sequence_number=sequence_number)
@@ -236,6 +245,16 @@ class ToolExecutor:
)
yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number)
# For file search, emit searching event
if function_name == "knowledge_search":
sequence_number += 1
searching_event = OpenAIResponseObjectStreamResponseFileSearchCallSearching(
item_id=item_id,
output_index=output_index,
sequence_number=sequence_number,
)
yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number)
async def _execute_tool(
self,
function_name: str,
@@ -322,7 +341,13 @@ class ToolExecutor:
output_index=output_index,
sequence_number=sequence_number,
)
# Note: knowledge_search maps to the file_search completion event below; other custom tools don't have specific completion events in the OpenAI spec
elif function_name == "knowledge_search":
sequence_number += 1
completion_event = OpenAIResponseObjectStreamResponseFileSearchCallCompleted(
item_id=item_id,
output_index=output_index,
sequence_number=sequence_number,
)
if completion_event:
yield ToolExecutionResult(stream_event=completion_event, sequence_number=sequence_number)
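
Net effect for a `knowledge_search` call: it reuses OpenAI's `file_search`
streaming sequence. A condensed sketch of the event types a client should
observe for a successful call:

```python
# Expected ordering of file search streaming events for a knowledge_search
# call (a sketch; sequence numbers depend on surrounding events).
EXPECTED_FILE_SEARCH_EVENT_ORDER = [
    "response.file_search_call.in_progress",  # before the tool executes
    "response.file_search_call.searching",    # while the search runs
    "response.file_search_call.completed",    # after results are attached
]
```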