feat(responses)!: improve responses + conversations implementations (#3810)

This PR updates the Conversation item-related types and improves a couple of
critical parts of the implementation:

- it creates a streaming output item for the final assistant message output by
  the model (see the sketch after this list). Until now we only added content
  parts and included that message in the final response.

- it completely rewrites the conversation update code to account for items
  other than messages (tool calls, tool call outputs, etc.)
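
As a rough illustration of the first point, a client consuming the stream should
now see `response.output_item.added` / `response.output_item.done` events for the
assistant message itself, bracketing the content-part deltas. A minimal sketch,
assuming an OpenAI-compatible Python client pointed at a local Llama Stack server
(the base URL and model id are placeholders, not part of this PR):

```python
# Minimal sketch: observe the new output_item events for the assistant message.
# ASSUMPTIONS: an OpenAI-compatible client works against the local Llama Stack
# server, and the model id below is a placeholder.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8321/v1", api_key="none")

stream = client.responses.create(
    model="meta-llama/Llama-3.3-70B-Instruct",  # placeholder model id
    input="Say hello in one sentence.",
    stream=True,
)

for event in stream:
    if event.type in ("response.output_item.added", "response.output_item.done"):
        # New with this PR: the final assistant message is bracketed by these
        # events (status goes from in_progress to completed), rather than only
        # appearing as content parts and in the final response object.
        item = event.item
        print(event.type, getattr(item, "type", None), getattr(item, "status", None))
    elif event.type == "response.output_text.delta":
        print("delta:", event.delta)
```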

## Test Plan

Used the test script from
https://github.com/llamastack/llama-stack-client-python/pull/281 for this:

```
TEST_API_BASE_URL=http://localhost:8321/v1 \
  pytest tests/integration/test_agent_turn_step_events.py::test_client_side_function_tool -xvs
```
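
For context, a rough sketch of the conversation-backed flow this PR enables,
assuming an OpenAI-compatible Python client against the same server (the model
id is a placeholder, not part of this PR):

```python
# Rough sketch of the conversation-backed flow: the Responses API syncs output
# items (assistant message, tool calls, tool call outputs) back into the
# conversation after the terminal stream event.
# ASSUMPTIONS: OpenAI-compatible client and server URL; placeholder model id.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8321/v1", api_key="none")

conv = client.conversations.create()

client.responses.create(
    model="meta-llama/Llama-3.3-70B-Instruct",  # placeholder model id
    conversation=conv.id,
    input="What is the capital of France?",
)

# Follow-up turn: context comes from the stored conversation, so passing
# previous_response_id together with conversation is rejected as mutually
# exclusive.
followup = client.responses.create(
    model="meta-llama/Llama-3.3-70B-Instruct",  # placeholder model id
    conversation=conv.id,
    input="And roughly how many people live there?",
)
print(followup.output_text)

# The conversation now also holds the non-message items produced along the way
# (e.g. tool calls and their outputs), which the rewritten sync code accounts for.
for item in client.conversations.items.list(conv.id).data:
    print(item.type)
```
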
Ashwin Bharambe 2025-10-15 09:36:11 -07:00 committed by GitHub
parent add8cd801b
commit e9b4278a51
129 changed files with 86266 additions and 903 deletions

@@ -1258,9 +1258,9 @@ OpenAIResponseInput = Annotated[
| OpenAIResponseInputFunctionToolCallOutput
| OpenAIResponseMCPApprovalRequest
| OpenAIResponseMCPApprovalResponse
|
# Fallback to the generic message type as a last resort
OpenAIResponseMessage,
| OpenAIResponseOutputMessageMCPCall
| OpenAIResponseOutputMessageMCPListTools
| OpenAIResponseMessage,
Field(union_mode="left_to_right"),
]
register_schema(OpenAIResponseInput, name="OpenAIResponseInput")

@@ -12,6 +12,9 @@ from openai.types.responses.response_includable import ResponseIncludable
from pydantic import BaseModel, Field
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseInputFunctionToolCallOutput,
OpenAIResponseMCPApprovalRequest,
OpenAIResponseMCPApprovalResponse,
OpenAIResponseMessage,
OpenAIResponseOutputMessageFileSearchToolCall,
OpenAIResponseOutputMessageFunctionToolCall,
@@ -61,9 +64,14 @@ class ConversationMessage(BaseModel):
ConversationItem = Annotated[
OpenAIResponseMessage
| OpenAIResponseOutputMessageFunctionToolCall
| OpenAIResponseOutputMessageFileSearchToolCall
| OpenAIResponseOutputMessageWebSearchToolCall
| OpenAIResponseOutputMessageFileSearchToolCall
| OpenAIResponseOutputMessageFunctionToolCall
| OpenAIResponseInputFunctionToolCallOutput
| OpenAIResponseMCPApprovalRequest
| OpenAIResponseMCPApprovalResponse
| OpenAIResponseOutputMessageMCPCall
| OpenAIResponseOutputMessageMCPListTools
| OpenAIResponseOutputMessageMCPCall
| OpenAIResponseOutputMessageMCPListTools,
Field(discriminator="type"),

@@ -135,7 +135,7 @@ class ConversationServiceImpl(Conversations):
object="conversation",
)
logger.info(f"Created conversation {conversation_id}")
logger.debug(f"Created conversation {conversation_id}")
return conversation
async def get_conversation(self, conversation_id: str) -> Conversation:
@@ -161,7 +161,7 @@ class ConversationServiceImpl(Conversations):
"""Delete a conversation with the given ID."""
await self.sql_store.delete(table="openai_conversations", where={"id": conversation_id})
logger.info(f"Deleted conversation {conversation_id}")
logger.debug(f"Deleted conversation {conversation_id}")
return ConversationDeletedResource(id=conversation_id)
def _validate_conversation_id(self, conversation_id: str) -> None:
@@ -222,7 +222,7 @@ class ConversationServiceImpl(Conversations):
created_items.append(item_dict)
logger.info(f"Created {len(created_items)} items in conversation {conversation_id}")
logger.debug(f"Created {len(created_items)} items in conversation {conversation_id}")
# Convert created items (dicts) to proper ConversationItem types
adapter: TypeAdapter[ConversationItem] = TypeAdapter(ConversationItem)
@@ -255,6 +255,12 @@ class ConversationServiceImpl(Conversations):
async def list(self, conversation_id: str, after=NOT_GIVEN, include=NOT_GIVEN, limit=NOT_GIVEN, order=NOT_GIVEN):
"""List items in the conversation."""
if not conversation_id:
raise ValueError(f"Expected a non-empty value for `conversation_id` but received {conversation_id!r}")
# check if conversation exists
await self.get_conversation(conversation_id)
result = await self.sql_store.fetch_all(table="conversation_items", where={"conversation_id": conversation_id})
records = result.data
@@ -305,5 +311,5 @@ class ConversationServiceImpl(Conversations):
table="conversation_items", where={"id": item_id, "conversation_id": conversation_id}
)
logger.info(f"Deleted item {item_id} from conversation {conversation_id}")
logger.debug(f"Deleted item {item_id} from conversation {conversation_id}")
return ConversationItemDeletedResource(id=item_id)

@@ -100,6 +100,7 @@ class OpenAIResponsesImpl:
input: str | list[OpenAIResponseInput],
tools: list[OpenAIResponseInputTool] | None,
previous_response_id: str | None,
conversation: str | None,
) -> tuple[str | list[OpenAIResponseInput], list[OpenAIMessageParam]]:
"""Process input with optional previous response context.
@@ -124,16 +125,27 @@ class OpenAIResponsesImpl:
messages = await convert_response_input_to_chat_messages(all_input)
tool_context.recover_tools_from_previous_response(previous_response)
elif conversation is not None:
conversation_items = await self.conversations_api.list(conversation, order="asc")
# Use stored messages as source of truth (like previous_response.messages)
stored_messages = await self.responses_store.get_conversation_messages(conversation)
all_input = input
if not conversation_items.data:
# First turn - just convert the new input
messages = await convert_response_input_to_chat_messages(input)
else:
# Use stored messages directly and convert only new input
messages = stored_messages or []
new_messages = await convert_response_input_to_chat_messages(input, previous_messages=messages)
messages.extend(new_messages)
else:
all_input = input
messages = await convert_response_input_to_chat_messages(input)
messages = await convert_response_input_to_chat_messages(all_input)
return all_input, messages, tool_context
async def _prepend_instructions(self, messages, instructions):
if instructions:
messages.insert(0, OpenAISystemMessageParam(content=instructions))
async def get_openai_response(
self,
response_id: str,
@@ -229,27 +241,21 @@ class OpenAIResponsesImpl:
if shields is not None:
raise NotImplementedError("Shields parameter is not yet implemented in the meta-reference provider")
if conversation is not None and previous_response_id is not None:
raise ValueError(
"Mutually exclusive parameters: 'previous_response_id' and 'conversation'. Ensure you are only providing one of these parameters."
)
original_input = input # needed for syncing to Conversations
if conversation is not None:
if previous_response_id is not None:
raise ValueError(
"Mutually exclusive parameters: 'previous_response_id' and 'conversation'. Ensure you are only providing one of these parameters."
)
if not conversation.startswith("conv_"):
raise InvalidConversationIdError(conversation)
# Check conversation exists (raises ConversationNotFoundError if not)
_ = await self.conversations_api.get_conversation(conversation)
input = await self._load_conversation_context(conversation, input)
stream_gen = self._create_streaming_response(
input=input,
original_input=original_input,
conversation=conversation,
model=model,
instructions=instructions,
previous_response_id=previous_response_id,
conversation=conversation,
store=store,
temperature=temperature,
text=text,
@@ -292,7 +298,6 @@ class OpenAIResponsesImpl:
self,
input: str | list[OpenAIResponseInput],
model: str,
original_input: str | list[OpenAIResponseInput] | None = None,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
@@ -304,9 +309,11 @@ class OpenAIResponsesImpl:
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Input preprocessing
all_input, messages, tool_context = await self._process_input_with_previous_response(
input, tools, previous_response_id
input, tools, previous_response_id, conversation
)
await self._prepend_instructions(messages, instructions)
if instructions:
messages.insert(0, OpenAISystemMessageParam(content=instructions))
# Structured outputs
response_format = await convert_response_text_to_chat_response_format(text)
@@ -338,6 +345,8 @@ class OpenAIResponsesImpl:
# Stream the response
final_response = None
failed_response = None
output_items = []
async for stream_chunk in orchestrator.create_response():
if stream_chunk.type in {"response.completed", "response.incomplete"}:
final_response = stream_chunk.response
@@ -345,102 +354,50 @@ class OpenAIResponsesImpl:
failed_response = stream_chunk.response
yield stream_chunk
if stream_chunk.type == "response.output_item.done":
item = stream_chunk.item
output_items.append(item)
# Store and sync immediately after yielding terminal events
# This ensures the storage/syncing happens even if the consumer breaks early
if (
stream_chunk.type in {"response.completed", "response.incomplete"}
and store
and final_response
and failed_response is None
):
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
messages_to_store = list(
filter(lambda x: not isinstance(x, OpenAISystemMessageParam), orchestrator.final_messages)
)
if store:
# TODO: we really should work off of output_items instead of "final_messages"
await self._store_response(
response=final_response,
input=all_input,
messages=messages_to_store,
)
if stream_chunk.type in {"response.completed", "response.incomplete"} and conversation and final_response:
# for Conversations, we need to use the original_input if it's available, otherwise use input
sync_input = original_input if original_input is not None else input
await self._sync_response_to_conversation(conversation, sync_input, final_response)
if conversation:
await self._sync_response_to_conversation(conversation, input, output_items)
await self.responses_store.store_conversation_messages(conversation, messages_to_store)
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
return await self.responses_store.delete_response_object(response_id)
async def _load_conversation_context(
self, conversation_id: str, content: str | list[OpenAIResponseInput]
) -> list[OpenAIResponseInput]:
"""Load conversation history and merge with provided content."""
conversation_items = await self.conversations_api.list(conversation_id, order="asc")
context_messages = []
for item in conversation_items.data:
if isinstance(item, OpenAIResponseMessage):
if item.role == "user":
context_messages.append(
OpenAIResponseMessage(
role="user", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
elif item.role == "assistant":
context_messages.append(
OpenAIResponseMessage(
role="assistant", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
# add new content to context
if isinstance(content, str):
context_messages.append(OpenAIResponseMessage(role="user", content=content))
elif isinstance(content, list):
context_messages.extend(content)
return context_messages
async def _sync_response_to_conversation(
self, conversation_id: str, content: str | list[OpenAIResponseInput], response: OpenAIResponseObject
self, conversation_id: str, input: str | list[OpenAIResponseInput] | None, output_items: list[ConversationItem]
) -> None:
"""Sync content and response messages to the conversation."""
conversation_items = []
# add user content message(s)
if isinstance(content, str):
if isinstance(input, str):
conversation_items.append(
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": content}]}
OpenAIResponseMessage(role="user", content=[OpenAIResponseInputMessageContentText(text=input)])
)
elif isinstance(content, list):
for item in content:
if not isinstance(item, OpenAIResponseMessage):
raise NotImplementedError(f"Unsupported input item type: {type(item)}")
elif isinstance(input, list):
conversation_items.extend(input)
if item.role == "user":
if isinstance(item.content, str):
conversation_items.append(
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": item.content}],
}
)
elif isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "user", "content": item.content})
else:
raise NotImplementedError(f"Unsupported user message content type: {type(item.content)}")
elif item.role == "assistant":
if isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": item.content})
else:
raise NotImplementedError(f"Unsupported assistant message content type: {type(item.content)}")
else:
raise NotImplementedError(f"Unsupported message role: {item.role}")
conversation_items.extend(output_items)
# add assistant response message
for output_item in response.output:
if isinstance(output_item, OpenAIResponseMessage) and output_item.role == "assistant":
if hasattr(output_item, "content") and isinstance(output_item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": output_item.content})
if conversation_items:
adapter = TypeAdapter(list[ConversationItem])
validated_items = adapter.validate_python(conversation_items)
await self.conversations_api.add_items(conversation_id, validated_items)
adapter = TypeAdapter(list[ConversationItem])
validated_items = adapter.validate_python(conversation_items)
await self.conversations_api.add_items(conversation_id, validated_items)

@@ -19,6 +19,7 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseInputTool,
OpenAIResponseInputToolMCP,
OpenAIResponseMCPApprovalRequest,
OpenAIResponseMessage,
OpenAIResponseObject,
OpenAIResponseObjectStream,
OpenAIResponseObjectStreamResponseCompleted,
@@ -42,6 +43,7 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectStreamResponseRefusalDelta,
OpenAIResponseObjectStreamResponseRefusalDone,
OpenAIResponseOutput,
OpenAIResponseOutputMessageContentOutputText,
OpenAIResponseOutputMessageFunctionToolCall,
OpenAIResponseOutputMessageMCPListTools,
OpenAIResponseText,
@@ -500,6 +502,7 @@ class StreamingResponseOrchestrator:
# Track tool call items for streaming events
tool_call_item_ids: dict[int, str] = {}
# Track content parts for streaming events
message_item_added_emitted = False
content_part_emitted = False
reasoning_part_emitted = False
refusal_part_emitted = False
@@ -521,6 +524,23 @@ class StreamingResponseOrchestrator:
for chunk_choice in chunk.choices:
# Emit incremental text content as delta events
if chunk_choice.delta.content:
# Emit output_item.added for the message on first content
if not message_item_added_emitted:
message_item_added_emitted = True
self.sequence_number += 1
message_item = OpenAIResponseMessage(
id=message_item_id,
content=[],
role="assistant",
status="in_progress",
)
yield OpenAIResponseObjectStreamResponseOutputItemAdded(
response_id=self.response_id,
item=message_item,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
# Emit content_part.added event for first text chunk
if not content_part_emitted:
content_part_emitted = True
@@ -700,6 +720,32 @@ class StreamingResponseOrchestrator:
if chat_response_tool_calls:
chat_response_content = []
# Emit output_item.done for message when we have content and no tool calls
if message_item_added_emitted and not chat_response_tool_calls:
content_parts = []
if content_part_emitted:
final_text = "".join(chat_response_content)
content_parts.append(
OpenAIResponseOutputMessageContentOutputText(
text=final_text,
annotations=[],
)
)
self.sequence_number += 1
message_item = OpenAIResponseMessage(
id=message_item_id,
content=content_parts,
role="assistant",
status="completed",
)
yield OpenAIResponseObjectStreamResponseOutputItemDone(
response_id=self.response_id,
item=message_item,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
yield ChatCompletionResult(
response_id=chat_response_id,
content=chat_response_content,

@@ -88,12 +88,20 @@ class ResponsesStore:
},
)
await self.sql_store.create_table(
"conversation_messages",
{
"conversation_id": ColumnDefinition(type=ColumnType.STRING, primary_key=True),
"messages": ColumnType.JSON,
},
)
if self.enable_write_queue:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers):
self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
else:
logger.info("Write queue disabled for SQLite to avoid concurrency issues")
logger.debug("Write queue disabled for SQLite to avoid concurrency issues")
async def shutdown(self) -> None:
if not self._worker_tasks:
@@ -294,3 +302,54 @@ class ResponsesStore:
items = items[:limit]
return ListOpenAIResponseInputItem(data=items)
async def store_conversation_messages(self, conversation_id: str, messages: list[OpenAIMessageParam]) -> None:
"""Store messages for a conversation.
:param conversation_id: The conversation identifier.
:param messages: List of OpenAI message parameters to store.
"""
if not self.sql_store:
raise ValueError("Responses store is not initialized")
# Serialize messages to dict format for JSON storage
messages_data = [msg.model_dump() for msg in messages]
# Upsert: try insert first, update if exists
try:
await self.sql_store.insert(
table="conversation_messages",
data={"conversation_id": conversation_id, "messages": messages_data},
)
except Exception:
# If insert fails due to ID conflict, update existing record
await self.sql_store.update(
table="conversation_messages",
data={"messages": messages_data},
where={"conversation_id": conversation_id},
)
logger.debug(f"Stored {len(messages)} messages for conversation {conversation_id}")
async def get_conversation_messages(self, conversation_id: str) -> list[OpenAIMessageParam] | None:
"""Get stored messages for a conversation.
:param conversation_id: The conversation identifier.
:returns: List of OpenAI message parameters, or None if no messages stored.
"""
if not self.sql_store:
raise ValueError("Responses store is not initialized")
record = await self.sql_store.fetch_one(
table="conversation_messages",
where={"conversation_id": conversation_id},
)
if record is None:
return None
# Deserialize messages from JSON storage
from pydantic import TypeAdapter
adapter = TypeAdapter(list[OpenAIMessageParam])
return adapter.validate_python(record["messages"])