mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-08-16 06:27:58 +00:00
feat(responses): improve streaming for function calls (#3124)
Some checks failed
Integration Tests (Replay) / discover-tests (push) Successful in 3s
Vector IO Integration Tests / test-matrix (3.12, inline::sqlite-vec) (push) Failing after 8s
Test Llama Stack Build / build-single-provider (push) Failing after 5s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
Vector IO Integration Tests / test-matrix (3.13, remote::qdrant) (push) Failing after 10s
Test Llama Stack Build / generate-matrix (push) Successful in 9s
Vector IO Integration Tests / test-matrix (3.13, inline::faiss) (push) Failing after 13s
Python Package Build Test / build (3.13) (push) Failing after 5s
Vector IO Integration Tests / test-matrix (3.13, remote::pgvector) (push) Failing after 11s
Test Llama Stack Build / build-custom-container-distribution (push) Failing after 8s
Test Llama Stack Build / build-ubi9-container-distribution (push) Failing after 7s
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 21s
Python Package Build Test / build (3.12) (push) Failing after 9s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 23s
Vector IO Integration Tests / test-matrix (3.12, remote::chromadb) (push) Failing after 15s
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 29s
Unit Tests / unit-tests (3.12) (push) Failing after 8s
Test External API and Providers / test-external (venv) (push) Failing after 13s
Update ReadTheDocs / update-readthedocs (push) Failing after 8s
Unit Tests / unit-tests (3.13) (push) Failing after 10s
Vector IO Integration Tests / test-matrix (3.13, remote::weaviate) (push) Failing after 23s
Vector IO Integration Tests / test-matrix (3.13, remote::chromadb) (push) Failing after 16s
Vector IO Integration Tests / test-matrix (3.12, remote::weaviate) (push) Failing after 18s
Vector IO Integration Tests / test-matrix (3.12, remote::pgvector) (push) Failing after 25s
Vector IO Integration Tests / test-matrix (3.13, inline::sqlite-vec) (push) Failing after 23s
Vector IO Integration Tests / test-matrix (3.12, remote::qdrant) (push) Failing after 24s
Vector IO Integration Tests / test-matrix (3.12, inline::faiss) (push) Failing after 25s
Vector IO Integration Tests / test-matrix (3.12, inline::milvus) (push) Failing after 26s
Vector IO Integration Tests / test-matrix (3.13, inline::milvus) (push) Failing after 22s
Integration Tests (Replay) / Integration Tests (, , , client=, vision=) (push) Failing after 17s
Pre-commit / pre-commit (push) Successful in 1m10s
Test Llama Stack Build / build (push) Failing after 12s
Some checks failed
Integration Tests (Replay) / discover-tests (push) Successful in 3s
Vector IO Integration Tests / test-matrix (3.12, inline::sqlite-vec) (push) Failing after 8s
Test Llama Stack Build / build-single-provider (push) Failing after 5s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
Vector IO Integration Tests / test-matrix (3.13, remote::qdrant) (push) Failing after 10s
Test Llama Stack Build / generate-matrix (push) Successful in 9s
Vector IO Integration Tests / test-matrix (3.13, inline::faiss) (push) Failing after 13s
Python Package Build Test / build (3.13) (push) Failing after 5s
Vector IO Integration Tests / test-matrix (3.13, remote::pgvector) (push) Failing after 11s
Test Llama Stack Build / build-custom-container-distribution (push) Failing after 8s
Test Llama Stack Build / build-ubi9-container-distribution (push) Failing after 7s
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 21s
Python Package Build Test / build (3.12) (push) Failing after 9s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 23s
Vector IO Integration Tests / test-matrix (3.12, remote::chromadb) (push) Failing after 15s
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 29s
Unit Tests / unit-tests (3.12) (push) Failing after 8s
Test External API and Providers / test-external (venv) (push) Failing after 13s
Update ReadTheDocs / update-readthedocs (push) Failing after 8s
Unit Tests / unit-tests (3.13) (push) Failing after 10s
Vector IO Integration Tests / test-matrix (3.13, remote::weaviate) (push) Failing after 23s
Vector IO Integration Tests / test-matrix (3.13, remote::chromadb) (push) Failing after 16s
Vector IO Integration Tests / test-matrix (3.12, remote::weaviate) (push) Failing after 18s
Vector IO Integration Tests / test-matrix (3.12, remote::pgvector) (push) Failing after 25s
Vector IO Integration Tests / test-matrix (3.13, inline::sqlite-vec) (push) Failing after 23s
Vector IO Integration Tests / test-matrix (3.12, remote::qdrant) (push) Failing after 24s
Vector IO Integration Tests / test-matrix (3.12, inline::faiss) (push) Failing after 25s
Vector IO Integration Tests / test-matrix (3.12, inline::milvus) (push) Failing after 26s
Vector IO Integration Tests / test-matrix (3.13, inline::milvus) (push) Failing after 22s
Integration Tests (Replay) / Integration Tests (, , , client=, vision=) (push) Failing after 17s
Pre-commit / pre-commit (push) Successful in 1m10s
Test Llama Stack Build / build (push) Failing after 12s
Emit streaming events for function calls ## Test Plan Improved the test case
This commit is contained in:
parent
d6ae54723d
commit
5b312a80b9
3 changed files with 250 additions and 33 deletions
|
@ -33,6 +33,10 @@ from llama_stack.apis.agents.openai_responses import (
|
|||
OpenAIResponseObjectStream,
|
||||
OpenAIResponseObjectStreamResponseCompleted,
|
||||
OpenAIResponseObjectStreamResponseCreated,
|
||||
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta,
|
||||
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone,
|
||||
OpenAIResponseObjectStreamResponseOutputItemAdded,
|
||||
OpenAIResponseObjectStreamResponseOutputItemDone,
|
||||
OpenAIResponseObjectStreamResponseOutputTextDelta,
|
||||
OpenAIResponseOutput,
|
||||
OpenAIResponseOutputMessageContent,
|
||||
|
@ -73,7 +77,9 @@ from llama_stack.apis.tools import ToolGroups, ToolInvocationResult, ToolRuntime
|
|||
from llama_stack.apis.vector_io import VectorIO
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.models.llama.datatypes import ToolDefinition, ToolParamDefinition
|
||||
from llama_stack.providers.utils.inference.openai_compat import convert_tooldef_to_openai_tool
|
||||
from llama_stack.providers.utils.inference.openai_compat import (
|
||||
convert_tooldef_to_openai_tool,
|
||||
)
|
||||
from llama_stack.providers.utils.responses.responses_store import ResponsesStore
|
||||
|
||||
logger = get_logger(name=__name__, category="openai_responses")
|
||||
|
@ -82,7 +88,7 @@ OPENAI_RESPONSES_PREFIX = "openai_responses:"
|
|||
|
||||
|
||||
async def _convert_response_content_to_chat_content(
|
||||
content: str | list[OpenAIResponseInputMessageContent] | list[OpenAIResponseOutputMessageContent],
|
||||
content: (str | list[OpenAIResponseInputMessageContent] | list[OpenAIResponseOutputMessageContent]),
|
||||
) -> str | list[OpenAIChatCompletionContentPartParam]:
|
||||
"""
|
||||
Convert the content parts from an OpenAI Response API request into OpenAI Chat Completion content parts.
|
||||
|
@ -150,7 +156,9 @@ async def _convert_response_input_to_chat_messages(
|
|||
return messages
|
||||
|
||||
|
||||
async def _convert_chat_choice_to_response_message(choice: OpenAIChoice) -> OpenAIResponseMessage:
|
||||
async def _convert_chat_choice_to_response_message(
|
||||
choice: OpenAIChoice,
|
||||
) -> OpenAIResponseMessage:
|
||||
"""
|
||||
Convert an OpenAI Chat Completion choice into an OpenAI Response output message.
|
||||
"""
|
||||
|
@ -172,7 +180,9 @@ async def _convert_chat_choice_to_response_message(choice: OpenAIChoice) -> Open
|
|||
)
|
||||
|
||||
|
||||
async def _convert_response_text_to_chat_response_format(text: OpenAIResponseText) -> OpenAIResponseFormatParam:
|
||||
async def _convert_response_text_to_chat_response_format(
|
||||
text: OpenAIResponseText,
|
||||
) -> OpenAIResponseFormatParam:
|
||||
"""
|
||||
Convert an OpenAI Response text parameter into an OpenAI Chat Completion response format.
|
||||
"""
|
||||
|
@ -228,7 +238,9 @@ class OpenAIResponsesImpl:
|
|||
self.vector_io_api = vector_io_api
|
||||
|
||||
async def _prepend_previous_response(
|
||||
self, input: str | list[OpenAIResponseInput], previous_response_id: str | None = None
|
||||
self,
|
||||
input: str | list[OpenAIResponseInput],
|
||||
previous_response_id: str | None = None,
|
||||
):
|
||||
if previous_response_id:
|
||||
previous_response_with_input = await self.responses_store.get_response_object(previous_response_id)
|
||||
|
@ -446,6 +458,8 @@ class OpenAIResponsesImpl:
|
|||
|
||||
# Create a placeholder message item for delta events
|
||||
message_item_id = f"msg_{uuid.uuid4()}"
|
||||
# Track tool call items for streaming events
|
||||
tool_call_item_ids: dict[int, str] = {}
|
||||
|
||||
async for chunk in completion_result:
|
||||
chat_response_id = chunk.id
|
||||
|
@ -472,18 +486,62 @@ class OpenAIResponsesImpl:
|
|||
if chunk_choice.delta.tool_calls:
|
||||
for tool_call in chunk_choice.delta.tool_calls:
|
||||
response_tool_call = chat_response_tool_calls.get(tool_call.index, None)
|
||||
if response_tool_call:
|
||||
# Don't attempt to concatenate arguments if we don't have any new argumentsAdd commentMore actions
|
||||
if tool_call.function.arguments:
|
||||
# Guard against an initial None argument before we concatenate
|
||||
response_tool_call.function.arguments = (
|
||||
response_tool_call.function.arguments or ""
|
||||
) + tool_call.function.arguments
|
||||
else:
|
||||
# Create new tool call entry if this is the first chunk for this index
|
||||
is_new_tool_call = response_tool_call is None
|
||||
if is_new_tool_call:
|
||||
tool_call_dict: dict[str, Any] = tool_call.model_dump()
|
||||
tool_call_dict.pop("type", None)
|
||||
response_tool_call = OpenAIChatCompletionToolCall(**tool_call_dict)
|
||||
chat_response_tool_calls[tool_call.index] = response_tool_call
|
||||
chat_response_tool_calls[tool_call.index] = response_tool_call
|
||||
|
||||
# Create item ID for this tool call for streaming events
|
||||
tool_call_item_id = f"fc_{uuid.uuid4()}"
|
||||
tool_call_item_ids[tool_call.index] = tool_call_item_id
|
||||
|
||||
# Emit output_item.added event for the new function call
|
||||
sequence_number += 1
|
||||
function_call_item = OpenAIResponseOutputMessageFunctionToolCall(
|
||||
arguments="", # Will be filled incrementally via delta events
|
||||
call_id=tool_call.id or "",
|
||||
name=tool_call.function.name if tool_call.function else "",
|
||||
id=tool_call_item_id,
|
||||
status="in_progress",
|
||||
)
|
||||
yield OpenAIResponseObjectStreamResponseOutputItemAdded(
|
||||
response_id=response_id,
|
||||
item=function_call_item,
|
||||
output_index=len(output_messages),
|
||||
sequence_number=sequence_number,
|
||||
)
|
||||
|
||||
# Stream function call arguments as they arrive
|
||||
if tool_call.function and tool_call.function.arguments:
|
||||
tool_call_item_id = tool_call_item_ids[tool_call.index]
|
||||
sequence_number += 1
|
||||
yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta(
|
||||
delta=tool_call.function.arguments,
|
||||
item_id=tool_call_item_id,
|
||||
output_index=len(output_messages),
|
||||
sequence_number=sequence_number,
|
||||
)
|
||||
|
||||
# Accumulate arguments for final response (only for subsequent chunks)
|
||||
if not is_new_tool_call:
|
||||
response_tool_call.function.arguments = (
|
||||
response_tool_call.function.arguments or ""
|
||||
) + tool_call.function.arguments
|
||||
|
||||
# Emit function_call_arguments.done events for completed tool calls
|
||||
for tool_call_index in sorted(chat_response_tool_calls.keys()):
|
||||
tool_call_item_id = tool_call_item_ids[tool_call_index]
|
||||
final_arguments = chat_response_tool_calls[tool_call_index].function.arguments or ""
|
||||
sequence_number += 1
|
||||
yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone(
|
||||
arguments=final_arguments,
|
||||
item_id=tool_call_item_id,
|
||||
output_index=len(output_messages),
|
||||
sequence_number=sequence_number,
|
||||
)
|
||||
|
||||
# Convert collected chunks to complete response
|
||||
if chat_response_tool_calls:
|
||||
|
@ -532,18 +590,56 @@ class OpenAIResponsesImpl:
|
|||
tool_call_log, tool_response_message = await self._execute_tool_call(tool_call, ctx)
|
||||
if tool_call_log:
|
||||
output_messages.append(tool_call_log)
|
||||
|
||||
# Emit output_item.done event for completed non-function tool call
|
||||
# Find the item_id for this tool call
|
||||
matching_item_id = None
|
||||
for index, item_id in tool_call_item_ids.items():
|
||||
response_tool_call = chat_response_tool_calls.get(index)
|
||||
if response_tool_call and response_tool_call.id == tool_call.id:
|
||||
matching_item_id = item_id
|
||||
break
|
||||
|
||||
if matching_item_id:
|
||||
sequence_number += 1
|
||||
yield OpenAIResponseObjectStreamResponseOutputItemDone(
|
||||
response_id=response_id,
|
||||
item=tool_call_log,
|
||||
output_index=len(output_messages) - 1,
|
||||
sequence_number=sequence_number,
|
||||
)
|
||||
|
||||
if tool_response_message:
|
||||
next_turn_messages.append(tool_response_message)
|
||||
|
||||
for tool_call in function_tool_calls:
|
||||
output_messages.append(
|
||||
OpenAIResponseOutputMessageFunctionToolCall(
|
||||
arguments=tool_call.function.arguments or "",
|
||||
call_id=tool_call.id,
|
||||
name=tool_call.function.name or "",
|
||||
id=f"fc_{uuid.uuid4()}",
|
||||
status="completed",
|
||||
)
|
||||
# Find the item_id for this tool call from our tracking dictionary
|
||||
matching_item_id = None
|
||||
for index, item_id in tool_call_item_ids.items():
|
||||
response_tool_call = chat_response_tool_calls.get(index)
|
||||
if response_tool_call and response_tool_call.id == tool_call.id:
|
||||
matching_item_id = item_id
|
||||
break
|
||||
|
||||
# Use existing item_id or create new one if not found
|
||||
final_item_id = matching_item_id or f"fc_{uuid.uuid4()}"
|
||||
|
||||
function_call_item = OpenAIResponseOutputMessageFunctionToolCall(
|
||||
arguments=tool_call.function.arguments or "",
|
||||
call_id=tool_call.id,
|
||||
name=tool_call.function.name or "",
|
||||
id=final_item_id,
|
||||
status="completed",
|
||||
)
|
||||
output_messages.append(function_call_item)
|
||||
|
||||
# Emit output_item.done event for completed function call
|
||||
sequence_number += 1
|
||||
yield OpenAIResponseObjectStreamResponseOutputItemDone(
|
||||
response_id=response_id,
|
||||
item=function_call_item,
|
||||
output_index=len(output_messages) - 1,
|
||||
sequence_number=sequence_number,
|
||||
)
|
||||
|
||||
if not function_tool_calls and not non_function_tool_calls:
|
||||
|
@ -779,7 +875,8 @@ class OpenAIResponsesImpl:
|
|||
)
|
||||
elif function.name == "knowledge_search":
|
||||
response_file_search_tool = next(
|
||||
(t for t in ctx.response_tools if isinstance(t, OpenAIResponseInputToolFileSearch)), None
|
||||
(t for t in ctx.response_tools if isinstance(t, OpenAIResponseInputToolFileSearch)),
|
||||
None,
|
||||
)
|
||||
if response_file_search_tool:
|
||||
# Use vector_stores.search API instead of knowledge_search tool
|
||||
|
@ -798,7 +895,9 @@ class OpenAIResponsesImpl:
|
|||
error_exc = e
|
||||
|
||||
if function.name in ctx.mcp_tool_to_server:
|
||||
from llama_stack.apis.agents.openai_responses import OpenAIResponseOutputMessageMCPCall
|
||||
from llama_stack.apis.agents.openai_responses import (
|
||||
OpenAIResponseOutputMessageMCPCall,
|
||||
)
|
||||
|
||||
message = OpenAIResponseOutputMessageMCPCall(
|
||||
id=tool_call_id,
|
||||
|
@ -850,7 +949,10 @@ class OpenAIResponsesImpl:
|
|||
if isinstance(result.content, str):
|
||||
content = result.content
|
||||
elif isinstance(result.content, list):
|
||||
from llama_stack.apis.common.content_types import ImageContentItem, TextContentItem
|
||||
from llama_stack.apis.common.content_types import (
|
||||
ImageContentItem,
|
||||
TextContentItem,
|
||||
)
|
||||
|
||||
content = []
|
||||
for item in result.content:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue