diff --git a/src/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py b/src/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py index bba5e60af..95767864a 100644 --- a/src/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py +++ b/src/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py @@ -128,7 +128,7 @@ class StreamingResponseOrchestrator: self.prompt = prompt self.sequence_number = 0 # Store MCP tool mapping that gets built during tool processing - self.mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] = ctx.tool_context.previous_tools or {} + self.mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] = ctx.tool_context.previous_tools if ctx.tool_context else {} # Track final messages after all tool executions self.final_messages: list[OpenAIMessageParam] = [] # mapping for annotations @@ -229,7 +229,7 @@ class StreamingResponseOrchestrator: params = OpenAIChatCompletionRequestWithExtraBody( model=self.ctx.model, messages=messages, - tools=self.ctx.chat_tools, + tools=self.ctx.chat_tools, # type: ignore[arg-type] # ChatCompletionFunctionToolParam compatible with expected dict type stream=True, temperature=self.ctx.temperature, response_format=response_format, @@ -272,7 +272,7 @@ class StreamingResponseOrchestrator: # Handle choices with no tool calls for choice in current_response.choices: - if not (choice.message.tool_calls and self.ctx.response_tools): + if not (isinstance(choice.message, OpenAIAssistantMessageParam) and choice.message.tool_calls and self.ctx.response_tools): output_messages.append( await convert_chat_choice_to_response_message( choice, @@ -722,10 +722,12 @@ class StreamingResponseOrchestrator: ) # Accumulate arguments for final response (only for subsequent chunks) - if not is_new_tool_call: - response_tool_call.function.arguments = ( - response_tool_call.function.arguments or "" - ) + tool_call.function.arguments + if not is_new_tool_call and response_tool_call is not None: + # Need to check function is not None + if response_tool_call.function and tool_call.function: + response_tool_call.function.arguments = ( + response_tool_call.function.arguments or "" + ) + tool_call.function.arguments # Output Safety Validation for this chunk if self.guardrail_ids: @@ -747,10 +749,13 @@ class StreamingResponseOrchestrator: for tool_call_index in sorted(chat_response_tool_calls.keys()): tool_call = chat_response_tool_calls[tool_call_index] # Ensure that arguments, if sent back to the inference provider, are not None - tool_call.function.arguments = tool_call.function.arguments or "{}" + if tool_call.function: + tool_call.function.arguments = tool_call.function.arguments or "{}" tool_call_item_id = tool_call_item_ids[tool_call_index] - final_arguments = tool_call.function.arguments - tool_call_name = chat_response_tool_calls[tool_call_index].function.name + final_arguments: str = tool_call.function.arguments or "{}" if tool_call.function else "{}" + func = chat_response_tool_calls[tool_call_index].function + + tool_call_name = func.name if func else "" # Check if this is an MCP tool call is_mcp_tool = tool_call_name and tool_call_name in self.mcp_tool_to_server @@ -894,12 +899,11 @@ class StreamingResponseOrchestrator: self.sequence_number += 1 if tool_call.function.name and tool_call.function.name in self.mcp_tool_to_server: - item = OpenAIResponseOutputMessageMCPCall( + item: OpenAIResponseOutput = OpenAIResponseOutputMessageMCPCall( arguments="", name=tool_call.function.name, id=matching_item_id, server_label=self.mcp_tool_to_server[tool_call.function.name].server_label, - status="in_progress", ) elif tool_call.function.name == "web_search": item = OpenAIResponseOutputMessageWebSearchToolCall( @@ -1008,7 +1012,7 @@ class StreamingResponseOrchestrator: description=tool.description, input_schema=tool.input_schema, ) - return convert_tooldef_to_openai_tool(tool_def) + return convert_tooldef_to_openai_tool(tool_def) # type: ignore[return-value] # Dict compatible with ChatCompletionFunctionToolParam # Initialize chat_tools if not already set if self.ctx.chat_tools is None: @@ -1016,7 +1020,7 @@ class StreamingResponseOrchestrator: for input_tool in tools: if input_tool.type == "function": - self.ctx.chat_tools.append(ChatCompletionToolParam(type="function", function=input_tool.model_dump())) + self.ctx.chat_tools.append(ChatCompletionToolParam(type="function", function=input_tool.model_dump())) # type: ignore[typeddict-item,arg-type] # Dict compatible with FunctionDefinition elif input_tool.type in WebSearchToolTypes: tool_name = "web_search" # Need to access tool_groups_api from tool_executor @@ -1055,8 +1059,8 @@ class StreamingResponseOrchestrator: if isinstance(mcp_tool.allowed_tools, list): always_allowed = mcp_tool.allowed_tools elif isinstance(mcp_tool.allowed_tools, AllowedToolsFilter): - always_allowed = mcp_tool.allowed_tools.always - never_allowed = mcp_tool.allowed_tools.never + always_allowed = mcp_tool.allowed_tools.allowed # type: ignore[attr-defined] + never_allowed = mcp_tool.allowed_tools.disallowed # type: ignore[attr-defined] # Call list_mcp_tools tool_defs = None @@ -1088,7 +1092,7 @@ class StreamingResponseOrchestrator: openai_tool = convert_tooldef_to_chat_tool(t) if self.ctx.chat_tools is None: self.ctx.chat_tools = [] - self.ctx.chat_tools.append(openai_tool) + self.ctx.chat_tools.append(openai_tool) # type: ignore[arg-type] # Dict compatible with ChatCompletionFunctionToolParam # Add to MCP tool mapping if t.name in self.mcp_tool_to_server: @@ -1120,13 +1124,14 @@ class StreamingResponseOrchestrator: self, output_messages: list[OpenAIResponseOutput] ) -> AsyncIterator[OpenAIResponseObjectStream]: # Handle all mcp tool lists from previous response that are still valid: - for tool in self.ctx.tool_context.previous_tool_listings: - async for evt in self._reuse_mcp_list_tools(tool, output_messages): - yield evt - # Process all remaining tools (including MCP tools) and emit streaming events - if self.ctx.tool_context.tools_to_process: - async for stream_event in self._process_new_tools(self.ctx.tool_context.tools_to_process, output_messages): - yield stream_event + if self.ctx.tool_context: + for tool in self.ctx.tool_context.previous_tool_listings: + async for evt in self._reuse_mcp_list_tools(tool, output_messages): + yield evt + # Process all remaining tools (including MCP tools) and emit streaming events + if self.ctx.tool_context.tools_to_process: + async for stream_event in self._process_new_tools(self.ctx.tool_context.tools_to_process, output_messages): + yield stream_event def _approval_required(self, tool_name: str) -> bool: if tool_name not in self.mcp_tool_to_server: @@ -1220,7 +1225,7 @@ class StreamingResponseOrchestrator: openai_tool = convert_tooldef_to_openai_tool(tool_def) if self.ctx.chat_tools is None: self.ctx.chat_tools = [] - self.ctx.chat_tools.append(openai_tool) + self.ctx.chat_tools.append(openai_tool) # type: ignore[arg-type] # Dict compatible with ChatCompletionFunctionToolParam mcp_list_message = OpenAIResponseOutputMessageMCPListTools( id=f"mcp_list_{uuid.uuid4()}",