Merge remote-tracking branch 'origin/main' into stores

Ashwin Bharambe 2025-10-13 11:07:11 -07:00
commit b72154ce5e
1161 changed files with 609896 additions and 42960 deletions

View file

@ -21,7 +21,9 @@ async def get_provider_impl(config: MetaReferenceAgentsImplConfig, deps: dict[Ap
deps[Api.safety],
deps[Api.tool_runtime],
deps[Api.tool_groups],
deps[Api.conversations],
policy,
Api.telemetry in deps,
)
await impl.initialize()
return impl
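The hunk above threads a new Conversations dependency and a telemetry flag through provider construction, deriving the flag from whether Api.telemetry is present in deps. A minimal runnable sketch of that pattern, with a stand-in Api enum and impl class rather than the real llama_stack types:

import asyncio
from enum import Enum

class Api(Enum):
    conversations = "conversations"
    telemetry = "telemetry"

class AgentsImpl:
    def __init__(self, conversations_api, telemetry_enabled: bool = False):
        self.conversations_api = conversations_api
        self.telemetry_enabled = telemetry_enabled

    async def initialize(self) -> None:
        pass

async def get_provider_impl(deps: dict) -> AgentsImpl:
    # Telemetry is optional: enablement is inferred from dependency presence,
    # mirroring `Api.telemetry in deps` in the hunk above.
    impl = AgentsImpl(deps[Api.conversations], telemetry_enabled=Api.telemetry in deps)
    await impl.initialize()
    return impl

impl = asyncio.run(get_provider_impl({Api.conversations: object()}))
assert impl.telemetry_enabled is False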

View file

@ -7,8 +7,6 @@
import copy
import json
import re
import secrets
import string
import uuid
import warnings
from collections.abc import AsyncGenerator
@ -51,6 +49,7 @@ from llama_stack.apis.inference import (
Inference,
Message,
OpenAIAssistantMessageParam,
OpenAIChatCompletionRequestWithExtraBody,
OpenAIDeveloperMessageParam,
OpenAIMessageParam,
OpenAISystemMessageParam,
@ -84,11 +83,6 @@ from llama_stack.providers.utils.telemetry import tracing
from .persistence import AgentPersistence
from .safety import SafetyException, ShieldRunnerMixin
def make_random_string(length: int = 8):
return "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(length))
TOOLS_ATTACHMENT_KEY_REGEX = re.compile(r"__tools_attachment__=(\{.*?\})")
MEMORY_QUERY_TOOL = "knowledge_search"
WEB_SEARCH_TOOL = "web_search"
@ -110,6 +104,7 @@ class ChatAgent(ShieldRunnerMixin):
persistence_store: KVStore,
created_at: str,
policy: list[AccessRule],
telemetry_enabled: bool = False,
):
self.agent_id = agent_id
self.agent_config = agent_config
@ -120,6 +115,7 @@ class ChatAgent(ShieldRunnerMixin):
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
self.created_at = created_at
self.telemetry_enabled = telemetry_enabled
ShieldRunnerMixin.__init__(
self,
@ -188,28 +184,30 @@ class ChatAgent(ShieldRunnerMixin):
async def create_and_execute_turn(self, request: AgentTurnCreateRequest) -> AsyncGenerator:
turn_id = str(uuid.uuid4())
span = tracing.get_current_span()
if span:
span.set_attribute("session_id", request.session_id)
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
if self.telemetry_enabled:
span = tracing.get_current_span()
if span is not None:
span.set_attribute("session_id", request.session_id)
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
await self._initialize_tools(request.toolgroups)
async for chunk in self._run_turn(request, turn_id):
yield chunk
async def resume_turn(self, request: AgentTurnResumeRequest) -> AsyncGenerator:
span = tracing.get_current_span()
if span:
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("session_id", request.session_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", request.turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
if self.telemetry_enabled:
span = tracing.get_current_span()
if span is not None:
span.set_attribute("agent_id", self.agent_id)
span.set_attribute("session_id", request.session_id)
span.set_attribute("request", request.model_dump_json())
span.set_attribute("turn_id", request.turn_id)
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
await self._initialize_tools()
async for chunk in self._run_turn(request):
@ -395,9 +393,12 @@ class ChatAgent(ShieldRunnerMixin):
touchpoint: str,
) -> AsyncGenerator:
async with tracing.span("run_shields") as span:
span.set_attribute("input", [m.model_dump_json() for m in messages])
if self.telemetry_enabled and span is not None:
span.set_attribute("input", [m.model_dump_json() for m in messages])
if len(shields) == 0:
span.set_attribute("output", "no shields")
if len(shields) == 0:
span.set_attribute("output", "no shields")
return
step_id = str(uuid.uuid4())
@ -430,7 +431,8 @@ class ChatAgent(ShieldRunnerMixin):
)
)
)
span.set_attribute("output", e.violation.model_dump_json())
if self.telemetry_enabled and span is not None:
span.set_attribute("output", e.violation.model_dump_json())
yield CompletionMessage(
content=str(e),
@ -453,7 +455,8 @@ class ChatAgent(ShieldRunnerMixin):
)
)
)
span.set_attribute("output", "no violations")
if self.telemetry_enabled and span is not None:
span.set_attribute("output", "no violations")
async def _run(
self,
@ -518,8 +521,9 @@ class ChatAgent(ShieldRunnerMixin):
stop_reason: StopReason | None = None
async with tracing.span("inference") as span:
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
if self.telemetry_enabled and span is not None:
if self.agent_config.name:
span.set_attribute("agent_name", self.agent_config.name)
def _serialize_nested(value):
"""Recursively serialize nested Pydantic models to dicts."""
@ -579,7 +583,7 @@ class ChatAgent(ShieldRunnerMixin):
max_tokens = getattr(sampling_params, "max_tokens", None)
# Use OpenAI chat completion
openai_stream = await self.inference_api.openai_chat_completion(
params = OpenAIChatCompletionRequestWithExtraBody(
model=self.agent_config.model,
messages=openai_messages,
tools=openai_tools if openai_tools else None,
@ -590,6 +594,7 @@ class ChatAgent(ShieldRunnerMixin):
max_tokens=max_tokens,
stream=True,
)
openai_stream = await self.inference_api.openai_chat_completion(params)
# Convert OpenAI stream back to Llama Stack format
response_stream = convert_openai_chat_completion_stream(
@ -637,18 +642,19 @@ class ChatAgent(ShieldRunnerMixin):
else:
raise ValueError(f"Unexpected delta type {type(delta)}")
span.set_attribute("stop_reason", stop_reason or StopReason.end_of_turn)
span.set_attribute(
"input",
json.dumps([json.loads(m.model_dump_json()) for m in input_messages]),
)
output_attr = json.dumps(
{
"content": content,
"tool_calls": [json.loads(t.model_dump_json()) for t in tool_calls],
}
)
span.set_attribute("output", output_attr)
if self.telemetry_enabled and span is not None:
span.set_attribute("stop_reason", stop_reason or StopReason.end_of_turn)
span.set_attribute(
"input",
json.dumps([json.loads(m.model_dump_json()) for m in input_messages]),
)
output_attr = json.dumps(
{
"content": content,
"tool_calls": [json.loads(t.model_dump_json()) for t in tool_calls],
}
)
span.set_attribute("output", output_attr)
n_iter += 1
await self.storage.set_num_infer_iters_in_turn(session_id, turn_id, n_iter)
@ -756,7 +762,9 @@ class ChatAgent(ShieldRunnerMixin):
{
"tool_name": tool_call.tool_name,
"input": message.model_dump_json(),
},
}
if self.telemetry_enabled
else {},
) as span:
tool_execution_start_time = datetime.now(UTC).isoformat()
tool_result = await self.execute_tool_call_maybe(
@ -771,7 +779,8 @@ class ChatAgent(ShieldRunnerMixin):
call_id=tool_call.call_id,
content=tool_result.content,
)
span.set_attribute("output", result_message.model_dump_json())
if self.telemetry_enabled and span is not None:
span.set_attribute("output", result_message.model_dump_json())
# Store tool execution step
tool_execution_step = ToolExecutionStep(
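The recurring change in this file guards every span attribute behind `if self.telemetry_enabled and span is not None:`. A hedged sketch of the same guard factored into a helper (the names are illustrative; the shipped code inlines the checks at each call site):

from typing import Any

class _NoopSpan:
    def set_attribute(self, key: str, value: Any) -> None:
        pass  # swallow attributes when telemetry is off or no span is active

def guarded_span(span: Any, telemetry_enabled: bool) -> Any:
    # Equivalent to `if self.telemetry_enabled and span is not None: ...`,
    # but lets call sites set attributes unconditionally.
    return span if (telemetry_enabled and span is not None) else _NoopSpan()

# usage
guarded_span(None, True).set_attribute("turn_id", "abc")          # no-op
guarded_span(_NoopSpan(), False).set_attribute("turn_id", "abc")  # no-op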

View file

@ -30,6 +30,7 @@ from llama_stack.apis.agents import (
)
from llama_stack.apis.agents.openai_responses import OpenAIResponseText
from llama_stack.apis.common.responses import PaginatedResponse
from llama_stack.apis.conversations import Conversations
from llama_stack.apis.inference import (
Inference,
ToolConfig,
@ -63,7 +64,9 @@ class MetaReferenceAgentsImpl(Agents):
safety_api: Safety,
tool_runtime_api: ToolRuntime,
tool_groups_api: ToolGroups,
conversations_api: Conversations,
policy: list[AccessRule],
telemetry_enabled: bool = False,
):
self.config = config
self.inference_api = inference_api
@ -71,6 +74,8 @@ class MetaReferenceAgentsImpl(Agents):
self.safety_api = safety_api
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
self.conversations_api = conversations_api
self.telemetry_enabled = telemetry_enabled
self.in_memory_store = InmemoryKVStoreImpl()
self.openai_responses_impl: OpenAIResponsesImpl | None = None
@ -86,6 +91,7 @@ class MetaReferenceAgentsImpl(Agents):
tool_runtime_api=self.tool_runtime_api,
responses_store=self.responses_store,
vector_io_api=self.vector_io_api,
conversations_api=self.conversations_api,
)
async def create_agent(
@ -135,6 +141,7 @@ class MetaReferenceAgentsImpl(Agents):
),
created_at=agent_info.created_at,
policy=self.policy,
telemetry_enabled=self.telemetry_enabled,
)
async def create_agent_session(
@ -322,6 +329,7 @@ class MetaReferenceAgentsImpl(Agents):
model: str,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
stream: bool | None = False,
temperature: float | None = None,
@ -336,6 +344,7 @@ class MetaReferenceAgentsImpl(Agents):
model,
instructions,
previous_response_id,
conversation,
store,
stream,
temperature,
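The facade above forwards the new `conversation` argument positionally between `previous_response_id` and `store`, so argument order must stay in lockstep with `create_openai_response`. A small illustrative sketch (simplified functions, not the real signatures) of the keyword-forwarding alternative that stays correct as signatures grow:

import asyncio

async def create_openai_response(input, model, instructions=None,
                                 previous_response_id=None, conversation=None,
                                 store=True):
    return {"conversation": conversation, "store": store}

async def facade(input, model, instructions=None, previous_response_id=None,
                 conversation=None, store=True):
    # Keyword forwarding remains correct even if the signature gains parameters.
    return await create_openai_response(
        input,
        model,
        instructions=instructions,
        previous_response_id=previous_response_id,
        conversation=conversation,
        store=store,
    )

print(asyncio.run(facade("hi", "my-model", conversation="conv_1")))
# {'conversation': 'conv_1', 'store': True}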

View file

@ -24,6 +24,11 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseText,
OpenAIResponseTextFormat,
)
from llama_stack.apis.common.errors import (
InvalidConversationIdError,
)
from llama_stack.apis.conversations import Conversations
from llama_stack.apis.conversations.conversations import ConversationItem
from llama_stack.apis.inference import (
Inference,
OpenAIMessageParam,
@ -39,7 +44,7 @@ from llama_stack.providers.utils.responses.responses_store import (
from .streaming import StreamingResponseOrchestrator
from .tool_executor import ToolExecutor
from .types import ChatCompletionContext
from .types import ChatCompletionContext, ToolContext
from .utils import (
convert_response_input_to_chat_messages,
convert_response_text_to_chat_response_format,
@ -61,12 +66,14 @@ class OpenAIResponsesImpl:
tool_runtime_api: ToolRuntime,
responses_store: ResponsesStore,
vector_io_api: VectorIO, # VectorIO
conversations_api: Conversations,
):
self.inference_api = inference_api
self.tool_groups_api = tool_groups_api
self.tool_runtime_api = tool_runtime_api
self.responses_store = responses_store
self.vector_io_api = vector_io_api
self.conversations_api = conversations_api
self.tool_executor = ToolExecutor(
tool_groups_api=tool_groups_api,
tool_runtime_api=tool_runtime_api,
@ -91,13 +98,15 @@ class OpenAIResponsesImpl:
async def _process_input_with_previous_response(
self,
input: str | list[OpenAIResponseInput],
tools: list[OpenAIResponseInputTool] | None,
previous_response_id: str | None,
) -> tuple[str | list[OpenAIResponseInput], list[OpenAIMessageParam]]:
"""Process input with optional previous response context.
Returns:
tuple: (all_input for storage, messages for chat completion)
tuple: (all_input for storage, messages for chat completion, tool context)
"""
tool_context = ToolContext(tools)
if previous_response_id:
previous_response: _OpenAIResponseObjectWithInputAndMessages = (
await self.responses_store.get_response_object(previous_response_id)
@ -108,16 +117,18 @@ class OpenAIResponsesImpl:
# Use stored messages directly and convert only new input
message_adapter = TypeAdapter(list[OpenAIMessageParam])
messages = message_adapter.validate_python(previous_response.messages)
new_messages = await convert_response_input_to_chat_messages(input)
new_messages = await convert_response_input_to_chat_messages(input, previous_messages=messages)
messages.extend(new_messages)
else:
# Backward compatibility: reconstruct from inputs
messages = await convert_response_input_to_chat_messages(all_input)
tool_context.recover_tools_from_previous_response(previous_response)
else:
all_input = input
messages = await convert_response_input_to_chat_messages(input)
return all_input, messages
return all_input, messages, tool_context
async def _prepend_instructions(self, messages, instructions):
if instructions:
@ -201,6 +212,7 @@ class OpenAIResponsesImpl:
model: str,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
stream: bool | None = False,
temperature: float | None = None,
@ -217,11 +229,27 @@ class OpenAIResponsesImpl:
if shields is not None:
raise NotImplementedError("Shields parameter is not yet implemented in the meta-reference provider")
if conversation is not None and previous_response_id is not None:
raise ValueError(
"Mutually exclusive parameters: 'previous_response_id' and 'conversation'. Ensure you are only providing one of these parameters."
)
original_input = input # needed for syncing to Conversations
if conversation is not None:
if not conversation.startswith("conv_"):
raise InvalidConversationIdError(conversation)
# Check conversation exists (raises ConversationNotFoundError if not)
_ = await self.conversations_api.get_conversation(conversation)
input = await self._load_conversation_context(conversation, input)
stream_gen = self._create_streaming_response(
input=input,
original_input=original_input,
model=model,
instructions=instructions,
previous_response_id=previous_response_id,
conversation=conversation,
store=store,
temperature=temperature,
text=text,
@ -232,24 +260,42 @@ class OpenAIResponsesImpl:
if stream:
return stream_gen
else:
response = None
async for stream_chunk in stream_gen:
if stream_chunk.type == "response.completed":
if response is not None:
raise ValueError("The response stream completed multiple times! Earlier response: {response}")
response = stream_chunk.response
# don't leave the generator half complete!
final_response = None
final_event_type = None
failed_response = None
if response is None:
raise ValueError("The response stream never completed")
return response
async for stream_chunk in stream_gen:
if stream_chunk.type in {"response.completed", "response.incomplete"}:
if final_response is not None:
raise ValueError(
"The response stream produced multiple terminal responses! "
f"Earlier response from {final_event_type}"
)
final_response = stream_chunk.response
final_event_type = stream_chunk.type
elif stream_chunk.type == "response.failed":
failed_response = stream_chunk.response
if failed_response is not None:
error_message = (
failed_response.error.message
if failed_response and failed_response.error
else "Response stream failed without error details"
)
raise RuntimeError(f"OpenAI response failed: {error_message}")
if final_response is None:
raise ValueError("The response stream never reached a terminal state")
return final_response
async def _create_streaming_response(
self,
input: str | list[OpenAIResponseInput],
model: str,
original_input: str | list[OpenAIResponseInput] | None = None,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
temperature: float | None = None,
text: OpenAIResponseText | None = None,
@ -257,7 +303,9 @@ class OpenAIResponsesImpl:
max_infer_iters: int | None = 10,
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Input preprocessing
all_input, messages = await self._process_input_with_previous_response(input, previous_response_id)
all_input, messages, tool_context = await self._process_input_with_previous_response(
input, tools, previous_response_id
)
await self._prepend_instructions(messages, instructions)
# Structured outputs
@ -269,11 +317,12 @@ class OpenAIResponsesImpl:
response_tools=tools,
temperature=temperature,
response_format=response_format,
inputs=input,
tool_context=tool_context,
inputs=all_input,
)
# Create orchestrator and delegate streaming logic
response_id = f"resp-{uuid.uuid4()}"
response_id = f"resp_{uuid.uuid4()}"
created_at = int(time.time())
orchestrator = StreamingResponseOrchestrator(
@ -288,18 +337,110 @@ class OpenAIResponsesImpl:
# Stream the response
final_response = None
failed_response = None
async for stream_chunk in orchestrator.create_response():
if stream_chunk.type == "response.completed":
if stream_chunk.type in {"response.completed", "response.incomplete"}:
final_response = stream_chunk.response
elif stream_chunk.type == "response.failed":
failed_response = stream_chunk.response
yield stream_chunk
# Store the response if requested
if store and final_response:
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
)
# Store and sync immediately after yielding terminal events
# This ensures the storage/syncing happens even if the consumer breaks early
if (
stream_chunk.type in {"response.completed", "response.incomplete"}
and store
and final_response
and failed_response is None
):
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
)
if stream_chunk.type in {"response.completed", "response.incomplete"} and conversation and final_response:
# for Conversations, we need to use the original_input if it's available, otherwise use input
sync_input = original_input if original_input is not None else input
await self._sync_response_to_conversation(conversation, sync_input, final_response)
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
return await self.responses_store.delete_response_object(response_id)
async def _load_conversation_context(
self, conversation_id: str, content: str | list[OpenAIResponseInput]
) -> list[OpenAIResponseInput]:
"""Load conversation history and merge with provided content."""
conversation_items = await self.conversations_api.list(conversation_id, order="asc")
context_messages = []
for item in conversation_items.data:
if isinstance(item, OpenAIResponseMessage):
if item.role == "user":
context_messages.append(
OpenAIResponseMessage(
role="user", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
elif item.role == "assistant":
context_messages.append(
OpenAIResponseMessage(
role="assistant", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
# add new content to context
if isinstance(content, str):
context_messages.append(OpenAIResponseMessage(role="user", content=content))
elif isinstance(content, list):
context_messages.extend(content)
return context_messages
async def _sync_response_to_conversation(
self, conversation_id: str, content: str | list[OpenAIResponseInput], response: OpenAIResponseObject
) -> None:
"""Sync content and response messages to the conversation."""
conversation_items = []
# add user content message(s)
if isinstance(content, str):
conversation_items.append(
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": content}]}
)
elif isinstance(content, list):
for item in content:
if not isinstance(item, OpenAIResponseMessage):
raise NotImplementedError(f"Unsupported input item type: {type(item)}")
if item.role == "user":
if isinstance(item.content, str):
conversation_items.append(
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": item.content}],
}
)
elif isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "user", "content": item.content})
else:
raise NotImplementedError(f"Unsupported user message content type: {type(item.content)}")
elif item.role == "assistant":
if isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": item.content})
else:
raise NotImplementedError(f"Unsupported assistant message content type: {type(item.content)}")
else:
raise NotImplementedError(f"Unsupported message role: {item.role}")
# add assistant response message
for output_item in response.output:
if isinstance(output_item, OpenAIResponseMessage) and output_item.role == "assistant":
if hasattr(output_item, "content") and isinstance(output_item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": output_item.content})
if conversation_items:
adapter = TypeAdapter(list[ConversationItem])
validated_items = adapter.validate_python(conversation_items)
await self.conversations_api.add_items(conversation_id, validated_items)
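Before doing any work, the handler rejects the mutually exclusive parameter pair and malformed conversation ids. A self-contained sketch of those guards; the error class here is a stand-in for the one imported from llama_stack.apis.common.errors:

class InvalidConversationIdError(ValueError):
    def __init__(self, conversation_id: str):
        super().__init__(f"Invalid conversation id {conversation_id!r}: expected a 'conv_' prefix")

def validate_response_params(conversation: str | None, previous_response_id: str | None) -> None:
    if conversation is not None and previous_response_id is not None:
        raise ValueError("Mutually exclusive parameters: 'previous_response_id' and 'conversation'.")
    if conversation is not None and not conversation.startswith("conv_"):
        raise InvalidConversationIdError(conversation)

validate_response_params("conv_abc123", None)  # ok
try:
    validate_response_params("abc123", None)
except InvalidConversationIdError as e:
    print(e)  # Invalid conversation id 'abc123': expected a 'conv_' prefix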

View file

@ -13,6 +13,9 @@ from llama_stack.apis.agents.openai_responses import (
ApprovalFilter,
MCPListToolsTool,
OpenAIResponseContentPartOutputText,
OpenAIResponseContentPartReasoningText,
OpenAIResponseContentPartRefusal,
OpenAIResponseError,
OpenAIResponseInputTool,
OpenAIResponseInputToolMCP,
OpenAIResponseMCPApprovalRequest,
@ -22,8 +25,11 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectStreamResponseContentPartAdded,
OpenAIResponseObjectStreamResponseContentPartDone,
OpenAIResponseObjectStreamResponseCreated,
OpenAIResponseObjectStreamResponseFailed,
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta,
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone,
OpenAIResponseObjectStreamResponseIncomplete,
OpenAIResponseObjectStreamResponseInProgress,
OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta,
OpenAIResponseObjectStreamResponseMcpCallArgumentsDone,
OpenAIResponseObjectStreamResponseMcpListToolsCompleted,
@ -31,21 +37,31 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectStreamResponseOutputItemAdded,
OpenAIResponseObjectStreamResponseOutputItemDone,
OpenAIResponseObjectStreamResponseOutputTextDelta,
OpenAIResponseObjectStreamResponseReasoningTextDelta,
OpenAIResponseObjectStreamResponseReasoningTextDone,
OpenAIResponseObjectStreamResponseRefusalDelta,
OpenAIResponseObjectStreamResponseRefusalDone,
OpenAIResponseOutput,
OpenAIResponseOutputMessageFunctionToolCall,
OpenAIResponseOutputMessageMCPListTools,
OpenAIResponseText,
OpenAIResponseUsage,
OpenAIResponseUsageInputTokensDetails,
OpenAIResponseUsageOutputTokensDetails,
WebSearchToolTypes,
)
from llama_stack.apis.inference import (
Inference,
OpenAIAssistantMessageParam,
OpenAIChatCompletion,
OpenAIChatCompletionChunk,
OpenAIChatCompletionRequestWithExtraBody,
OpenAIChatCompletionToolCall,
OpenAIChoice,
OpenAIMessageParam,
)
from llama_stack.log import get_logger
from llama_stack.providers.utils.telemetry import tracing
from .types import ChatCompletionContext, ChatCompletionResult
from .utils import convert_chat_choice_to_response_message, is_function_tool_call
@ -94,113 +110,174 @@ class StreamingResponseOrchestrator:
self.tool_executor = tool_executor
self.sequence_number = 0
# Store MCP tool mapping that gets built during tool processing
self.mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] = {}
self.mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] = ctx.tool_context.previous_tools or {}
# Track final messages after all tool executions
self.final_messages: list[OpenAIMessageParam] = []
# mapping for annotations
self.citation_files: dict[str, str] = {}
# Track accumulated usage across all inference calls
self.accumulated_usage: OpenAIResponseUsage | None = None
async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]:
# Initialize output messages
output_messages: list[OpenAIResponseOutput] = []
# Create initial response and emit response.created immediately
initial_response = OpenAIResponseObject(
def _clone_outputs(self, outputs: list[OpenAIResponseOutput]) -> list[OpenAIResponseOutput]:
cloned: list[OpenAIResponseOutput] = []
for item in outputs:
if hasattr(item, "model_copy"):
cloned.append(item.model_copy(deep=True))
else:
cloned.append(item)
return cloned
def _snapshot_response(
self,
status: str,
outputs: list[OpenAIResponseOutput],
*,
error: OpenAIResponseError | None = None,
) -> OpenAIResponseObject:
return OpenAIResponseObject(
created_at=self.created_at,
id=self.response_id,
model=self.ctx.model,
object="response",
status="in_progress",
output=output_messages.copy(),
status=status,
output=self._clone_outputs(outputs),
text=self.text,
tools=self.ctx.available_tools(),
error=error,
usage=self.accumulated_usage,
)
yield OpenAIResponseObjectStreamResponseCreated(response=initial_response)
async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]:
output_messages: list[OpenAIResponseOutput] = []
# Process all tools (including MCP tools) and emit streaming events
if self.ctx.response_tools:
async for stream_event in self._process_tools(self.ctx.response_tools, output_messages):
yield stream_event
# Emit response.created followed by response.in_progress to align with OpenAI streaming
yield OpenAIResponseObjectStreamResponseCreated(
response=self._snapshot_response("in_progress", output_messages)
)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseInProgress(
response=self._snapshot_response("in_progress", output_messages),
sequence_number=self.sequence_number,
)
async for stream_event in self._process_tools(output_messages):
yield stream_event
n_iter = 0
messages = self.ctx.messages.copy()
final_status = "completed"
last_completion_result: ChatCompletionResult | None = None
while True:
# Text is the default response format for chat completion so we don't need to pass it
# (some providers don't support non-empty response_format when tools are present)
response_format = None if self.ctx.response_format.type == "text" else self.ctx.response_format
completion_result = await self.inference_api.openai_chat_completion(
model=self.ctx.model,
messages=messages,
tools=self.ctx.chat_tools,
stream=True,
temperature=self.ctx.temperature,
response_format=response_format,
)
try:
while True:
# Text is the default response format for chat completion so we don't need to pass it
# (some providers don't support non-empty response_format when tools are present)
response_format = None if self.ctx.response_format.type == "text" else self.ctx.response_format
logger.debug(f"calling openai_chat_completion with tools: {self.ctx.chat_tools}")
params = OpenAIChatCompletionRequestWithExtraBody(
model=self.ctx.model,
messages=messages,
tools=self.ctx.chat_tools,
stream=True,
temperature=self.ctx.temperature,
response_format=response_format,
stream_options={
"include_usage": True,
},
)
completion_result = await self.inference_api.openai_chat_completion(params)
# Process streaming chunks and build complete response
completion_result_data = None
async for stream_event_or_result in self._process_streaming_chunks(completion_result, output_messages):
if isinstance(stream_event_or_result, ChatCompletionResult):
completion_result_data = stream_event_or_result
else:
yield stream_event_or_result
if not completion_result_data:
raise ValueError("Streaming chunk processor failed to return completion data")
current_response = self._build_chat_completion(completion_result_data)
# Process streaming chunks and build complete response
completion_result_data = None
async for stream_event_or_result in self._process_streaming_chunks(completion_result, output_messages):
if isinstance(stream_event_or_result, ChatCompletionResult):
completion_result_data = stream_event_or_result
else:
yield stream_event_or_result
if not completion_result_data:
raise ValueError("Streaming chunk processor failed to return completion data")
last_completion_result = completion_result_data
current_response = self._build_chat_completion(completion_result_data)
function_tool_calls, non_function_tool_calls, approvals, next_turn_messages = self._separate_tool_calls(
current_response, messages
)
(
function_tool_calls,
non_function_tool_calls,
approvals,
next_turn_messages,
) = self._separate_tool_calls(current_response, messages)
# add any approval requests required
for tool_call in approvals:
async for evt in self._add_mcp_approval_request(
tool_call.function.name, tool_call.function.arguments, output_messages
# add any approval requests required
for tool_call in approvals:
async for evt in self._add_mcp_approval_request(
tool_call.function.name, tool_call.function.arguments, output_messages
):
yield evt
# Handle choices with no tool calls
for choice in current_response.choices:
if not (choice.message.tool_calls and self.ctx.response_tools):
output_messages.append(
await convert_chat_choice_to_response_message(
choice,
self.citation_files,
message_id=completion_result_data.message_item_id,
)
)
# Execute tool calls and coordinate results
async for stream_event in self._coordinate_tool_execution(
function_tool_calls,
non_function_tool_calls,
completion_result_data,
output_messages,
next_turn_messages,
):
yield evt
yield stream_event
# Handle choices with no tool calls
for choice in current_response.choices:
if not (choice.message.tool_calls and self.ctx.response_tools):
output_messages.append(await convert_chat_choice_to_response_message(choice))
messages = next_turn_messages
# Execute tool calls and coordinate results
async for stream_event in self._coordinate_tool_execution(
function_tool_calls,
non_function_tool_calls,
completion_result_data,
output_messages,
next_turn_messages,
):
yield stream_event
if not function_tool_calls and not non_function_tool_calls:
break
if not function_tool_calls and not non_function_tool_calls:
break
if function_tool_calls:
logger.info("Exiting inference loop since there is a function (client-side) tool call")
break
if function_tool_calls:
logger.info("Exiting inference loop since there is a function (client-side) tool call")
break
n_iter += 1
if n_iter >= self.max_infer_iters:
logger.info(
f"Exiting inference loop since iteration count({n_iter}) exceeds {self.max_infer_iters=}"
)
final_status = "incomplete"
break
n_iter += 1
if n_iter >= self.max_infer_iters:
logger.info(f"Exiting inference loop since iteration count({n_iter}) exceeds {self.max_infer_iters=}")
break
if last_completion_result and last_completion_result.finish_reason == "length":
final_status = "incomplete"
messages = next_turn_messages
except Exception as exc: # noqa: BLE001
self.final_messages = messages.copy()
self.sequence_number += 1
error = OpenAIResponseError(code="internal_error", message=str(exc))
failure_response = self._snapshot_response("failed", output_messages, error=error)
yield OpenAIResponseObjectStreamResponseFailed(
response=failure_response,
sequence_number=self.sequence_number,
)
return
self.final_messages = messages.copy() + [current_response.choices[0].message]
self.final_messages = messages.copy()
# Create final response
final_response = OpenAIResponseObject(
created_at=self.created_at,
id=self.response_id,
model=self.ctx.model,
object="response",
status="completed",
text=self.text,
output=output_messages,
)
# Emit response.completed
yield OpenAIResponseObjectStreamResponseCompleted(response=final_response)
if final_status == "incomplete":
self.sequence_number += 1
final_response = self._snapshot_response("incomplete", output_messages)
yield OpenAIResponseObjectStreamResponseIncomplete(
response=final_response,
sequence_number=self.sequence_number,
)
else:
final_response = self._snapshot_response("completed", output_messages)
yield OpenAIResponseObjectStreamResponseCompleted(response=final_response)
def _separate_tool_calls(self, current_response, messages) -> tuple[list, list, list, list]:
"""Separate tool calls into function and non-function categories."""
@ -211,6 +288,8 @@ class StreamingResponseOrchestrator:
for choice in current_response.choices:
next_turn_messages.append(choice.message)
logger.debug(f"Choice message content: {choice.message.content}")
logger.debug(f"Choice message tool_calls: {choice.message.tool_calls}")
if choice.message.tool_calls and self.ctx.response_tools:
for tool_call in choice.message.tool_calls:
@ -227,14 +306,183 @@ class StreamingResponseOrchestrator:
non_function_tool_calls.append(tool_call)
else:
logger.info(f"Approval denied for {tool_call.id} on {tool_call.function.name}")
next_turn_messages.pop()
else:
logger.info(f"Requesting approval for {tool_call.id} on {tool_call.function.name}")
approvals.append(tool_call)
next_turn_messages.pop()
else:
non_function_tool_calls.append(tool_call)
return function_tool_calls, non_function_tool_calls, approvals, next_turn_messages
def _accumulate_chunk_usage(self, chunk: OpenAIChatCompletionChunk) -> None:
"""Accumulate usage from a streaming chunk into the response usage format."""
if not chunk.usage:
return
if self.accumulated_usage is None:
# Convert from chat completion format to response format
self.accumulated_usage = OpenAIResponseUsage(
input_tokens=chunk.usage.prompt_tokens,
output_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
input_tokens_details=(
OpenAIResponseUsageInputTokensDetails(cached_tokens=chunk.usage.prompt_tokens_details.cached_tokens)
if chunk.usage.prompt_tokens_details
else None
),
output_tokens_details=(
OpenAIResponseUsageOutputTokensDetails(
reasoning_tokens=chunk.usage.completion_tokens_details.reasoning_tokens
)
if chunk.usage.completion_tokens_details
else None
),
)
else:
# Accumulate across multiple inference calls
self.accumulated_usage = OpenAIResponseUsage(
input_tokens=self.accumulated_usage.input_tokens + chunk.usage.prompt_tokens,
output_tokens=self.accumulated_usage.output_tokens + chunk.usage.completion_tokens,
total_tokens=self.accumulated_usage.total_tokens + chunk.usage.total_tokens,
# Use latest non-null details
input_tokens_details=(
OpenAIResponseUsageInputTokensDetails(cached_tokens=chunk.usage.prompt_tokens_details.cached_tokens)
if chunk.usage.prompt_tokens_details
else self.accumulated_usage.input_tokens_details
),
output_tokens_details=(
OpenAIResponseUsageOutputTokensDetails(
reasoning_tokens=chunk.usage.completion_tokens_details.reasoning_tokens
)
if chunk.usage.completion_tokens_details
else self.accumulated_usage.output_tokens_details
),
)
async def _handle_reasoning_content_chunk(
self,
reasoning_content: str,
reasoning_part_emitted: bool,
reasoning_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Emit content_part.added event for first reasoning chunk
if not reasoning_part_emitted:
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=reasoning_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartReasoningText(
text="", # Will be filled incrementally via reasoning deltas
),
sequence_number=self.sequence_number,
)
# Emit reasoning_text.delta event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseReasoningTextDelta(
content_index=reasoning_content_index,
delta=reasoning_content,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
async def _handle_refusal_content_chunk(
self,
refusal_content: str,
refusal_part_emitted: bool,
refusal_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Emit content_part.added event for first refusal chunk
if not refusal_part_emitted:
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=refusal_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartRefusal(
refusal="", # Will be filled incrementally via refusal deltas
),
sequence_number=self.sequence_number,
)
# Emit refusal.delta event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseRefusalDelta(
content_index=refusal_content_index,
delta=refusal_content,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
async def _emit_reasoning_done_events(
self,
reasoning_text_accumulated: list[str],
reasoning_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
final_reasoning_text = "".join(reasoning_text_accumulated)
# Emit reasoning_text.done event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseReasoningTextDone(
content_index=reasoning_content_index,
text=final_reasoning_text,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
# Emit content_part.done for reasoning
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartDone(
content_index=reasoning_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartReasoningText(
text=final_reasoning_text,
),
sequence_number=self.sequence_number,
)
async def _emit_refusal_done_events(
self,
refusal_text_accumulated: list[str],
refusal_content_index: int,
message_item_id: str,
message_output_index: int,
) -> AsyncIterator[OpenAIResponseObjectStream]:
final_refusal_text = "".join(refusal_text_accumulated)
# Emit refusal.done event
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseRefusalDone(
content_index=refusal_content_index,
refusal=final_refusal_text,
item_id=message_item_id,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
# Emit content_part.done for refusal
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartDone(
content_index=refusal_content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartRefusal(
refusal=final_refusal_text,
),
sequence_number=self.sequence_number,
)
async def _process_streaming_chunks(
self, completion_result, output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream | ChatCompletionResult]:
@ -253,11 +501,23 @@ class StreamingResponseOrchestrator:
tool_call_item_ids: dict[int, str] = {}
# Track content parts for streaming events
content_part_emitted = False
reasoning_part_emitted = False
refusal_part_emitted = False
content_index = 0
reasoning_content_index = 1 # reasoning is a separate content part
refusal_content_index = 2 # refusal is a separate content part
message_output_index = len(output_messages)
reasoning_text_accumulated = []
refusal_text_accumulated = []
async for chunk in completion_result:
chat_response_id = chunk.id
chunk_created = chunk.created
chunk_model = chunk.model
# Accumulate usage from chunks (typically in final chunk with stream_options)
self._accumulate_chunk_usage(chunk)
for chunk_choice in chunk.choices:
# Emit incremental text content as delta events
if chunk_choice.delta.content:
@ -266,8 +526,10 @@ class StreamingResponseOrchestrator:
content_part_emitted = True
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartAdded(
content_index=content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartOutputText(
text="", # Will be filled incrementally via text deltas
),
@ -275,10 +537,10 @@ class StreamingResponseOrchestrator:
)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputTextDelta(
content_index=0,
content_index=content_index,
delta=chunk_choice.delta.content,
item_id=message_item_id,
output_index=0,
output_index=message_output_index,
sequence_number=self.sequence_number,
)
@ -287,6 +549,32 @@ class StreamingResponseOrchestrator:
if chunk_choice.finish_reason:
chunk_finish_reason = chunk_choice.finish_reason
# Handle reasoning content if present (non-standard field for o1/o3 models)
if hasattr(chunk_choice.delta, "reasoning_content") and chunk_choice.delta.reasoning_content:
async for event in self._handle_reasoning_content_chunk(
reasoning_content=chunk_choice.delta.reasoning_content,
reasoning_part_emitted=reasoning_part_emitted,
reasoning_content_index=reasoning_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
reasoning_part_emitted = True
reasoning_text_accumulated.append(chunk_choice.delta.reasoning_content)
# Handle refusal content if present
if chunk_choice.delta.refusal:
async for event in self._handle_refusal_content_chunk(
refusal_content=chunk_choice.delta.refusal,
refusal_part_emitted=refusal_part_emitted,
refusal_content_index=refusal_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
refusal_part_emitted = True
refusal_text_accumulated.append(chunk_choice.delta.refusal)
# Aggregate tool call arguments across chunks
if chunk_choice.delta.tool_calls:
for tool_call in chunk_choice.delta.tool_calls:
@ -378,14 +666,36 @@ class StreamingResponseOrchestrator:
final_text = "".join(chat_response_content)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseContentPartDone(
content_index=content_index,
response_id=self.response_id,
item_id=message_item_id,
output_index=message_output_index,
part=OpenAIResponseContentPartOutputText(
text=final_text,
),
sequence_number=self.sequence_number,
)
# Emit reasoning done events if reasoning content was streamed
if reasoning_part_emitted:
async for event in self._emit_reasoning_done_events(
reasoning_text_accumulated=reasoning_text_accumulated,
reasoning_content_index=reasoning_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
# Emit refusal done events if refusal content was streamed
if refusal_part_emitted:
async for event in self._emit_refusal_done_events(
refusal_text_accumulated=refusal_text_accumulated,
refusal_content_index=refusal_content_index,
message_item_id=message_item_id,
message_output_index=message_output_index,
):
yield event
# Clear content when there are tool calls (OpenAI spec behavior)
if chat_response_tool_calls:
chat_response_content = []
@ -470,6 +780,8 @@ class StreamingResponseOrchestrator:
tool_call_log = result.final_output_message
tool_response_message = result.final_input_message
self.sequence_number = result.sequence_number
if result.citation_files:
self.citation_files.update(result.citation_files)
if tool_call_log:
output_messages.append(tool_call_log)
@ -518,7 +830,7 @@ class StreamingResponseOrchestrator:
sequence_number=self.sequence_number,
)
async def _process_tools(
async def _process_new_tools(
self, tools: list[OpenAIResponseInputTool], output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream]:
"""Process all tools and emit appropriate streaming events."""
@ -573,7 +885,6 @@ class StreamingResponseOrchestrator:
yield OpenAIResponseObjectStreamResponseMcpListToolsInProgress(
sequence_number=self.sequence_number,
)
try:
# Parse allowed/never allowed tools
always_allowed = None
@ -586,14 +897,22 @@ class StreamingResponseOrchestrator:
never_allowed = mcp_tool.allowed_tools.never
# Call list_mcp_tools
tool_defs = await list_mcp_tools(
endpoint=mcp_tool.server_url,
headers=mcp_tool.headers or {},
)
tool_defs = None
list_id = f"mcp_list_{uuid.uuid4()}"
attributes = {
"server_label": mcp_tool.server_label,
"server_url": mcp_tool.server_url,
"mcp_list_tools_id": list_id,
}
async with tracing.span("list_mcp_tools", attributes):
tool_defs = await list_mcp_tools(
endpoint=mcp_tool.server_url,
headers=mcp_tool.headers or {},
)
# Create the MCP list tools message
mcp_list_message = OpenAIResponseOutputMessageMCPListTools(
id=f"mcp_list_{uuid.uuid4()}",
id=list_id,
server_label=mcp_tool.server_label,
tools=[],
)
@ -627,39 +946,26 @@ class StreamingResponseOrchestrator:
},
)
)
# Add the MCP list message to output
output_messages.append(mcp_list_message)
# Emit output_item.added for the MCP list tools message
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputItemAdded(
response_id=self.response_id,
item=mcp_list_message,
output_index=len(output_messages) - 1,
sequence_number=self.sequence_number,
)
# Emit mcp_list_tools.completed
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseMcpListToolsCompleted(
sequence_number=self.sequence_number,
)
# Emit output_item.done for the MCP list tools message
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputItemDone(
response_id=self.response_id,
item=mcp_list_message,
output_index=len(output_messages) - 1,
sequence_number=self.sequence_number,
)
async for stream_event in self._add_mcp_list_tools(mcp_list_message, output_messages):
yield stream_event
except Exception as e:
# TODO: Emit mcp_list_tools.failed event if needed
logger.exception(f"Failed to list MCP tools from {mcp_tool.server_url}: {e}")
raise
async def _process_tools(
self, output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Handle all mcp tool lists from previous response that are still valid:
for tool in self.ctx.tool_context.previous_tool_listings:
async for evt in self._reuse_mcp_list_tools(tool, output_messages):
yield evt
# Process all remaining tools (including MCP tools) and emit streaming events
if self.ctx.tool_context.tools_to_process:
async for stream_event in self._process_new_tools(self.ctx.tool_context.tools_to_process, output_messages):
yield stream_event
def _approval_required(self, tool_name: str) -> bool:
if tool_name not in self.mcp_tool_to_server:
return False
@ -694,7 +1000,6 @@ class StreamingResponseOrchestrator:
output_index=len(output_messages) - 1,
sequence_number=self.sequence_number,
)
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputItemDone(
response_id=self.response_id,
@ -702,3 +1007,60 @@ class StreamingResponseOrchestrator:
output_index=len(output_messages) - 1,
sequence_number=self.sequence_number,
)
async def _add_mcp_list_tools(
self, mcp_list_message: OpenAIResponseOutputMessageMCPListTools, output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream]:
# Add the MCP list message to output
output_messages.append(mcp_list_message)
# Emit output_item.added for the MCP list tools message
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputItemAdded(
response_id=self.response_id,
item=mcp_list_message,
output_index=len(output_messages) - 1,
sequence_number=self.sequence_number,
)
# Emit mcp_list_tools.completed
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseMcpListToolsCompleted(
sequence_number=self.sequence_number,
)
# Emit output_item.done for the MCP list tools message
self.sequence_number += 1
yield OpenAIResponseObjectStreamResponseOutputItemDone(
response_id=self.response_id,
item=mcp_list_message,
output_index=len(output_messages) - 1,
sequence_number=self.sequence_number,
)
async def _reuse_mcp_list_tools(
self, original: OpenAIResponseOutputMessageMCPListTools, output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream]:
for t in original.tools:
from llama_stack.models.llama.datatypes import ToolDefinition
from llama_stack.providers.utils.inference.openai_compat import convert_tooldef_to_openai_tool
# convert from input_schema to map of ToolParamDefinitions...
tool_def = ToolDefinition(
tool_name=t.name,
description=t.description,
input_schema=t.input_schema,
)
# ...then can convert that to openai completions tool
openai_tool = convert_tooldef_to_openai_tool(tool_def)
if self.ctx.chat_tools is None:
self.ctx.chat_tools = []
self.ctx.chat_tools.append(openai_tool)
mcp_list_message = OpenAIResponseOutputMessageMCPListTools(
id=f"mcp_list_{uuid.uuid4()}",
server_label=original.server_label,
tools=original.tools,
)
async for stream_event in self._add_mcp_list_tools(mcp_list_message, output_messages):
yield stream_event
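`_accumulate_chunk_usage` above sums token counts across every inference call in a turn; usage typically arrives on the final chunk of each call because the request sets `stream_options={"include_usage": True}`. A simplified runnable sketch of the same accumulation over plain tuples:

from dataclasses import dataclass

@dataclass
class Usage:
    input_tokens: int
    output_tokens: int

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

def accumulate(total: Usage | None, chunk_usage: tuple[int, int] | None) -> Usage | None:
    # chunk_usage is (prompt_tokens, completion_tokens); most chunks carry None,
    # and the final chunk of each inference call carries the totals.
    if chunk_usage is None:
        return total
    prompt, completion = chunk_usage
    if total is None:
        return Usage(prompt, completion)
    return Usage(total.input_tokens + prompt, total.output_tokens + completion)

total = None
for u in [None, (120, 45), None, (130, 60)]:  # two inference calls in one turn
    total = accumulate(total, u)
print(total, total.total_tokens)  # Usage(input_tokens=250, output_tokens=105) 355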

View file

@ -11,6 +11,9 @@ from collections.abc import AsyncIterator
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseInputToolFileSearch,
OpenAIResponseInputToolMCP,
OpenAIResponseObjectStreamResponseFileSearchCallCompleted,
OpenAIResponseObjectStreamResponseFileSearchCallInProgress,
OpenAIResponseObjectStreamResponseFileSearchCallSearching,
OpenAIResponseObjectStreamResponseMcpCallCompleted,
OpenAIResponseObjectStreamResponseMcpCallFailed,
OpenAIResponseObjectStreamResponseMcpCallInProgress,
@ -35,6 +38,7 @@ from llama_stack.apis.inference import (
from llama_stack.apis.tools import ToolGroups, ToolInvocationResult, ToolRuntime
from llama_stack.apis.vector_io import VectorIO
from llama_stack.log import get_logger
from llama_stack.providers.utils.telemetry import tracing
from .types import ChatCompletionContext, ToolExecutionResult
@ -94,7 +98,10 @@ class ToolExecutor:
# Yield the final result
yield ToolExecutionResult(
sequence_number=sequence_number, final_output_message=output_message, final_input_message=input_message
sequence_number=sequence_number,
final_output_message=output_message,
final_input_message=input_message,
citation_files=result.metadata.get("citation_files") if result and result.metadata else None,
)
async def _execute_knowledge_search_via_vector_store(
@ -129,8 +136,6 @@ class ToolExecutor:
for results in all_results:
search_results.extend(results)
# Convert search results to tool result format matching memory.py
# Format the results as interleaved content similar to memory.py
content_items = []
content_items.append(
TextContentItem(
@ -138,27 +143,58 @@ class ToolExecutor:
)
)
unique_files = set()
for i, result_item in enumerate(search_results):
chunk_text = result_item.content[0].text if result_item.content else ""
metadata_text = f"document_id: {result_item.file_id}, score: {result_item.score}"
# Get file_id from attributes if result_item.file_id is empty
file_id = result_item.file_id or (
result_item.attributes.get("document_id") if result_item.attributes else None
)
metadata_text = f"document_id: {file_id}, score: {result_item.score}"
if result_item.attributes:
metadata_text += f", attributes: {result_item.attributes}"
text_content = f"[{i + 1}] {metadata_text}\n{chunk_text}\n"
text_content = f"[{i + 1}] {metadata_text} (cite as <|{file_id}|>)\n{chunk_text}\n"
content_items.append(TextContentItem(text=text_content))
unique_files.add(file_id)
content_items.append(TextContentItem(text="END of knowledge_search tool results.\n"))
citation_instruction = ""
if unique_files:
citation_instruction = (
" Cite sources immediately at the end of sentences before punctuation, using `<|file-id|>` format (e.g., 'This is a fact <|file-Cn3MSNn72ENTiiq11Qda4A|>.'). "
"Do not add extra punctuation. Use only the file IDs provided (do not invent new ones)."
)
content_items.append(
TextContentItem(
text=f'The above results were retrieved to help answer the user\'s query: "{query}". Use them as supporting information only in answering this query.\n',
text=f'The above results were retrieved to help answer the user\'s query: "{query}". Use them as supporting information only in answering this query.{citation_instruction}\n',
)
)
# Handle attributes missing from results produced by older versions
citation_files = {}
for result in search_results:
file_id = result.file_id
if not file_id and result.attributes:
file_id = result.attributes.get("document_id")
filename = result.filename
if not filename and result.attributes:
filename = result.attributes.get("filename")
if not filename:
filename = "unknown"
citation_files[file_id] = filename
return ToolInvocationResult(
content=content_items,
metadata={
"document_ids": [r.file_id for r in search_results],
"chunks": [r.content[0].text if r.content else "" for r in search_results],
"scores": [r.score for r in search_results],
"citation_files": citation_files,
},
)
@ -188,7 +224,13 @@ class ToolExecutor:
output_index=output_index,
sequence_number=sequence_number,
)
# Note: knowledge_search and other custom tools don't have specific streaming events in OpenAI spec
elif function_name == "knowledge_search":
sequence_number += 1
progress_event = OpenAIResponseObjectStreamResponseFileSearchCallInProgress(
item_id=item_id,
output_index=output_index,
sequence_number=sequence_number,
)
if progress_event:
yield ToolExecutionResult(stream_event=progress_event, sequence_number=sequence_number)
@ -203,6 +245,16 @@ class ToolExecutor:
)
yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number)
# For file search, emit searching event
if function_name == "knowledge_search":
sequence_number += 1
searching_event = OpenAIResponseObjectStreamResponseFileSearchCallSearching(
item_id=item_id,
output_index=output_index,
sequence_number=sequence_number,
)
yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number)
async def _execute_tool(
self,
function_name: str,
@ -219,12 +271,18 @@ class ToolExecutor:
from llama_stack.providers.utils.tools.mcp import invoke_mcp_tool
mcp_tool = mcp_tool_to_server[function_name]
result = await invoke_mcp_tool(
endpoint=mcp_tool.server_url,
headers=mcp_tool.headers or {},
tool_name=function_name,
kwargs=tool_kwargs,
)
attributes = {
"server_label": mcp_tool.server_label,
"server_url": mcp_tool.server_url,
"tool_name": function_name,
}
async with tracing.span("invoke_mcp_tool", attributes):
result = await invoke_mcp_tool(
endpoint=mcp_tool.server_url,
headers=mcp_tool.headers or {},
tool_name=function_name,
kwargs=tool_kwargs,
)
elif function_name == "knowledge_search":
response_file_search_tool = next(
(t for t in ctx.response_tools if isinstance(t, OpenAIResponseInputToolFileSearch)),
@ -234,15 +292,20 @@ class ToolExecutor:
# Use vector_stores.search API instead of knowledge_search tool
# to support filters and ranking_options
query = tool_kwargs.get("query", "")
result = await self._execute_knowledge_search_via_vector_store(
query=query,
response_file_search_tool=response_file_search_tool,
)
async with tracing.span("knowledge_search", {}):
result = await self._execute_knowledge_search_via_vector_store(
query=query,
response_file_search_tool=response_file_search_tool,
)
else:
result = await self.tool_runtime_api.invoke_tool(
tool_name=function_name,
kwargs=tool_kwargs,
)
attributes = {
"tool_name": function_name,
}
async with tracing.span("invoke_tool", attributes):
result = await self.tool_runtime_api.invoke_tool(
tool_name=function_name,
kwargs=tool_kwargs,
)
except Exception as e:
error_exc = e
@ -278,7 +341,13 @@ class ToolExecutor:
output_index=output_index,
sequence_number=sequence_number,
)
# Note: knowledge_search and other custom tools don't have specific completion events in OpenAI spec
elif function_name == "knowledge_search":
sequence_number += 1
completion_event = OpenAIResponseObjectStreamResponseFileSearchCallCompleted(
item_id=item_id,
output_index=output_index,
sequence_number=sequence_number,
)
if completion_event:
yield ToolExecutionResult(stream_event=completion_event, sequence_number=sequence_number)
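The knowledge-search path now falls back to attributes["document_id"] / attributes["filename"] when results from older servers lack the typed fields, and collects a file-id to filename map for citations. A hedged sketch of that fallback (the dataclass is a stand-in for the real search-result type, and the None-id guard is an addition here for safety):

from dataclasses import dataclass

@dataclass
class SearchResult:
    file_id: str | None = None
    filename: str | None = None
    attributes: dict | None = None

def build_citation_files(results: list[SearchResult]) -> dict[str, str]:
    citation_files: dict[str, str] = {}
    for r in results:
        # Prefer the typed fields; fall back to attributes written by older servers.
        file_id = r.file_id or (r.attributes or {}).get("document_id")
        filename = r.filename or (r.attributes or {}).get("filename") or "unknown"
        if file_id:  # skip results with no recoverable id
            citation_files[file_id] = filename
    return citation_files

print(build_citation_files([
    SearchResult(file_id="file-abc", filename="notes.md"),
    SearchResult(attributes={"document_id": "file-old", "filename": "legacy.txt"}),
]))
# {'file-abc': 'notes.md', 'file-old': 'legacy.txt'}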

View file

@ -12,10 +12,18 @@ from pydantic import BaseModel
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseInput,
OpenAIResponseInputTool,
OpenAIResponseInputToolFileSearch,
OpenAIResponseInputToolFunction,
OpenAIResponseInputToolMCP,
OpenAIResponseInputToolWebSearch,
OpenAIResponseMCPApprovalRequest,
OpenAIResponseMCPApprovalResponse,
OpenAIResponseObject,
OpenAIResponseObjectStream,
OpenAIResponseOutput,
OpenAIResponseOutputMessageMCPListTools,
OpenAIResponseTool,
OpenAIResponseToolMCP,
)
from llama_stack.apis.inference import OpenAIChatCompletionToolCall, OpenAIMessageParam, OpenAIResponseFormatParam
@ -27,6 +35,7 @@ class ToolExecutionResult(BaseModel):
sequence_number: int
final_output_message: OpenAIResponseOutput | None = None
final_input_message: OpenAIMessageParam | None = None
citation_files: dict[str, str] | None = None
@dataclass
@ -54,6 +63,86 @@ class ChatCompletionResult:
return bool(self.tool_calls)
class ToolContext(BaseModel):
"""Holds information about tools from this and (if relevant)
previous response in order to facilitate reuse of previous
listings where appropriate."""
# tools argument passed into current request:
current_tools: list[OpenAIResponseInputTool]
# reconstructed map of tool -> mcp server from previous response:
previous_tools: dict[str, OpenAIResponseInputToolMCP]
# reusable mcp-list-tools objects from previous response:
previous_tool_listings: list[OpenAIResponseOutputMessageMCPListTools]
# tool arguments from current request that still need to be processed:
tools_to_process: list[OpenAIResponseInputTool]
def __init__(
self,
current_tools: list[OpenAIResponseInputTool] | None,
):
super().__init__(
current_tools=current_tools or [],
previous_tools={},
previous_tool_listings=[],
tools_to_process=current_tools or [],
)
def recover_tools_from_previous_response(
self,
previous_response: OpenAIResponseObject,
):
"""Determine which mcp_list_tools objects from previous response we can reuse."""
if self.current_tools and previous_response.tools:
previous_tools_by_label: dict[str, OpenAIResponseToolMCP] = {}
for tool in previous_response.tools:
if isinstance(tool, OpenAIResponseToolMCP):
previous_tools_by_label[tool.server_label] = tool
# collect tool definitions which are the same in current and previous requests:
tools_to_process = []
matched: dict[str, OpenAIResponseInputToolMCP] = {}
for tool in self.current_tools:
if isinstance(tool, OpenAIResponseInputToolMCP) and tool.server_label in previous_tools_by_label:
previous_tool = previous_tools_by_label[tool.server_label]
if previous_tool.allowed_tools == tool.allowed_tools:
matched[tool.server_label] = tool
else:
tools_to_process.append(tool)
else:
tools_to_process.append(tool)
# tools that are not the same or were not previously defined need to be processed:
self.tools_to_process = tools_to_process
# for all matched definitions, get the mcp_list_tools objects from the previous output:
self.previous_tool_listings = [
obj for obj in previous_response.output if obj.type == "mcp_list_tools" and obj.server_label in matched
]
# reconstruct the tool to server mappings that can be reused:
for listing in self.previous_tool_listings:
definition = matched[listing.server_label]
for tool in listing.tools:
self.previous_tools[tool.name] = definition
def available_tools(self) -> list[OpenAIResponseTool]:
if not self.current_tools:
return []
def convert_tool(tool: OpenAIResponseInputTool) -> OpenAIResponseTool:
if isinstance(tool, OpenAIResponseInputToolWebSearch):
return tool
if isinstance(tool, OpenAIResponseInputToolFileSearch):
return tool
if isinstance(tool, OpenAIResponseInputToolFunction):
return tool
if isinstance(tool, OpenAIResponseInputToolMCP):
return OpenAIResponseToolMCP(
server_label=tool.server_label,
allowed_tools=tool.allowed_tools,
)
return [convert_tool(tool) for tool in self.current_tools]
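To make the reuse rule concrete: a previous MCP listing is reused only when both the `server_label` and the `allowed_tools` list are unchanged; anything else goes back into `tools_to_process`. A self-contained sketch of that matching logic, with plain stand-in classes instead of the real Pydantic models:

```python
# Sketch of the matching step in recover_tools_from_previous_response,
# using a stand-in MCPTool class rather than the real response models.
from dataclasses import dataclass

@dataclass
class MCPTool:
    server_label: str
    allowed_tools: list[str] | None = None

def split_for_reuse(current: list[MCPTool], previous: list[MCPTool]):
    prev_by_label = {t.server_label: t for t in previous}
    matched, to_process = [], []
    for tool in current:
        prev = prev_by_label.get(tool.server_label)
        if prev is not None and prev.allowed_tools == tool.allowed_tools:
            matched.append(tool)      # reuse previous mcp_list_tools output
        else:
            to_process.append(tool)   # must be re-listed this turn
    return matched, to_process

matched, pending = split_for_reuse(
    current=[MCPTool("docs", ["search"]), MCPTool("code", ["run"])],
    previous=[MCPTool("docs", ["search"])],
)
print([t.server_label for t in matched])   # ['docs']
print([t.server_label for t in pending])   # ['code']
```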
class ChatCompletionContext(BaseModel):
model: str
messages: list[OpenAIMessageParam]
@ -61,6 +150,7 @@ class ChatCompletionContext(BaseModel):
chat_tools: list[ChatCompletionToolParam] | None = None
temperature: float | None
response_format: OpenAIResponseFormatParam
tool_context: ToolContext | None
approval_requests: list[OpenAIResponseMCPApprovalRequest] = []
approval_responses: dict[str, OpenAIResponseMCPApprovalResponse] = {}
@ -71,6 +161,7 @@ class ChatCompletionContext(BaseModel):
response_tools: list[OpenAIResponseInputTool] | None,
temperature: float | None,
response_format: OpenAIResponseFormatParam,
tool_context: ToolContext,
inputs: list[OpenAIResponseInput] | str,
):
super().__init__(
@ -79,6 +170,7 @@ class ChatCompletionContext(BaseModel):
response_tools=response_tools,
temperature=temperature,
response_format=response_format,
tool_context=tool_context,
)
if not isinstance(inputs, str):
self.approval_requests = [input for input in inputs if input.type == "mcp_approval_request"]
@ -95,3 +187,8 @@ class ChatCompletionContext(BaseModel):
if request.name == tool_name and request.arguments == arguments:
return request
return None
def available_tools(self) -> list[OpenAIResponseTool]:
if not self.tool_context:
return []
return self.tool_context.available_tools()
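The context also ties MCP approval responses back to pending requests by tool name plus exact argument string, as in the request-matching loop above. A small sketch of that lookup rule, with a stand-in dataclass:

```python
# Sketch of the approval-request lookup: a pending request matches only when
# both the tool name and the argument string are identical. Stand-in model.
from dataclasses import dataclass

@dataclass
class ApprovalRequest:
    id: str
    name: str
    arguments: str

def find_request(requests: list[ApprovalRequest], tool_name: str, arguments: str):
    for request in requests:
        if request.name == tool_name and request.arguments == arguments:
            return request
    return None

reqs = [ApprovalRequest("apr_1", "run_query", '{"q": "select 1"}')]
print(find_request(reqs, "run_query", '{"q": "select 1"}').id)  # apr_1
```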

View file

@ -4,9 +4,11 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import re
import uuid
from llama_stack.apis.agents.openai_responses import (
OpenAIResponseAnnotationFileCitation,
OpenAIResponseInput,
OpenAIResponseInputFunctionToolCallOutput,
OpenAIResponseInputMessageContent,
@ -45,7 +47,12 @@ from llama_stack.apis.inference import (
)
async def convert_chat_choice_to_response_message(choice: OpenAIChoice) -> OpenAIResponseMessage:
async def convert_chat_choice_to_response_message(
choice: OpenAIChoice,
citation_files: dict[str, str] | None = None,
*,
message_id: str | None = None,
) -> OpenAIResponseMessage:
"""Convert an OpenAI Chat Completion choice into an OpenAI Response output message."""
output_content = ""
if isinstance(choice.message.content, str):
@ -57,9 +64,11 @@ async def convert_chat_choice_to_response_message(choice: OpenAIChoice) -> OpenA
f"Llama Stack OpenAI Responses does not yet support output content type: {type(choice.message.content)}"
)
annotations, clean_text = _extract_citations_from_text(output_content, citation_files or {})
return OpenAIResponseMessage(
id=f"msg_{uuid.uuid4()}",
content=[OpenAIResponseOutputMessageContentOutputText(text=output_content)],
id=message_id or f"msg_{uuid.uuid4()}",
content=[OpenAIResponseOutputMessageContentOutputText(text=clean_text, annotations=annotations)],
status="completed",
role="assistant",
)
@ -97,9 +106,13 @@ async def convert_response_content_to_chat_content(
async def convert_response_input_to_chat_messages(
input: str | list[OpenAIResponseInput],
previous_messages: list[OpenAIMessageParam] | None = None,
) -> list[OpenAIMessageParam]:
"""
Convert the input from an OpenAI Response API request into OpenAI Chat Completion messages.
:param input: The input to convert
:param previous_messages: Optional previous messages to check for function_call references
"""
messages: list[OpenAIMessageParam] = []
if isinstance(input, list):
@ -163,16 +176,53 @@ async def convert_response_input_to_chat_messages(
raise ValueError(
f"Llama Stack OpenAI Responses does not yet support message role '{input_item.role}' in this context"
)
# Skip user messages that duplicate the last user message in previous_messages
# This handles cases where input includes context for function_call_outputs
if previous_messages and input_item.role == "user":
last_user_msg = None
for msg in reversed(previous_messages):
if isinstance(msg, OpenAIUserMessageParam):
last_user_msg = msg
break
if last_user_msg:
last_user_content = getattr(last_user_msg, "content", None)
if last_user_content == content:
continue # Skip duplicate user message
messages.append(message_type(content=content))
if len(tool_call_results):
raise ValueError(
f"Received function_call_output(s) with call_id(s) {tool_call_results.keys()}, but no corresponding function_call"
)
# Check if unpaired function_call_outputs reference function_calls from previous messages
if previous_messages:
previous_call_ids = _extract_tool_call_ids(previous_messages)
for call_id in list(tool_call_results.keys()):
if call_id in previous_call_ids:
# Valid: this output references a call from previous messages
# Add the tool message
messages.append(tool_call_results[call_id])
del tool_call_results[call_id]
# If unpaired outputs remain, raise an error
if len(tool_call_results):
raise ValueError(
f"Received function_call_output(s) with call_id(s) {tool_call_results.keys()}, but no corresponding function_call"
)
else:
messages.append(OpenAIUserMessageParam(content=input))
return messages
def _extract_tool_call_ids(messages: list[OpenAIMessageParam]) -> set[str]:
"""Extract all tool_call IDs from messages."""
call_ids = set()
for msg in messages:
if isinstance(msg, OpenAIAssistantMessageParam):
tool_calls = getattr(msg, "tool_calls", None)
if tool_calls:
for tool_call in tool_calls:
# tool_call is a Pydantic model, use attribute access
call_ids.add(tool_call.id)
return call_ids
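Together, `_extract_tool_call_ids` and the pairing loop above let a `function_call_output` reference a `function_call` from an earlier turn. A runnable sketch of the rule, with plain dicts and strings standing in for the real message models:

```python
# An unpaired function_call_output is accepted only if its call_id was issued
# by a tool_call in previous messages; otherwise it is an error.
def pair_outputs(outputs: dict[str, str], previous_call_ids: set[str]) -> list[str]:
    messages = []
    for call_id in list(outputs):
        if call_id in previous_call_ids:
            messages.append(outputs.pop(call_id))  # valid cross-turn reference
    if outputs:
        raise ValueError(
            f"Received function_call_output(s) with call_id(s) {set(outputs)}, "
            "but no corresponding function_call"
        )
    return messages

print(pair_outputs({"call_1": "tool result"}, {"call_1"}))  # ['tool result']
```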
async def convert_response_text_to_chat_response_format(
text: OpenAIResponseText,
) -> OpenAIResponseFormatParam:
@ -200,6 +250,53 @@ async def get_message_type_by_role(role: str):
return role_to_type.get(role)
def _extract_citations_from_text(
text: str, citation_files: dict[str, str]
) -> tuple[list[OpenAIResponseAnnotationFileCitation], str]:
"""Extract citation markers from text and create annotations
Args:
text: The text containing citation markers like [file-Cn3MSNn72ENTiiq11Qda4A]
citation_files: Dictionary mapping file_id to filename
Returns:
Tuple of (annotations_list, clean_text_without_markers)
"""
file_id_regex = re.compile(r"<\|(?P<file_id>file-[A-Za-z0-9_-]+)\|>")
annotations = []
parts = []
total_len = 0
last_end = 0
for m in file_id_regex.finditer(text):
# segment before the marker
prefix = text[last_end : m.start()]
# drop one space if it exists (since marker is at sentence end)
if prefix.endswith(" "):
prefix = prefix[:-1]
parts.append(prefix)
total_len += len(prefix)
fid = m.group(1)
if fid in citation_files:
annotations.append(
OpenAIResponseAnnotationFileCitation(
file_id=fid,
filename=citation_files[fid],
index=total_len,  # position in the cleaned text, just past the sentence-ending punctuation
)
)
last_end = m.end()
parts.append(text[last_end:])
cleaned_text = "".join(parts)
return annotations, cleaned_text
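To make the index arithmetic concrete, a worked example run against `_extract_citations_from_text` above (the file id and filename are hypothetical):

```python
# The "<|file-abc|>" marker is stripped along with the single space before
# it, and the annotation index (19) lands just past the sentence-ending
# period in the cleaned text.
text = "Paris is in France. <|file-abc|> It is big."
files = {"file-abc": "geo.txt"}
annotations, clean = _extract_citations_from_text(text, files)
print(clean)                   # Paris is in France. It is big.
print(annotations[0].file_id)  # file-abc
print(annotations[0].index)    # 19
```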
def is_function_tool_call(
tool_call: OpenAIChatCompletionToolCall,
tools: list[OpenAIResponseInputTool],