diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index f330d2c45..9ef49fba3 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -5,7 +5,7 @@ run-name: Run the integration test suite from tests/integration in replay mode
 on:
   push:
     branches: [ main ]
-  pull_request_target:
+  pull_request:
     branches: [ main ]
     types: [opened, synchronize, reopened]
     paths:
@@ -34,7 +34,7 @@ on:
 
 concurrency:
   # Skip concurrency for pushes to main - each commit should be tested independently
-  group: ${{ github.workflow }}-${{ github.ref == 'refs/heads/main' && github.run_id || github.event.pull_request.number }}
+  group: ${{ github.workflow }}-${{ github.ref == 'refs/heads/main' && github.run_id || github.ref }}
   cancel-in-progress: true
 
 jobs:
diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml
index f4d28e407..99a44c147 100644
--- a/.github/workflows/integration-vector-io-tests.yml
+++ b/.github/workflows/integration-vector-io-tests.yml
@@ -14,9 +14,11 @@ on:
       - 'pyproject.toml'
       - 'requirements.txt'
       - '.github/workflows/integration-vector-io-tests.yml' # This workflow
+  schedule:
+    - cron: '0 0 * * *' # (test on python 3.13) Daily at 12 AM UTC
 
 concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
+  group: ${{ github.workflow }}-${{ github.ref == 'refs/heads/main' && github.run_id || github.ref }}
   cancel-in-progress: true
 
 jobs:
@@ -25,7 +27,7 @@ jobs:
     strategy:
       matrix:
         vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant"]
-        python-version: ["3.12", "3.13"]
+        python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", "3.13"]') || fromJSON('["3.12"]') }}
       fail-fast: false # we want to run all tests regardless of failure
 
     steps:
diff --git a/.github/workflows/record-integration-tests.yml b/.github/workflows/record-integration-tests.yml
index 12957db27..b31709a4f 100644
--- a/.github/workflows/record-integration-tests.yml
+++ b/.github/workflows/record-integration-tests.yml
@@ -3,7 +3,7 @@ name: Integration Tests (Record)
 run-name: Run the integration test suite from tests/integration
 
 on:
-  pull_request:
+  pull_request_target:
     branches: [ main ]
     types: [opened, synchronize, labeled]
     paths:
@@ -23,7 +23,7 @@ on:
         default: 'ollama'
 
 concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
   cancel-in-progress: true
 
 jobs:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 30843173c..4309f289a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -2,6 +2,7 @@ exclude: 'build/'
 
 default_language_version:
   python: python3.12
+  node: "22"
 
 repos:
 - repo: https://github.com/pre-commit/pre-commit-hooks
@@ -145,6 +146,20 @@ repos:
         pass_filenames: false
         require_serial: true
         files: ^.github/workflows/.*$
+      - id: ui-prettier
+        name: Format UI code with Prettier
+        entry: bash -c 'cd llama_stack/ui && npm run format'
+        language: system
+        files: ^llama_stack/ui/.*\.(ts|tsx)$
+        pass_filenames: false
+        require_serial: true
+      - id: ui-eslint
+        name: Lint UI code with ESLint
+        entry: bash -c 'cd llama_stack/ui && npm run lint -- --fix --quiet'
+        language: system
+        files: ^llama_stack/ui/.*\.(ts|tsx)$
+        pass_filenames: false
+        require_serial: true
 
 ci:
   autofix_commit_msg: 🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
diff --git a/README.md b/README.md
index 8db4580a2..4df4a5372 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,5 @@
 # Llama Stack
 
-meta-llama%2Fllama-stack | Trendshift
-
------
 [![PyPI version](https://img.shields.io/pypi/v/llama_stack.svg)](https://pypi.org/project/llama_stack/)
 [![PyPI - Downloads](https://img.shields.io/pypi/dm/llama-stack)](https://pypi.org/project/llama-stack/)
 [![License](https://img.shields.io/pypi/l/llama_stack.svg)](https://github.com/meta-llama/llama-stack/blob/main/LICENSE)
diff --git a/docs/source/distributions/k8s-benchmark/openai-mock-server.py b/docs/source/distributions/k8s-benchmark/openai-mock-server.py
old mode 100644
new mode 100755
diff --git a/llama_stack/providers/inline/agents/meta_reference/agents.py b/llama_stack/providers/inline/agents/meta_reference/agents.py
index 0f12a0865..30196c429 100644
--- a/llama_stack/providers/inline/agents/meta_reference/agents.py
+++ b/llama_stack/providers/inline/agents/meta_reference/agents.py
@@ -48,8 +48,8 @@ from llama_stack.providers.utils.responses.responses_store import ResponsesStore
 
 from .agent_instance import ChatAgent
 from .config import MetaReferenceAgentsImplConfig
-from .openai_responses import OpenAIResponsesImpl
 from .persistence import AgentInfo
+from .responses.openai_responses import OpenAIResponsesImpl
 
 logger = logging.getLogger()
diff --git a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
deleted file mode 100644
index 6aca4d68e..000000000
--- a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py
+++ /dev/null
@@ -1,1154 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
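The one-line import change in `agents.py` above is the visible surface of this PR's main refactor: the 1154-line `openai_responses.py` (deleted next) is rebuilt as a `responses` subpackage. A sketch of the resulting layout, inferred from the imports that appear later in this diff — `tool_executor.py`, `types.py`, and `utils.py` are referenced by the new code but their bodies are not included here:

```python
# Inferred layout of the new subpackage (from imports in this diff):
#
#   llama_stack/providers/inline/agents/meta_reference/responses/
#     __init__.py          - license header only
#     openai_responses.py  - OpenAIResponsesImpl: request handling and storage
#     streaming.py         - StreamingResponseOrchestrator: the event-emission loop
#     tool_executor.py     - ToolExecutor (referenced, body not shown)
#     types.py             - ChatCompletionContext, ChatCompletionResult (referenced)
#     utils.py             - conversion helpers (referenced)

# Callers only see the new import path:
from llama_stack.providers.inline.agents.meta_reference.responses.openai_responses import (
    OpenAIResponsesImpl,
)
```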
- -import asyncio -import json -import time -import uuid -from collections.abc import AsyncIterator -from typing import Any - -from openai.types.chat import ChatCompletionToolParam -from pydantic import BaseModel - -from llama_stack.apis.agents import Order -from llama_stack.apis.agents.openai_responses import ( - AllowedToolsFilter, - ListOpenAIResponseInputItem, - ListOpenAIResponseObject, - OpenAIDeleteResponseObject, - OpenAIResponseContentPartOutputText, - OpenAIResponseInput, - OpenAIResponseInputFunctionToolCallOutput, - OpenAIResponseInputMessageContent, - OpenAIResponseInputMessageContentImage, - OpenAIResponseInputMessageContentText, - OpenAIResponseInputTool, - OpenAIResponseInputToolFileSearch, - OpenAIResponseInputToolMCP, - OpenAIResponseMessage, - OpenAIResponseObject, - OpenAIResponseObjectStream, - OpenAIResponseObjectStreamResponseCompleted, - OpenAIResponseObjectStreamResponseContentPartAdded, - OpenAIResponseObjectStreamResponseContentPartDone, - OpenAIResponseObjectStreamResponseCreated, - OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta, - OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone, - OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta, - OpenAIResponseObjectStreamResponseMcpCallArgumentsDone, - OpenAIResponseObjectStreamResponseMcpCallCompleted, - OpenAIResponseObjectStreamResponseMcpCallFailed, - OpenAIResponseObjectStreamResponseMcpCallInProgress, - OpenAIResponseObjectStreamResponseOutputItemAdded, - OpenAIResponseObjectStreamResponseOutputItemDone, - OpenAIResponseObjectStreamResponseOutputTextDelta, - OpenAIResponseObjectStreamResponseWebSearchCallCompleted, - OpenAIResponseObjectStreamResponseWebSearchCallInProgress, - OpenAIResponseObjectStreamResponseWebSearchCallSearching, - OpenAIResponseOutput, - OpenAIResponseOutputMessageContent, - OpenAIResponseOutputMessageContentOutputText, - OpenAIResponseOutputMessageFileSearchToolCall, - OpenAIResponseOutputMessageFileSearchToolCallResults, - OpenAIResponseOutputMessageFunctionToolCall, - OpenAIResponseOutputMessageMCPListTools, - OpenAIResponseOutputMessageWebSearchToolCall, - OpenAIResponseText, - OpenAIResponseTextFormat, - WebSearchToolTypes, -) -from llama_stack.apis.common.content_types import TextContentItem -from llama_stack.apis.inference import ( - Inference, - OpenAIAssistantMessageParam, - OpenAIChatCompletion, - OpenAIChatCompletionContentPartImageParam, - OpenAIChatCompletionContentPartParam, - OpenAIChatCompletionContentPartTextParam, - OpenAIChatCompletionToolCall, - OpenAIChatCompletionToolCallFunction, - OpenAIChoice, - OpenAIDeveloperMessageParam, - OpenAIImageURL, - OpenAIJSONSchema, - OpenAIMessageParam, - OpenAIResponseFormatJSONObject, - OpenAIResponseFormatJSONSchema, - OpenAIResponseFormatParam, - OpenAIResponseFormatText, - OpenAISystemMessageParam, - OpenAIToolMessageParam, - OpenAIUserMessageParam, -) -from llama_stack.apis.tools import ToolGroups, ToolInvocationResult, ToolRuntime -from llama_stack.apis.vector_io import VectorIO -from llama_stack.log import get_logger -from llama_stack.models.llama.datatypes import ToolDefinition, ToolParamDefinition -from llama_stack.providers.utils.inference.openai_compat import ( - convert_tooldef_to_openai_tool, -) -from llama_stack.providers.utils.responses.responses_store import ResponsesStore - -logger = get_logger(name=__name__, category="openai_responses") - -OPENAI_RESPONSES_PREFIX = "openai_responses:" - - -class ToolExecutionResult(BaseModel): - """Result of streaming tool execution.""" - - stream_event: 
OpenAIResponseObjectStream | None = None - sequence_number: int - final_output_message: OpenAIResponseOutput | None = None - final_input_message: OpenAIMessageParam | None = None - - -async def _convert_response_content_to_chat_content( - content: (str | list[OpenAIResponseInputMessageContent] | list[OpenAIResponseOutputMessageContent]), -) -> str | list[OpenAIChatCompletionContentPartParam]: - """ - Convert the content parts from an OpenAI Response API request into OpenAI Chat Completion content parts. - - The content schemas of each API look similar, but are not exactly the same. - """ - if isinstance(content, str): - return content - - converted_parts = [] - for content_part in content: - if isinstance(content_part, OpenAIResponseInputMessageContentText): - converted_parts.append(OpenAIChatCompletionContentPartTextParam(text=content_part.text)) - elif isinstance(content_part, OpenAIResponseOutputMessageContentOutputText): - converted_parts.append(OpenAIChatCompletionContentPartTextParam(text=content_part.text)) - elif isinstance(content_part, OpenAIResponseInputMessageContentImage): - if content_part.image_url: - image_url = OpenAIImageURL(url=content_part.image_url, detail=content_part.detail) - converted_parts.append(OpenAIChatCompletionContentPartImageParam(image_url=image_url)) - elif isinstance(content_part, str): - converted_parts.append(OpenAIChatCompletionContentPartTextParam(text=content_part)) - else: - raise ValueError( - f"Llama Stack OpenAI Responses does not yet support content type '{type(content_part)}' in this context" - ) - return converted_parts - - -async def _convert_response_input_to_chat_messages( - input: str | list[OpenAIResponseInput], -) -> list[OpenAIMessageParam]: - """ - Convert the input from an OpenAI Response API request into OpenAI Chat Completion messages. - """ - messages: list[OpenAIMessageParam] = [] - if isinstance(input, list): - for input_item in input: - if isinstance(input_item, OpenAIResponseInputFunctionToolCallOutput): - messages.append( - OpenAIToolMessageParam( - content=input_item.output, - tool_call_id=input_item.call_id, - ) - ) - elif isinstance(input_item, OpenAIResponseOutputMessageFunctionToolCall): - tool_call = OpenAIChatCompletionToolCall( - index=0, - id=input_item.call_id, - function=OpenAIChatCompletionToolCallFunction( - name=input_item.name, - arguments=input_item.arguments, - ), - ) - messages.append(OpenAIAssistantMessageParam(tool_calls=[tool_call])) - else: - content = await _convert_response_content_to_chat_content(input_item.content) - message_type = await _get_message_type_by_role(input_item.role) - if message_type is None: - raise ValueError( - f"Llama Stack OpenAI Responses does not yet support message role '{input_item.role}' in this context" - ) - messages.append(message_type(content=content)) - else: - messages.append(OpenAIUserMessageParam(content=input)) - return messages - - -async def _convert_chat_choice_to_response_message( - choice: OpenAIChoice, -) -> OpenAIResponseMessage: - """ - Convert an OpenAI Chat Completion choice into an OpenAI Response output message. 
- """ - output_content = "" - if isinstance(choice.message.content, str): - output_content = choice.message.content - elif isinstance(choice.message.content, OpenAIChatCompletionContentPartTextParam): - output_content = choice.message.content.text - else: - raise ValueError( - f"Llama Stack OpenAI Responses does not yet support output content type: {type(choice.message.content)}" - ) - - return OpenAIResponseMessage( - id=f"msg_{uuid.uuid4()}", - content=[OpenAIResponseOutputMessageContentOutputText(text=output_content)], - status="completed", - role="assistant", - ) - - -async def _convert_response_text_to_chat_response_format( - text: OpenAIResponseText, -) -> OpenAIResponseFormatParam: - """ - Convert an OpenAI Response text parameter into an OpenAI Chat Completion response format. - """ - if not text.format or text.format["type"] == "text": - return OpenAIResponseFormatText(type="text") - if text.format["type"] == "json_object": - return OpenAIResponseFormatJSONObject() - if text.format["type"] == "json_schema": - return OpenAIResponseFormatJSONSchema( - json_schema=OpenAIJSONSchema(name=text.format["name"], schema=text.format["schema"]) - ) - raise ValueError(f"Unsupported text format: {text.format}") - - -async def _get_message_type_by_role(role: str): - role_to_type = { - "user": OpenAIUserMessageParam, - "system": OpenAISystemMessageParam, - "assistant": OpenAIAssistantMessageParam, - "developer": OpenAIDeveloperMessageParam, - } - return role_to_type.get(role) - - -class OpenAIResponsePreviousResponseWithInputItems(BaseModel): - input_items: ListOpenAIResponseInputItem - response: OpenAIResponseObject - - -class ChatCompletionContext(BaseModel): - model: str - messages: list[OpenAIMessageParam] - response_tools: list[OpenAIResponseInputTool] | None = None - chat_tools: list[ChatCompletionToolParam] | None = None - mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] - temperature: float | None - response_format: OpenAIResponseFormatParam - - -class OpenAIResponsesImpl: - def __init__( - self, - inference_api: Inference, - tool_groups_api: ToolGroups, - tool_runtime_api: ToolRuntime, - responses_store: ResponsesStore, - vector_io_api: VectorIO, # VectorIO - ): - self.inference_api = inference_api - self.tool_groups_api = tool_groups_api - self.tool_runtime_api = tool_runtime_api - self.responses_store = responses_store - self.vector_io_api = vector_io_api - - async def _prepend_previous_response( - self, - input: str | list[OpenAIResponseInput], - previous_response_id: str | None = None, - ): - if previous_response_id: - previous_response_with_input = await self.responses_store.get_response_object(previous_response_id) - - # previous response input items - new_input_items = previous_response_with_input.input - - # previous response output items - new_input_items.extend(previous_response_with_input.output) - - # new input items from the current request - if isinstance(input, str): - new_input_items.append(OpenAIResponseMessage(content=input, role="user")) - else: - new_input_items.extend(input) - - input = new_input_items - - return input - - async def _prepend_instructions(self, messages, instructions): - if instructions: - messages.insert(0, OpenAISystemMessageParam(content=instructions)) - - async def get_openai_response( - self, - response_id: str, - ) -> OpenAIResponseObject: - response_with_input = await self.responses_store.get_response_object(response_id) - return OpenAIResponseObject(**{k: v for k, v in response_with_input.model_dump().items() if k != "input"}) - - 
async def list_openai_responses( - self, - after: str | None = None, - limit: int | None = 50, - model: str | None = None, - order: Order | None = Order.desc, - ) -> ListOpenAIResponseObject: - return await self.responses_store.list_responses(after, limit, model, order) - - async def list_openai_response_input_items( - self, - response_id: str, - after: str | None = None, - before: str | None = None, - include: list[str] | None = None, - limit: int | None = 20, - order: Order | None = Order.desc, - ) -> ListOpenAIResponseInputItem: - """List input items for a given OpenAI response. - - :param response_id: The ID of the response to retrieve input items for. - :param after: An item ID to list items after, used for pagination. - :param before: An item ID to list items before, used for pagination. - :param include: Additional fields to include in the response. - :param limit: A limit on the number of objects to be returned. - :param order: The order to return the input items in. - :returns: An ListOpenAIResponseInputItem. - """ - return await self.responses_store.list_response_input_items(response_id, after, before, include, limit, order) - - async def _store_response( - self, - response: OpenAIResponseObject, - input: str | list[OpenAIResponseInput], - ) -> None: - new_input_id = f"msg_{uuid.uuid4()}" - if isinstance(input, str): - # synthesize a message from the input string - input_content = OpenAIResponseInputMessageContentText(text=input) - input_content_item = OpenAIResponseMessage( - role="user", - content=[input_content], - id=new_input_id, - ) - input_items_data = [input_content_item] - else: - # we already have a list of messages - input_items_data = [] - for input_item in input: - if isinstance(input_item, OpenAIResponseMessage): - # These may or may not already have an id, so dump to dict, check for id, and add if missing - input_item_dict = input_item.model_dump() - if "id" not in input_item_dict: - input_item_dict["id"] = new_input_id - input_items_data.append(OpenAIResponseMessage(**input_item_dict)) - else: - input_items_data.append(input_item) - - await self.responses_store.store_response_object( - response_object=response, - input=input_items_data, - ) - - async def create_openai_response( - self, - input: str | list[OpenAIResponseInput], - model: str, - instructions: str | None = None, - previous_response_id: str | None = None, - store: bool | None = True, - stream: bool | None = False, - temperature: float | None = None, - text: OpenAIResponseText | None = None, - tools: list[OpenAIResponseInputTool] | None = None, - include: list[str] | None = None, - max_infer_iters: int | None = 10, - ): - stream = bool(stream) - text = OpenAIResponseText(format=OpenAIResponseTextFormat(type="text")) if text is None else text - - stream_gen = self._create_streaming_response( - input=input, - model=model, - instructions=instructions, - previous_response_id=previous_response_id, - store=store, - temperature=temperature, - text=text, - tools=tools, - max_infer_iters=max_infer_iters, - ) - - if stream: - return stream_gen - else: - response = None - async for stream_chunk in stream_gen: - if stream_chunk.type == "response.completed": - if response is not None: - raise ValueError("The response stream completed multiple times! Earlier response: {response}") - response = stream_chunk.response - # don't leave the generator half complete! 
- - if response is None: - raise ValueError("The response stream never completed") - return response - - async def _create_streaming_response( - self, - input: str | list[OpenAIResponseInput], - model: str, - instructions: str | None = None, - previous_response_id: str | None = None, - store: bool | None = True, - temperature: float | None = None, - text: OpenAIResponseText | None = None, - tools: list[OpenAIResponseInputTool] | None = None, - max_infer_iters: int | None = 10, - ) -> AsyncIterator[OpenAIResponseObjectStream]: - output_messages: list[OpenAIResponseOutput] = [] - - # Input preprocessing - input = await self._prepend_previous_response(input, previous_response_id) - messages = await _convert_response_input_to_chat_messages(input) - await self._prepend_instructions(messages, instructions) - - # Structured outputs - response_format = await _convert_response_text_to_chat_response_format(text) - - # Tool setup, TODO: refactor this slightly since this can also yield events - chat_tools, mcp_tool_to_server, mcp_list_message = ( - await self._convert_response_tools_to_chat_tools(tools) if tools else (None, {}, None) - ) - if mcp_list_message: - output_messages.append(mcp_list_message) - - ctx = ChatCompletionContext( - model=model, - messages=messages, - response_tools=tools, - chat_tools=chat_tools, - mcp_tool_to_server=mcp_tool_to_server, - temperature=temperature, - response_format=response_format, - ) - - # Create initial response and emit response.created immediately - response_id = f"resp-{uuid.uuid4()}" - created_at = int(time.time()) - - initial_response = OpenAIResponseObject( - created_at=created_at, - id=response_id, - model=model, - object="response", - status="in_progress", - output=output_messages.copy(), - text=text, - ) - - yield OpenAIResponseObjectStreamResponseCreated(response=initial_response) - - n_iter = 0 - messages = ctx.messages.copy() - - while True: - completion_result = await self.inference_api.openai_chat_completion( - model=ctx.model, - messages=messages, - tools=ctx.chat_tools, - stream=True, - temperature=ctx.temperature, - response_format=ctx.response_format, - ) - - # Process streaming chunks and build complete response - chat_response_id = "" - chat_response_content = [] - chat_response_tool_calls: dict[int, OpenAIChatCompletionToolCall] = {} - chunk_created = 0 - chunk_model = "" - chunk_finish_reason = "" - sequence_number = 0 - - # Create a placeholder message item for delta events - message_item_id = f"msg_{uuid.uuid4()}" - # Track tool call items for streaming events - tool_call_item_ids: dict[int, str] = {} - # Track content parts for streaming events - content_part_emitted = False - - async for chunk in completion_result: - chat_response_id = chunk.id - chunk_created = chunk.created - chunk_model = chunk.model - for chunk_choice in chunk.choices: - # Emit incremental text content as delta events - if chunk_choice.delta.content: - # Emit content_part.added event for first text chunk - if not content_part_emitted: - content_part_emitted = True - sequence_number += 1 - yield OpenAIResponseObjectStreamResponseContentPartAdded( - response_id=response_id, - item_id=message_item_id, - part=OpenAIResponseContentPartOutputText( - text="", # Will be filled incrementally via text deltas - ), - sequence_number=sequence_number, - ) - sequence_number += 1 - yield OpenAIResponseObjectStreamResponseOutputTextDelta( - content_index=0, - delta=chunk_choice.delta.content, - item_id=message_item_id, - output_index=0, - sequence_number=sequence_number, - ) - - # 
Collect content for final response - chat_response_content.append(chunk_choice.delta.content or "") - if chunk_choice.finish_reason: - chunk_finish_reason = chunk_choice.finish_reason - - # Aggregate tool call arguments across chunks - if chunk_choice.delta.tool_calls: - for tool_call in chunk_choice.delta.tool_calls: - response_tool_call = chat_response_tool_calls.get(tool_call.index, None) - # Create new tool call entry if this is the first chunk for this index - is_new_tool_call = response_tool_call is None - if is_new_tool_call: - tool_call_dict: dict[str, Any] = tool_call.model_dump() - tool_call_dict.pop("type", None) - response_tool_call = OpenAIChatCompletionToolCall(**tool_call_dict) - chat_response_tool_calls[tool_call.index] = response_tool_call - - # Create item ID for this tool call for streaming events - tool_call_item_id = f"fc_{uuid.uuid4()}" - tool_call_item_ids[tool_call.index] = tool_call_item_id - - # Emit output_item.added event for the new function call - sequence_number += 1 - function_call_item = OpenAIResponseOutputMessageFunctionToolCall( - arguments="", # Will be filled incrementally via delta events - call_id=tool_call.id or "", - name=tool_call.function.name if tool_call.function else "", - id=tool_call_item_id, - status="in_progress", - ) - yield OpenAIResponseObjectStreamResponseOutputItemAdded( - response_id=response_id, - item=function_call_item, - output_index=len(output_messages), - sequence_number=sequence_number, - ) - - # Stream tool call arguments as they arrive (differentiate between MCP and function calls) - if tool_call.function and tool_call.function.arguments: - tool_call_item_id = tool_call_item_ids[tool_call.index] - sequence_number += 1 - - # Check if this is an MCP tool call - is_mcp_tool = ( - ctx.mcp_tool_to_server - and tool_call.function.name - and tool_call.function.name in ctx.mcp_tool_to_server - ) - if is_mcp_tool: - # Emit MCP-specific argument delta event - yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta( - delta=tool_call.function.arguments, - item_id=tool_call_item_id, - output_index=len(output_messages), - sequence_number=sequence_number, - ) - else: - # Emit function call argument delta event - yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta( - delta=tool_call.function.arguments, - item_id=tool_call_item_id, - output_index=len(output_messages), - sequence_number=sequence_number, - ) - - # Accumulate arguments for final response (only for subsequent chunks) - if not is_new_tool_call: - response_tool_call.function.arguments = ( - response_tool_call.function.arguments or "" - ) + tool_call.function.arguments - - # Emit arguments.done events for completed tool calls (differentiate between MCP and function calls) - for tool_call_index in sorted(chat_response_tool_calls.keys()): - tool_call_item_id = tool_call_item_ids[tool_call_index] - final_arguments = chat_response_tool_calls[tool_call_index].function.arguments or "" - tool_call_name = chat_response_tool_calls[tool_call_index].function.name - - # Check if this is an MCP tool call - is_mcp_tool = ctx.mcp_tool_to_server and tool_call_name and tool_call_name in ctx.mcp_tool_to_server - sequence_number += 1 - if is_mcp_tool: - # Emit MCP-specific argument done event - yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDone( - arguments=final_arguments, - item_id=tool_call_item_id, - output_index=len(output_messages), - sequence_number=sequence_number, - ) - else: - # Emit function call argument done event - yield 
OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone( - arguments=final_arguments, - item_id=tool_call_item_id, - output_index=len(output_messages), - sequence_number=sequence_number, - ) - - # Convert collected chunks to complete response - if chat_response_tool_calls: - tool_calls = [chat_response_tool_calls[i] for i in sorted(chat_response_tool_calls.keys())] - else: - tool_calls = None - - # Emit content_part.done event if text content was streamed (before content gets cleared) - if content_part_emitted: - final_text = "".join(chat_response_content) - sequence_number += 1 - yield OpenAIResponseObjectStreamResponseContentPartDone( - response_id=response_id, - item_id=message_item_id, - part=OpenAIResponseContentPartOutputText( - text=final_text, - ), - sequence_number=sequence_number, - ) - - # Clear content when there are tool calls (OpenAI spec behavior) - if chat_response_tool_calls: - chat_response_content = [] - - assistant_message = OpenAIAssistantMessageParam( - content="".join(chat_response_content), - tool_calls=tool_calls, - ) - current_response = OpenAIChatCompletion( - id=chat_response_id, - choices=[ - OpenAIChoice( - message=assistant_message, - finish_reason=chunk_finish_reason, - index=0, - ) - ], - created=chunk_created, - model=chunk_model, - ) - - function_tool_calls = [] - non_function_tool_calls = [] - - next_turn_messages = messages.copy() - for choice in current_response.choices: - next_turn_messages.append(choice.message) - - if choice.message.tool_calls and tools: - for tool_call in choice.message.tool_calls: - if _is_function_tool_call(tool_call, tools): - function_tool_calls.append(tool_call) - else: - non_function_tool_calls.append(tool_call) - else: - output_messages.append(await _convert_chat_choice_to_response_message(choice)) - - # execute non-function tool calls - for tool_call in non_function_tool_calls: - # Find the item_id for this tool call - matching_item_id = None - for index, item_id in tool_call_item_ids.items(): - response_tool_call = chat_response_tool_calls.get(index) - if response_tool_call and response_tool_call.id == tool_call.id: - matching_item_id = item_id - break - - # Use a fallback item_id if not found - if not matching_item_id: - matching_item_id = f"tc_{uuid.uuid4()}" - - # Execute tool call with streaming - tool_call_log = None - tool_response_message = None - async for result in self._execute_tool_call( - tool_call, ctx, sequence_number, response_id, len(output_messages), matching_item_id - ): - if result.stream_event: - # Forward streaming events - sequence_number = result.sequence_number - yield result.stream_event - - if result.final_output_message is not None: - tool_call_log = result.final_output_message - tool_response_message = result.final_input_message - sequence_number = result.sequence_number - - if tool_call_log: - output_messages.append(tool_call_log) - - # Emit output_item.done event for completed non-function tool call - if matching_item_id: - sequence_number += 1 - yield OpenAIResponseObjectStreamResponseOutputItemDone( - response_id=response_id, - item=tool_call_log, - output_index=len(output_messages) - 1, - sequence_number=sequence_number, - ) - - if tool_response_message: - next_turn_messages.append(tool_response_message) - - for tool_call in function_tool_calls: - # Find the item_id for this tool call from our tracking dictionary - matching_item_id = None - for index, item_id in tool_call_item_ids.items(): - response_tool_call = chat_response_tool_calls.get(index) - if response_tool_call and 
response_tool_call.id == tool_call.id: - matching_item_id = item_id - break - - # Use existing item_id or create new one if not found - final_item_id = matching_item_id or f"fc_{uuid.uuid4()}" - - function_call_item = OpenAIResponseOutputMessageFunctionToolCall( - arguments=tool_call.function.arguments or "", - call_id=tool_call.id, - name=tool_call.function.name or "", - id=final_item_id, - status="completed", - ) - output_messages.append(function_call_item) - - # Emit output_item.done event for completed function call - sequence_number += 1 - yield OpenAIResponseObjectStreamResponseOutputItemDone( - response_id=response_id, - item=function_call_item, - output_index=len(output_messages) - 1, - sequence_number=sequence_number, - ) - - if not function_tool_calls and not non_function_tool_calls: - break - - if function_tool_calls: - logger.info("Exiting inference loop since there is a function (client-side) tool call") - break - - n_iter += 1 - if n_iter >= max_infer_iters: - logger.info(f"Exiting inference loop since iteration count({n_iter}) exceeds {max_infer_iters=}") - break - - messages = next_turn_messages - - # Create final response - final_response = OpenAIResponseObject( - created_at=created_at, - id=response_id, - model=model, - object="response", - status="completed", - text=text, - output=output_messages, - ) - - # Emit response.completed - yield OpenAIResponseObjectStreamResponseCompleted(response=final_response) - - if store: - await self._store_response( - response=final_response, - input=input, - ) - - async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject: - return await self.responses_store.delete_response_object(response_id) - - async def _convert_response_tools_to_chat_tools( - self, tools: list[OpenAIResponseInputTool] - ) -> tuple[ - list[ChatCompletionToolParam], - dict[str, OpenAIResponseInputToolMCP], - OpenAIResponseOutput | None, - ]: - from llama_stack.apis.agents.openai_responses import ( - MCPListToolsTool, - ) - from llama_stack.apis.tools import Tool - - mcp_tool_to_server = {} - - def make_openai_tool(tool_name: str, tool: Tool) -> ChatCompletionToolParam: - tool_def = ToolDefinition( - tool_name=tool_name, - description=tool.description, - parameters={ - param.name: ToolParamDefinition( - param_type=param.parameter_type, - description=param.description, - required=param.required, - default=param.default, - ) - for param in tool.parameters - }, - ) - return convert_tooldef_to_openai_tool(tool_def) - - mcp_list_message = None - chat_tools: list[ChatCompletionToolParam] = [] - for input_tool in tools: - # TODO: Handle other tool types - if input_tool.type == "function": - chat_tools.append(ChatCompletionToolParam(type="function", function=input_tool.model_dump())) - elif input_tool.type in WebSearchToolTypes: - tool_name = "web_search" - tool = await self.tool_groups_api.get_tool(tool_name) - if not tool: - raise ValueError(f"Tool {tool_name} not found") - chat_tools.append(make_openai_tool(tool_name, tool)) - elif input_tool.type == "file_search": - tool_name = "knowledge_search" - tool = await self.tool_groups_api.get_tool(tool_name) - if not tool: - raise ValueError(f"Tool {tool_name} not found") - chat_tools.append(make_openai_tool(tool_name, tool)) - elif input_tool.type == "mcp": - from llama_stack.providers.utils.tools.mcp import list_mcp_tools - - always_allowed = None - never_allowed = None - if input_tool.allowed_tools: - if isinstance(input_tool.allowed_tools, list): - always_allowed = input_tool.allowed_tools - elif 
isinstance(input_tool.allowed_tools, AllowedToolsFilter): - always_allowed = input_tool.allowed_tools.always - never_allowed = input_tool.allowed_tools.never - - tool_defs = await list_mcp_tools( - endpoint=input_tool.server_url, - headers=input_tool.headers or {}, - ) - - mcp_list_message = OpenAIResponseOutputMessageMCPListTools( - id=f"mcp_list_{uuid.uuid4()}", - status="completed", - server_label=input_tool.server_label, - tools=[], - ) - for t in tool_defs.data: - if never_allowed and t.name in never_allowed: - continue - if not always_allowed or t.name in always_allowed: - chat_tools.append(make_openai_tool(t.name, t)) - if t.name in mcp_tool_to_server: - raise ValueError(f"Duplicate tool name {t.name} found for server {input_tool.server_label}") - mcp_tool_to_server[t.name] = input_tool - mcp_list_message.tools.append( - MCPListToolsTool( - name=t.name, - description=t.description, - input_schema={ - "type": "object", - "properties": { - p.name: { - "type": p.parameter_type, - "description": p.description, - } - for p in t.parameters - }, - "required": [p.name for p in t.parameters if p.required], - }, - ) - ) - else: - raise ValueError(f"Llama Stack OpenAI Responses does not yet support tool type: {input_tool.type}") - return chat_tools, mcp_tool_to_server, mcp_list_message - - async def _execute_knowledge_search_via_vector_store( - self, - query: str, - response_file_search_tool: OpenAIResponseInputToolFileSearch, - ) -> ToolInvocationResult: - """Execute knowledge search using vector_stores.search API with filters support.""" - search_results = [] - - # Create search tasks for all vector stores - async def search_single_store(vector_store_id): - try: - search_response = await self.vector_io_api.openai_search_vector_store( - vector_store_id=vector_store_id, - query=query, - filters=response_file_search_tool.filters, - max_num_results=response_file_search_tool.max_num_results, - ranking_options=response_file_search_tool.ranking_options, - rewrite_query=False, - ) - return search_response.data - except Exception as e: - logger.warning(f"Failed to search vector store {vector_store_id}: {e}") - return [] - - # Run all searches in parallel using gather - search_tasks = [search_single_store(vid) for vid in response_file_search_tool.vector_store_ids] - all_results = await asyncio.gather(*search_tasks) - - # Flatten results - for results in all_results: - search_results.extend(results) - - # Convert search results to tool result format matching memory.py - # Format the results as interleaved content similar to memory.py - content_items = [] - content_items.append( - TextContentItem( - text=f"knowledge_search tool found {len(search_results)} chunks:\nBEGIN of knowledge_search tool results.\n" - ) - ) - - for i, result_item in enumerate(search_results): - chunk_text = result_item.content[0].text if result_item.content else "" - metadata_text = f"document_id: {result_item.file_id}, score: {result_item.score}" - if result_item.attributes: - metadata_text += f", attributes: {result_item.attributes}" - text_content = f"[{i + 1}] {metadata_text}\n{chunk_text}\n" - content_items.append(TextContentItem(text=text_content)) - - content_items.append(TextContentItem(text="END of knowledge_search tool results.\n")) - content_items.append( - TextContentItem( - text=f'The above results were retrieved to help answer the user\'s query: "{query}". 
Use them as supporting information only in answering this query.\n', - ) - ) - - return ToolInvocationResult( - content=content_items, - metadata={ - "document_ids": [r.file_id for r in search_results], - "chunks": [r.content[0].text if r.content else "" for r in search_results], - "scores": [r.score for r in search_results], - }, - ) - - async def _execute_tool_call( - self, - tool_call: OpenAIChatCompletionToolCall, - ctx: ChatCompletionContext, - sequence_number: int, - response_id: str, - output_index: int, - item_id: str, - ) -> AsyncIterator[ToolExecutionResult]: - from llama_stack.providers.utils.inference.prompt_adapter import ( - interleaved_content_as_str, - ) - - tool_call_id = tool_call.id - function = tool_call.function - tool_kwargs = json.loads(function.arguments) if function.arguments else {} - - if not function or not tool_call_id or not function.name: - yield ToolExecutionResult(sequence_number=sequence_number) - return - - # Emit in_progress event based on tool type (only for tools with specific streaming events) - progress_event = None - if ctx.mcp_tool_to_server and function.name in ctx.mcp_tool_to_server: - sequence_number += 1 - progress_event = OpenAIResponseObjectStreamResponseMcpCallInProgress( - item_id=item_id, - output_index=output_index, - sequence_number=sequence_number, - ) - elif function.name == "web_search": - sequence_number += 1 - progress_event = OpenAIResponseObjectStreamResponseWebSearchCallInProgress( - item_id=item_id, - output_index=output_index, - sequence_number=sequence_number, - ) - # Note: knowledge_search and other custom tools don't have specific streaming events in OpenAI spec - - if progress_event: - yield ToolExecutionResult(stream_event=progress_event, sequence_number=sequence_number) - - # For web search, emit searching event - if function.name == "web_search": - sequence_number += 1 - searching_event = OpenAIResponseObjectStreamResponseWebSearchCallSearching( - item_id=item_id, - output_index=output_index, - sequence_number=sequence_number, - ) - yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number) - - # Execute the actual tool call - error_exc = None - result = None - try: - if ctx.mcp_tool_to_server and function.name in ctx.mcp_tool_to_server: - from llama_stack.providers.utils.tools.mcp import invoke_mcp_tool - - mcp_tool = ctx.mcp_tool_to_server[function.name] - result = await invoke_mcp_tool( - endpoint=mcp_tool.server_url, - headers=mcp_tool.headers or {}, - tool_name=function.name, - kwargs=tool_kwargs, - ) - elif function.name == "knowledge_search": - response_file_search_tool = next( - (t for t in ctx.response_tools if isinstance(t, OpenAIResponseInputToolFileSearch)), - None, - ) - if response_file_search_tool: - # Use vector_stores.search API instead of knowledge_search tool - # to support filters and ranking_options - query = tool_kwargs.get("query", "") - result = await self._execute_knowledge_search_via_vector_store( - query=query, - response_file_search_tool=response_file_search_tool, - ) - else: - result = await self.tool_runtime_api.invoke_tool( - tool_name=function.name, - kwargs=tool_kwargs, - ) - except Exception as e: - error_exc = e - - # Emit completion or failure event based on result (only for tools with specific streaming events) - has_error = error_exc or (result and ((result.error_code and result.error_code > 0) or result.error_message)) - completion_event = None - - if ctx.mcp_tool_to_server and function.name in ctx.mcp_tool_to_server: - sequence_number += 1 - if 
has_error: - completion_event = OpenAIResponseObjectStreamResponseMcpCallFailed( - sequence_number=sequence_number, - ) - else: - completion_event = OpenAIResponseObjectStreamResponseMcpCallCompleted( - sequence_number=sequence_number, - ) - elif function.name == "web_search": - sequence_number += 1 - completion_event = OpenAIResponseObjectStreamResponseWebSearchCallCompleted( - item_id=item_id, - output_index=output_index, - sequence_number=sequence_number, - ) - # Note: knowledge_search and other custom tools don't have specific completion events in OpenAI spec - - if completion_event: - yield ToolExecutionResult(stream_event=completion_event, sequence_number=sequence_number) - - # Build the result message and input message - if function.name in ctx.mcp_tool_to_server: - from llama_stack.apis.agents.openai_responses import ( - OpenAIResponseOutputMessageMCPCall, - ) - - message = OpenAIResponseOutputMessageMCPCall( - id=tool_call_id, - arguments=function.arguments, - name=function.name, - server_label=ctx.mcp_tool_to_server[function.name].server_label, - ) - if error_exc: - message.error = str(error_exc) - elif (result and result.error_code and result.error_code > 0) or (result and result.error_message): - message.error = f"Error (code {result.error_code}): {result.error_message}" - elif result and result.content: - message.output = interleaved_content_as_str(result.content) - else: - if function.name == "web_search": - message = OpenAIResponseOutputMessageWebSearchToolCall( - id=tool_call_id, - status="completed", - ) - if has_error: - message.status = "failed" - elif function.name == "knowledge_search": - message = OpenAIResponseOutputMessageFileSearchToolCall( - id=tool_call_id, - queries=[tool_kwargs.get("query", "")], - status="completed", - ) - if result and "document_ids" in result.metadata: - message.results = [] - for i, doc_id in enumerate(result.metadata["document_ids"]): - text = result.metadata["chunks"][i] if "chunks" in result.metadata else None - score = result.metadata["scores"][i] if "scores" in result.metadata else None - message.results.append( - OpenAIResponseOutputMessageFileSearchToolCallResults( - file_id=doc_id, - filename=doc_id, - text=text, - score=score, - attributes={}, - ) - ) - if has_error: - message.status = "failed" - else: - raise ValueError(f"Unknown tool {function.name} called") - - input_message = None - if result and result.content: - if isinstance(result.content, str): - content = result.content - elif isinstance(result.content, list): - from llama_stack.apis.common.content_types import ( - ImageContentItem, - TextContentItem, - ) - - content = [] - for item in result.content: - if isinstance(item, TextContentItem): - part = OpenAIChatCompletionContentPartTextParam(text=item.text) - elif isinstance(item, ImageContentItem): - if item.image.data: - url = f"data:image;base64,{item.image.data}" - else: - url = item.image.url - part = OpenAIChatCompletionContentPartImageParam(image_url=OpenAIImageURL(url=url)) - else: - raise ValueError(f"Unknown result content type: {type(item)}") - content.append(part) - else: - raise ValueError(f"Unknown result content type: {type(result.content)}") - input_message = OpenAIToolMessageParam(content=content, tool_call_id=tool_call_id) - else: - text = str(error_exc) if error_exc else "Tool execution failed" - input_message = OpenAIToolMessageParam(content=text, tool_call_id=tool_call_id) - - # Yield the final result - yield ToolExecutionResult( - sequence_number=sequence_number, final_output_message=message, 
final_input_message=input_message - ) - - -def _is_function_tool_call( - tool_call: OpenAIChatCompletionToolCall, - tools: list[OpenAIResponseInputTool], -) -> bool: - if not tool_call.function: - return False - for t in tools: - if t.type == "function" and t.name == tool_call.function.name: - return True - return False diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/__init__.py b/llama_stack/providers/inline/agents/meta_reference/responses/__init__.py new file mode 100644 index 000000000..756f351d8 --- /dev/null +++ b/llama_stack/providers/inline/agents/meta_reference/responses/__init__.py @@ -0,0 +1,5 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py b/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py new file mode 100644 index 000000000..e528a4005 --- /dev/null +++ b/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py @@ -0,0 +1,271 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +import time +import uuid +from collections.abc import AsyncIterator + +from pydantic import BaseModel + +from llama_stack.apis.agents import Order +from llama_stack.apis.agents.openai_responses import ( + ListOpenAIResponseInputItem, + ListOpenAIResponseObject, + OpenAIDeleteResponseObject, + OpenAIResponseInput, + OpenAIResponseInputMessageContentText, + OpenAIResponseInputTool, + OpenAIResponseMessage, + OpenAIResponseObject, + OpenAIResponseObjectStream, + OpenAIResponseText, + OpenAIResponseTextFormat, +) +from llama_stack.apis.inference import ( + Inference, + OpenAISystemMessageParam, +) +from llama_stack.apis.tools import ToolGroups, ToolRuntime +from llama_stack.apis.vector_io import VectorIO +from llama_stack.log import get_logger +from llama_stack.providers.utils.responses.responses_store import ResponsesStore + +from .streaming import StreamingResponseOrchestrator +from .tool_executor import ToolExecutor +from .types import ChatCompletionContext +from .utils import ( + convert_response_input_to_chat_messages, + convert_response_text_to_chat_response_format, +) + +logger = get_logger(name=__name__, category="responses") + + +class OpenAIResponsePreviousResponseWithInputItems(BaseModel): + input_items: ListOpenAIResponseInputItem + response: OpenAIResponseObject + + +class OpenAIResponsesImpl: + def __init__( + self, + inference_api: Inference, + tool_groups_api: ToolGroups, + tool_runtime_api: ToolRuntime, + responses_store: ResponsesStore, + vector_io_api: VectorIO, # VectorIO + ): + self.inference_api = inference_api + self.tool_groups_api = tool_groups_api + self.tool_runtime_api = tool_runtime_api + self.responses_store = responses_store + self.vector_io_api = vector_io_api + self.tool_executor = ToolExecutor( + tool_groups_api=tool_groups_api, + tool_runtime_api=tool_runtime_api, + vector_io_api=vector_io_api, + ) + + async def _prepend_previous_response( + self, + input: str | list[OpenAIResponseInput], + previous_response_id: str | None = None, + ): + if previous_response_id: + previous_response_with_input = await self.responses_store.get_response_object(previous_response_id) + + # previous response 
input items + new_input_items = previous_response_with_input.input + + # previous response output items + new_input_items.extend(previous_response_with_input.output) + + # new input items from the current request + if isinstance(input, str): + new_input_items.append(OpenAIResponseMessage(content=input, role="user")) + else: + new_input_items.extend(input) + + input = new_input_items + + return input + + async def _prepend_instructions(self, messages, instructions): + if instructions: + messages.insert(0, OpenAISystemMessageParam(content=instructions)) + + async def get_openai_response( + self, + response_id: str, + ) -> OpenAIResponseObject: + response_with_input = await self.responses_store.get_response_object(response_id) + return OpenAIResponseObject(**{k: v for k, v in response_with_input.model_dump().items() if k != "input"}) + + async def list_openai_responses( + self, + after: str | None = None, + limit: int | None = 50, + model: str | None = None, + order: Order | None = Order.desc, + ) -> ListOpenAIResponseObject: + return await self.responses_store.list_responses(after, limit, model, order) + + async def list_openai_response_input_items( + self, + response_id: str, + after: str | None = None, + before: str | None = None, + include: list[str] | None = None, + limit: int | None = 20, + order: Order | None = Order.desc, + ) -> ListOpenAIResponseInputItem: + """List input items for a given OpenAI response. + + :param response_id: The ID of the response to retrieve input items for. + :param after: An item ID to list items after, used for pagination. + :param before: An item ID to list items before, used for pagination. + :param include: Additional fields to include in the response. + :param limit: A limit on the number of objects to be returned. + :param order: The order to return the input items in. + :returns: An ListOpenAIResponseInputItem. 
+ """ + return await self.responses_store.list_response_input_items(response_id, after, before, include, limit, order) + + async def _store_response( + self, + response: OpenAIResponseObject, + input: str | list[OpenAIResponseInput], + ) -> None: + new_input_id = f"msg_{uuid.uuid4()}" + if isinstance(input, str): + # synthesize a message from the input string + input_content = OpenAIResponseInputMessageContentText(text=input) + input_content_item = OpenAIResponseMessage( + role="user", + content=[input_content], + id=new_input_id, + ) + input_items_data = [input_content_item] + else: + # we already have a list of messages + input_items_data = [] + for input_item in input: + if isinstance(input_item, OpenAIResponseMessage): + # These may or may not already have an id, so dump to dict, check for id, and add if missing + input_item_dict = input_item.model_dump() + if "id" not in input_item_dict: + input_item_dict["id"] = new_input_id + input_items_data.append(OpenAIResponseMessage(**input_item_dict)) + else: + input_items_data.append(input_item) + + await self.responses_store.store_response_object( + response_object=response, + input=input_items_data, + ) + + async def create_openai_response( + self, + input: str | list[OpenAIResponseInput], + model: str, + instructions: str | None = None, + previous_response_id: str | None = None, + store: bool | None = True, + stream: bool | None = False, + temperature: float | None = None, + text: OpenAIResponseText | None = None, + tools: list[OpenAIResponseInputTool] | None = None, + include: list[str] | None = None, + max_infer_iters: int | None = 10, + ): + stream = bool(stream) + text = OpenAIResponseText(format=OpenAIResponseTextFormat(type="text")) if text is None else text + + stream_gen = self._create_streaming_response( + input=input, + model=model, + instructions=instructions, + previous_response_id=previous_response_id, + store=store, + temperature=temperature, + text=text, + tools=tools, + max_infer_iters=max_infer_iters, + ) + + if stream: + return stream_gen + else: + response = None + async for stream_chunk in stream_gen: + if stream_chunk.type == "response.completed": + if response is not None: + raise ValueError("The response stream completed multiple times! Earlier response: {response}") + response = stream_chunk.response + # don't leave the generator half complete! 
+ + if response is None: + raise ValueError("The response stream never completed") + return response + + async def _create_streaming_response( + self, + input: str | list[OpenAIResponseInput], + model: str, + instructions: str | None = None, + previous_response_id: str | None = None, + store: bool | None = True, + temperature: float | None = None, + text: OpenAIResponseText | None = None, + tools: list[OpenAIResponseInputTool] | None = None, + max_infer_iters: int | None = 10, + ) -> AsyncIterator[OpenAIResponseObjectStream]: + # Input preprocessing + input = await self._prepend_previous_response(input, previous_response_id) + messages = await convert_response_input_to_chat_messages(input) + await self._prepend_instructions(messages, instructions) + + # Structured outputs + response_format = await convert_response_text_to_chat_response_format(text) + + ctx = ChatCompletionContext( + model=model, + messages=messages, + response_tools=tools, + temperature=temperature, + response_format=response_format, + ) + + # Create orchestrator and delegate streaming logic + response_id = f"resp-{uuid.uuid4()}" + created_at = int(time.time()) + + orchestrator = StreamingResponseOrchestrator( + inference_api=self.inference_api, + ctx=ctx, + response_id=response_id, + created_at=created_at, + text=text, + max_infer_iters=max_infer_iters, + tool_executor=self.tool_executor, + ) + + # Stream the response + final_response = None + async for stream_chunk in orchestrator.create_response(): + if stream_chunk.type == "response.completed": + final_response = stream_chunk.response + yield stream_chunk + + # Store the response if requested + if store and final_response: + await self._store_response( + response=final_response, + input=input, + ) + + async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject: + return await self.responses_store.delete_response_object(response_id) diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py b/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py new file mode 100644 index 000000000..0879e978a --- /dev/null +++ b/llama_stack/providers/inline/agents/meta_reference/responses/streaming.py @@ -0,0 +1,634 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
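Note how `create_openai_response` serves both modes from a single streaming generator: streaming callers receive the generator itself, while non-streaming callers drain it to completion and keep only the `response.completed` payload, so end-of-stream side effects (such as storing the response) still run. A reduced sketch of that drain loop, with event objects simplified to `type`/`response` attributes:

```python
from collections.abc import AsyncIterator
from typing import Any


async def drain_for_final_response(stream: AsyncIterator[Any]) -> Any:
    """Consume every event so the generator runs to completion, keeping only
    the terminal response. Event shapes here are simplified stand-ins."""
    final = None
    async for event in stream:
        if event.type == "response.completed":
            if final is not None:
                raise ValueError(f"stream completed twice; earlier response: {final}")
            final = event.response
        # keep iterating even after completion: don't leave the generator
        # half-finished, since cleanup and storage may follow the last event
    if final is None:
        raise ValueError("stream ended without a response.completed event")
    return final
```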
+ +import uuid +from collections.abc import AsyncIterator +from typing import Any + +from llama_stack.apis.agents.openai_responses import ( + AllowedToolsFilter, + MCPListToolsTool, + OpenAIResponseContentPartOutputText, + OpenAIResponseInputTool, + OpenAIResponseInputToolMCP, + OpenAIResponseObject, + OpenAIResponseObjectStream, + OpenAIResponseObjectStreamResponseCompleted, + OpenAIResponseObjectStreamResponseContentPartAdded, + OpenAIResponseObjectStreamResponseContentPartDone, + OpenAIResponseObjectStreamResponseCreated, + OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta, + OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone, + OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta, + OpenAIResponseObjectStreamResponseMcpCallArgumentsDone, + OpenAIResponseObjectStreamResponseMcpListToolsCompleted, + OpenAIResponseObjectStreamResponseMcpListToolsInProgress, + OpenAIResponseObjectStreamResponseOutputItemAdded, + OpenAIResponseObjectStreamResponseOutputItemDone, + OpenAIResponseObjectStreamResponseOutputTextDelta, + OpenAIResponseOutput, + OpenAIResponseOutputMessageFunctionToolCall, + OpenAIResponseOutputMessageMCPListTools, + OpenAIResponseText, + WebSearchToolTypes, +) +from llama_stack.apis.inference import ( + Inference, + OpenAIAssistantMessageParam, + OpenAIChatCompletion, + OpenAIChatCompletionToolCall, + OpenAIChoice, +) +from llama_stack.log import get_logger + +from .types import ChatCompletionContext, ChatCompletionResult +from .utils import convert_chat_choice_to_response_message, is_function_tool_call + +logger = get_logger(name=__name__, category="responses") + + +class StreamingResponseOrchestrator: + def __init__( + self, + inference_api: Inference, + ctx: ChatCompletionContext, + response_id: str, + created_at: int, + text: OpenAIResponseText, + max_infer_iters: int, + tool_executor, # Will be the tool execution logic from the main class + ): + self.inference_api = inference_api + self.ctx = ctx + self.response_id = response_id + self.created_at = created_at + self.text = text + self.max_infer_iters = max_infer_iters + self.tool_executor = tool_executor + self.sequence_number = 0 + # Store MCP tool mapping that gets built during tool processing + self.mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] = {} + + async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]: + # Initialize output messages + output_messages: list[OpenAIResponseOutput] = [] + # Create initial response and emit response.created immediately + initial_response = OpenAIResponseObject( + created_at=self.created_at, + id=self.response_id, + model=self.ctx.model, + object="response", + status="in_progress", + output=output_messages.copy(), + text=self.text, + ) + + yield OpenAIResponseObjectStreamResponseCreated(response=initial_response) + + # Process all tools (including MCP tools) and emit streaming events + if self.ctx.response_tools: + async for stream_event in self._process_tools(self.ctx.response_tools, output_messages): + yield stream_event + + n_iter = 0 + messages = self.ctx.messages.copy() + + while True: + completion_result = await self.inference_api.openai_chat_completion( + model=self.ctx.model, + messages=messages, + tools=self.ctx.chat_tools, + stream=True, + temperature=self.ctx.temperature, + response_format=self.ctx.response_format, + ) + + # Process streaming chunks and build complete response + completion_result_data = None + async for stream_event_or_result in self._process_streaming_chunks(completion_result, output_messages): + if 
isinstance(stream_event_or_result, ChatCompletionResult): + completion_result_data = stream_event_or_result + else: + yield stream_event_or_result + if not completion_result_data: + raise ValueError("Streaming chunk processor failed to return completion data") + current_response = self._build_chat_completion(completion_result_data) + + function_tool_calls, non_function_tool_calls, next_turn_messages = self._separate_tool_calls( + current_response, messages + ) + + # Handle choices with no tool calls + for choice in current_response.choices: + if not (choice.message.tool_calls and self.ctx.response_tools): + output_messages.append(await convert_chat_choice_to_response_message(choice)) + + # Execute tool calls and coordinate results + async for stream_event in self._coordinate_tool_execution( + function_tool_calls, + non_function_tool_calls, + completion_result_data, + output_messages, + next_turn_messages, + ): + yield stream_event + + if not function_tool_calls and not non_function_tool_calls: + break + + if function_tool_calls: + logger.info("Exiting inference loop since there is a function (client-side) tool call") + break + + n_iter += 1 + if n_iter >= self.max_infer_iters: + logger.info(f"Exiting inference loop since iteration count({n_iter}) exceeds {self.max_infer_iters=}") + break + + messages = next_turn_messages + + # Create final response + final_response = OpenAIResponseObject( + created_at=self.created_at, + id=self.response_id, + model=self.ctx.model, + object="response", + status="completed", + text=self.text, + output=output_messages, + ) + + # Emit response.completed + yield OpenAIResponseObjectStreamResponseCompleted(response=final_response) + + def _separate_tool_calls(self, current_response, messages) -> tuple[list, list, list]: + """Separate tool calls into function and non-function categories.""" + function_tool_calls = [] + non_function_tool_calls = [] + next_turn_messages = messages.copy() + + for choice in current_response.choices: + next_turn_messages.append(choice.message) + + if choice.message.tool_calls and self.ctx.response_tools: + for tool_call in choice.message.tool_calls: + if is_function_tool_call(tool_call, self.ctx.response_tools): + function_tool_calls.append(tool_call) + else: + non_function_tool_calls.append(tool_call) + + return function_tool_calls, non_function_tool_calls, next_turn_messages + + async def _process_streaming_chunks( + self, completion_result, output_messages: list[OpenAIResponseOutput] + ) -> AsyncIterator[OpenAIResponseObjectStream | ChatCompletionResult]: + """Process streaming chunks and emit events, returning completion data.""" + # Initialize result tracking + chat_response_id = "" + chat_response_content = [] + chat_response_tool_calls: dict[int, OpenAIChatCompletionToolCall] = {} + chunk_created = 0 + chunk_model = "" + chunk_finish_reason = "" + + # Create a placeholder message item for delta events + message_item_id = f"msg_{uuid.uuid4()}" + # Track tool call items for streaming events + tool_call_item_ids: dict[int, str] = {} + # Track content parts for streaming events + content_part_emitted = False + + async for chunk in completion_result: + chat_response_id = chunk.id + chunk_created = chunk.created + chunk_model = chunk.model + for chunk_choice in chunk.choices: + # Emit incremental text content as delta events + if chunk_choice.delta.content: + # Emit content_part.added event for first text chunk + if not content_part_emitted: + content_part_emitted = True + self.sequence_number += 1 + yield 
OpenAIResponseObjectStreamResponseContentPartAdded( + response_id=self.response_id, + item_id=message_item_id, + part=OpenAIResponseContentPartOutputText( + text="", # Will be filled incrementally via text deltas + ), + sequence_number=self.sequence_number, + ) + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseOutputTextDelta( + content_index=0, + delta=chunk_choice.delta.content, + item_id=message_item_id, + output_index=0, + sequence_number=self.sequence_number, + ) + + # Collect content for final response + chat_response_content.append(chunk_choice.delta.content or "") + if chunk_choice.finish_reason: + chunk_finish_reason = chunk_choice.finish_reason + + # Aggregate tool call arguments across chunks + if chunk_choice.delta.tool_calls: + for tool_call in chunk_choice.delta.tool_calls: + response_tool_call = chat_response_tool_calls.get(tool_call.index, None) + # Create new tool call entry if this is the first chunk for this index + is_new_tool_call = response_tool_call is None + if is_new_tool_call: + tool_call_dict: dict[str, Any] = tool_call.model_dump() + tool_call_dict.pop("type", None) + response_tool_call = OpenAIChatCompletionToolCall(**tool_call_dict) + chat_response_tool_calls[tool_call.index] = response_tool_call + + # Create item ID for this tool call for streaming events + tool_call_item_id = f"fc_{uuid.uuid4()}" + tool_call_item_ids[tool_call.index] = tool_call_item_id + + # Emit output_item.added event for the new function call + self.sequence_number += 1 + function_call_item = OpenAIResponseOutputMessageFunctionToolCall( + arguments="", # Will be filled incrementally via delta events + call_id=tool_call.id or "", + name=tool_call.function.name if tool_call.function else "", + id=tool_call_item_id, + status="in_progress", + ) + yield OpenAIResponseObjectStreamResponseOutputItemAdded( + response_id=self.response_id, + item=function_call_item, + output_index=len(output_messages), + sequence_number=self.sequence_number, + ) + + # Stream tool call arguments as they arrive (differentiate between MCP and function calls) + if tool_call.function and tool_call.function.arguments: + tool_call_item_id = tool_call_item_ids[tool_call.index] + self.sequence_number += 1 + + # Check if this is an MCP tool call + is_mcp_tool = tool_call.function.name and tool_call.function.name in self.mcp_tool_to_server + if is_mcp_tool: + # Emit MCP-specific argument delta event + yield OpenAIResponseObjectStreamResponseMcpCallArgumentsDelta( + delta=tool_call.function.arguments, + item_id=tool_call_item_id, + output_index=len(output_messages), + sequence_number=self.sequence_number, + ) + else: + # Emit function call argument delta event + yield OpenAIResponseObjectStreamResponseFunctionCallArgumentsDelta( + delta=tool_call.function.arguments, + item_id=tool_call_item_id, + output_index=len(output_messages), + sequence_number=self.sequence_number, + ) + + # Accumulate arguments for final response (only for subsequent chunks) + if not is_new_tool_call: + response_tool_call.function.arguments = ( + response_tool_call.function.arguments or "" + ) + tool_call.function.arguments + + # Emit arguments.done events for completed tool calls (differentiate between MCP and function calls) + for tool_call_index in sorted(chat_response_tool_calls.keys()): + tool_call_item_id = tool_call_item_ids[tool_call_index] + final_arguments = chat_response_tool_calls[tool_call_index].function.arguments or "" + tool_call_name = chat_response_tool_calls[tool_call_index].function.name + + # Check if this is an 
MCP tool call + is_mcp_tool = tool_call_name and tool_call_name in self.mcp_tool_to_server + self.sequence_number += 1 + done_event_cls = ( + OpenAIResponseObjectStreamResponseMcpCallArgumentsDone + if is_mcp_tool + else OpenAIResponseObjectStreamResponseFunctionCallArgumentsDone + ) + yield done_event_cls( + arguments=final_arguments, + item_id=tool_call_item_id, + output_index=len(output_messages), + sequence_number=self.sequence_number, + ) + + # Emit content_part.done event if text content was streamed (before content gets cleared) + if content_part_emitted: + final_text = "".join(chat_response_content) + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseContentPartDone( + response_id=self.response_id, + item_id=message_item_id, + part=OpenAIResponseContentPartOutputText( + text=final_text, + ), + sequence_number=self.sequence_number, + ) + + # Clear content when there are tool calls (OpenAI spec behavior) + if chat_response_tool_calls: + chat_response_content = [] + + yield ChatCompletionResult( + response_id=chat_response_id, + content=chat_response_content, + tool_calls=chat_response_tool_calls, + created=chunk_created, + model=chunk_model, + finish_reason=chunk_finish_reason, + message_item_id=message_item_id, + tool_call_item_ids=tool_call_item_ids, + content_part_emitted=content_part_emitted, + ) + + def _build_chat_completion(self, result: ChatCompletionResult) -> OpenAIChatCompletion: + """Build OpenAIChatCompletion from ChatCompletionResult.""" + # Convert collected chunks to complete response + if result.tool_calls: + tool_calls = [result.tool_calls[i] for i in sorted(result.tool_calls.keys())] + else: + tool_calls = None + + assistant_message = OpenAIAssistantMessageParam( + content=result.content_text, + tool_calls=tool_calls, + ) + return OpenAIChatCompletion( + id=result.response_id, + choices=[ + OpenAIChoice( + message=assistant_message, + finish_reason=result.finish_reason, + index=0, + ) + ], + created=result.created, + model=result.model, + ) + + async def _coordinate_tool_execution( + self, + function_tool_calls: list, + non_function_tool_calls: list, + completion_result_data: ChatCompletionResult, + output_messages: list[OpenAIResponseOutput], + next_turn_messages: list, + ) -> AsyncIterator[OpenAIResponseObjectStream]: + """Coordinate execution of both function and non-function tool calls.""" + # Execute non-function tool calls + for tool_call in non_function_tool_calls: + # Find the item_id for this tool call + matching_item_id = None + for index, item_id in completion_result_data.tool_call_item_ids.items(): + response_tool_call = completion_result_data.tool_calls.get(index) + if response_tool_call and response_tool_call.id == tool_call.id: + matching_item_id = item_id + break + + # Use a fallback item_id if not found + if not matching_item_id: + matching_item_id = f"tc_{uuid.uuid4()}" + + # Execute tool call with streaming + tool_call_log = None + tool_response_message = None + async for result in self.tool_executor.execute_tool_call( + tool_call, + self.ctx, + self.sequence_number, + len(output_messages), + matching_item_id, + self.mcp_tool_to_server, + ): + if result.stream_event: + # Forward streaming events + self.sequence_number = result.sequence_number + yield result.stream_event + + if result.final_output_message is not None: + tool_call_log = result.final_output_message + tool_response_message = result.final_input_message + self.sequence_number = result.sequence_number + + if tool_call_log: + output_messages.append(tool_call_log) + + # 
Emit output_item.done event for completed non-function tool call + if matching_item_id: + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseOutputItemDone( + response_id=self.response_id, + item=tool_call_log, + output_index=len(output_messages) - 1, + sequence_number=self.sequence_number, + ) + + if tool_response_message: + next_turn_messages.append(tool_response_message) + + # Execute function tool calls (client-side) + for tool_call in function_tool_calls: + # Find the item_id for this tool call from our tracking dictionary + matching_item_id = None + for index, item_id in completion_result_data.tool_call_item_ids.items(): + response_tool_call = completion_result_data.tool_calls.get(index) + if response_tool_call and response_tool_call.id == tool_call.id: + matching_item_id = item_id + break + + # Use existing item_id or create new one if not found + final_item_id = matching_item_id or f"fc_{uuid.uuid4()}" + + function_call_item = OpenAIResponseOutputMessageFunctionToolCall( + arguments=tool_call.function.arguments or "", + call_id=tool_call.id, + name=tool_call.function.name or "", + id=final_item_id, + status="completed", + ) + output_messages.append(function_call_item) + + # Emit output_item.done event for completed function call + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseOutputItemDone( + response_id=self.response_id, + item=function_call_item, + output_index=len(output_messages) - 1, + sequence_number=self.sequence_number, + ) + + async def _process_tools( + self, tools: list[OpenAIResponseInputTool], output_messages: list[OpenAIResponseOutput] + ) -> AsyncIterator[OpenAIResponseObjectStream]: + """Process all tools and emit appropriate streaming events.""" + from openai.types.chat import ChatCompletionToolParam + + from llama_stack.apis.tools import Tool + from llama_stack.models.llama.datatypes import ToolDefinition, ToolParamDefinition + from llama_stack.providers.utils.inference.openai_compat import convert_tooldef_to_openai_tool + + def make_openai_tool(tool_name: str, tool: Tool) -> ChatCompletionToolParam: + tool_def = ToolDefinition( + tool_name=tool_name, + description=tool.description, + parameters={ + param.name: ToolParamDefinition( + param_type=param.parameter_type, + description=param.description, + required=param.required, + default=param.default, + ) + for param in tool.parameters + }, + ) + return convert_tooldef_to_openai_tool(tool_def) + + # Initialize chat_tools if not already set + if self.ctx.chat_tools is None: + self.ctx.chat_tools = [] + + for input_tool in tools: + if input_tool.type == "function": + self.ctx.chat_tools.append(ChatCompletionToolParam(type="function", function=input_tool.model_dump())) + elif input_tool.type in WebSearchToolTypes: + tool_name = "web_search" + # Need to access tool_groups_api from tool_executor + tool = await self.tool_executor.tool_groups_api.get_tool(tool_name) + if not tool: + raise ValueError(f"Tool {tool_name} not found") + self.ctx.chat_tools.append(make_openai_tool(tool_name, tool)) + elif input_tool.type == "file_search": + tool_name = "knowledge_search" + tool = await self.tool_executor.tool_groups_api.get_tool(tool_name) + if not tool: + raise ValueError(f"Tool {tool_name} not found") + self.ctx.chat_tools.append(make_openai_tool(tool_name, tool)) + elif input_tool.type == "mcp": + async for stream_event in self._process_mcp_tool(input_tool, output_messages): + yield stream_event + else: + raise ValueError(f"Llama Stack OpenAI Responses does not yet support tool type: 
{input_tool.type}") + + async def _process_mcp_tool( + self, mcp_tool: OpenAIResponseInputToolMCP, output_messages: list[OpenAIResponseOutput] + ) -> AsyncIterator[OpenAIResponseObjectStream]: + """Process an MCP tool configuration and emit appropriate streaming events.""" + from llama_stack.providers.utils.tools.mcp import list_mcp_tools + + # Emit mcp_list_tools.in_progress + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseMcpListToolsInProgress( + sequence_number=self.sequence_number, + ) + + try: + # Parse allowed/never allowed tools + always_allowed = None + never_allowed = None + if mcp_tool.allowed_tools: + if isinstance(mcp_tool.allowed_tools, list): + always_allowed = mcp_tool.allowed_tools + elif isinstance(mcp_tool.allowed_tools, AllowedToolsFilter): + always_allowed = mcp_tool.allowed_tools.always + never_allowed = mcp_tool.allowed_tools.never + + # Call list_mcp_tools + tool_defs = await list_mcp_tools( + endpoint=mcp_tool.server_url, + headers=mcp_tool.headers or {}, + ) + + # Create the MCP list tools message + mcp_list_message = OpenAIResponseOutputMessageMCPListTools( + id=f"mcp_list_{uuid.uuid4()}", + server_label=mcp_tool.server_label, + tools=[], + ) + + # Process tools and update context + for t in tool_defs.data: + if never_allowed and t.name in never_allowed: + continue + if not always_allowed or t.name in always_allowed: + # Add to chat tools for inference + from llama_stack.models.llama.datatypes import ToolDefinition, ToolParamDefinition + from llama_stack.providers.utils.inference.openai_compat import convert_tooldef_to_openai_tool + + tool_def = ToolDefinition( + tool_name=t.name, + description=t.description, + parameters={ + param.name: ToolParamDefinition( + param_type=param.parameter_type, + description=param.description, + required=param.required, + default=param.default, + ) + for param in t.parameters + }, + ) + openai_tool = convert_tooldef_to_openai_tool(tool_def) + if self.ctx.chat_tools is None: + self.ctx.chat_tools = [] + self.ctx.chat_tools.append(openai_tool) + + # Add to MCP tool mapping + if t.name in self.mcp_tool_to_server: + raise ValueError(f"Duplicate tool name {t.name} found for server {mcp_tool.server_label}") + self.mcp_tool_to_server[t.name] = mcp_tool + + # Add to MCP list message + mcp_list_message.tools.append( + MCPListToolsTool( + name=t.name, + description=t.description, + input_schema={ + "type": "object", + "properties": { + p.name: { + "type": p.parameter_type, + "description": p.description, + } + for p in t.parameters + }, + "required": [p.name for p in t.parameters if p.required], + }, + ) + ) + + # Add the MCP list message to output + output_messages.append(mcp_list_message) + + # Emit output_item.added for the MCP list tools message + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseOutputItemAdded( + response_id=self.response_id, + item=mcp_list_message, + output_index=len(output_messages) - 1, + sequence_number=self.sequence_number, + ) + + # Emit mcp_list_tools.completed + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseMcpListToolsCompleted( + sequence_number=self.sequence_number, + ) + + # Emit output_item.done for the MCP list tools message + self.sequence_number += 1 + yield OpenAIResponseObjectStreamResponseOutputItemDone( + response_id=self.response_id, + item=mcp_list_message, + output_index=len(output_messages) - 1, + sequence_number=self.sequence_number, + ) + + except Exception as e: + # TODO: Emit mcp_list_tools.failed event if needed + 
logger.exception(f"Failed to list MCP tools from {mcp_tool.server_url}: {e}") + raise diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/tool_executor.py b/llama_stack/providers/inline/agents/meta_reference/responses/tool_executor.py new file mode 100644 index 000000000..5b98b4f51 --- /dev/null +++ b/llama_stack/providers/inline/agents/meta_reference/responses/tool_executor.py @@ -0,0 +1,379 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +import asyncio +import json +from collections.abc import AsyncIterator + +from llama_stack.apis.agents.openai_responses import ( + OpenAIResponseInputToolFileSearch, + OpenAIResponseInputToolMCP, + OpenAIResponseObjectStreamResponseMcpCallCompleted, + OpenAIResponseObjectStreamResponseMcpCallFailed, + OpenAIResponseObjectStreamResponseMcpCallInProgress, + OpenAIResponseObjectStreamResponseWebSearchCallCompleted, + OpenAIResponseObjectStreamResponseWebSearchCallInProgress, + OpenAIResponseObjectStreamResponseWebSearchCallSearching, + OpenAIResponseOutputMessageFileSearchToolCall, + OpenAIResponseOutputMessageFileSearchToolCallResults, + OpenAIResponseOutputMessageWebSearchToolCall, +) +from llama_stack.apis.common.content_types import ( + ImageContentItem, + TextContentItem, +) +from llama_stack.apis.inference import ( + OpenAIChatCompletionContentPartImageParam, + OpenAIChatCompletionContentPartTextParam, + OpenAIChatCompletionToolCall, + OpenAIImageURL, + OpenAIToolMessageParam, +) +from llama_stack.apis.tools import ToolGroups, ToolInvocationResult, ToolRuntime +from llama_stack.apis.vector_io import VectorIO +from llama_stack.log import get_logger + +from .types import ChatCompletionContext, ToolExecutionResult + +logger = get_logger(name=__name__, category="responses") + + +class ToolExecutor: + def __init__( + self, + tool_groups_api: ToolGroups, + tool_runtime_api: ToolRuntime, + vector_io_api: VectorIO, + ): + self.tool_groups_api = tool_groups_api + self.tool_runtime_api = tool_runtime_api + self.vector_io_api = vector_io_api + + async def execute_tool_call( + self, + tool_call: OpenAIChatCompletionToolCall, + ctx: ChatCompletionContext, + sequence_number: int, + output_index: int, + item_id: str, + mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] | None = None, + ) -> AsyncIterator[ToolExecutionResult]: + tool_call_id = tool_call.id + function = tool_call.function + tool_kwargs = json.loads(function.arguments) if function.arguments else {} + + if not function or not tool_call_id or not function.name: + yield ToolExecutionResult(sequence_number=sequence_number) + return + + # Emit progress events for tool execution start + async for event_result in self._emit_progress_events( + function.name, ctx, sequence_number, output_index, item_id, mcp_tool_to_server + ): + sequence_number = event_result.sequence_number + yield event_result + + # Execute the actual tool call + error_exc, result = await self._execute_tool(function.name, tool_kwargs, ctx, mcp_tool_to_server) + + # Emit completion events for tool execution + has_error = error_exc or (result and ((result.error_code and result.error_code > 0) or result.error_message)) + async for event_result in self._emit_completion_events( + function.name, ctx, sequence_number, output_index, item_id, has_error, mcp_tool_to_server + ): + sequence_number = event_result.sequence_number + yield event_result + + # Build 
result messages from tool execution + output_message, input_message = await self._build_result_messages( + function, tool_call_id, tool_kwargs, ctx, error_exc, result, has_error, mcp_tool_to_server + ) + + # Yield the final result + yield ToolExecutionResult( + sequence_number=sequence_number, final_output_message=output_message, final_input_message=input_message + ) + + async def _execute_knowledge_search_via_vector_store( + self, + query: str, + response_file_search_tool: OpenAIResponseInputToolFileSearch, + ) -> ToolInvocationResult: + """Execute knowledge search using vector_stores.search API with filters support.""" + search_results = [] + + # Create search tasks for all vector stores + async def search_single_store(vector_store_id): + try: + search_response = await self.vector_io_api.openai_search_vector_store( + vector_store_id=vector_store_id, + query=query, + filters=response_file_search_tool.filters, + max_num_results=response_file_search_tool.max_num_results, + ranking_options=response_file_search_tool.ranking_options, + rewrite_query=False, + ) + return search_response.data + except Exception as e: + logger.warning(f"Failed to search vector store {vector_store_id}: {e}") + return [] + + # Run all searches in parallel using gather + search_tasks = [search_single_store(vid) for vid in response_file_search_tool.vector_store_ids] + all_results = await asyncio.gather(*search_tasks) + + # Flatten results + for results in all_results: + search_results.extend(results) + + # Convert search results to tool result format matching memory.py + # Format the results as interleaved content similar to memory.py + content_items = [] + content_items.append( + TextContentItem( + text=f"knowledge_search tool found {len(search_results)} chunks:\nBEGIN of knowledge_search tool results.\n" + ) + ) + + for i, result_item in enumerate(search_results): + chunk_text = result_item.content[0].text if result_item.content else "" + metadata_text = f"document_id: {result_item.file_id}, score: {result_item.score}" + if result_item.attributes: + metadata_text += f", attributes: {result_item.attributes}" + text_content = f"[{i + 1}] {metadata_text}\n{chunk_text}\n" + content_items.append(TextContentItem(text=text_content)) + + content_items.append(TextContentItem(text="END of knowledge_search tool results.\n")) + content_items.append( + TextContentItem( + text=f'The above results were retrieved to help answer the user\'s query: "{query}". 
Use them as supporting information only in answering this query.\n', + ) + ) + + return ToolInvocationResult( + content=content_items, + metadata={ + "document_ids": [r.file_id for r in search_results], + "chunks": [r.content[0].text if r.content else "" for r in search_results], + "scores": [r.score for r in search_results], + }, + ) + + async def _emit_progress_events( + self, + function_name: str, + ctx: ChatCompletionContext, + sequence_number: int, + output_index: int, + item_id: str, + mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] | None = None, + ) -> AsyncIterator[ToolExecutionResult]: + """Emit progress events for tool execution start.""" + # Emit in_progress event based on tool type (only for tools with specific streaming events) + progress_event = None + if mcp_tool_to_server and function_name in mcp_tool_to_server: + sequence_number += 1 + progress_event = OpenAIResponseObjectStreamResponseMcpCallInProgress( + item_id=item_id, + output_index=output_index, + sequence_number=sequence_number, + ) + elif function_name == "web_search": + sequence_number += 1 + progress_event = OpenAIResponseObjectStreamResponseWebSearchCallInProgress( + item_id=item_id, + output_index=output_index, + sequence_number=sequence_number, + ) + # Note: knowledge_search and other custom tools don't have specific streaming events in OpenAI spec + + if progress_event: + yield ToolExecutionResult(stream_event=progress_event, sequence_number=sequence_number) + + # For web search, emit searching event + if function_name == "web_search": + sequence_number += 1 + searching_event = OpenAIResponseObjectStreamResponseWebSearchCallSearching( + item_id=item_id, + output_index=output_index, + sequence_number=sequence_number, + ) + yield ToolExecutionResult(stream_event=searching_event, sequence_number=sequence_number) + + async def _execute_tool( + self, + function_name: str, + tool_kwargs: dict, + ctx: ChatCompletionContext, + mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] | None = None, + ) -> tuple[Exception | None, Any]: + """Execute the tool and return error exception and result.""" + error_exc = None + result = None + + try: + if mcp_tool_to_server and function_name in mcp_tool_to_server: + from llama_stack.providers.utils.tools.mcp import invoke_mcp_tool + + mcp_tool = mcp_tool_to_server[function_name] + result = await invoke_mcp_tool( + endpoint=mcp_tool.server_url, + headers=mcp_tool.headers or {}, + tool_name=function_name, + kwargs=tool_kwargs, + ) + elif function_name == "knowledge_search": + response_file_search_tool = next( + (t for t in ctx.response_tools if isinstance(t, OpenAIResponseInputToolFileSearch)), + None, + ) + if response_file_search_tool: + # Use vector_stores.search API instead of knowledge_search tool + # to support filters and ranking_options + query = tool_kwargs.get("query", "") + result = await self._execute_knowledge_search_via_vector_store( + query=query, + response_file_search_tool=response_file_search_tool, + ) + else: + result = await self.tool_runtime_api.invoke_tool( + tool_name=function_name, + kwargs=tool_kwargs, + ) + except Exception as e: + error_exc = e + + return error_exc, result + + async def _emit_completion_events( + self, + function_name: str, + ctx: ChatCompletionContext, + sequence_number: int, + output_index: int, + item_id: str, + has_error: bool, + mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] | None = None, + ) -> AsyncIterator[ToolExecutionResult]: + """Emit completion or failure events for tool execution.""" + 
completion_event = None + + if mcp_tool_to_server and function_name in mcp_tool_to_server: + sequence_number += 1 + if has_error: + completion_event = OpenAIResponseObjectStreamResponseMcpCallFailed( + sequence_number=sequence_number, + ) + else: + completion_event = OpenAIResponseObjectStreamResponseMcpCallCompleted( + sequence_number=sequence_number, + ) + elif function_name == "web_search": + sequence_number += 1 + completion_event = OpenAIResponseObjectStreamResponseWebSearchCallCompleted( + item_id=item_id, + output_index=output_index, + sequence_number=sequence_number, + ) + # Note: knowledge_search and other custom tools don't have specific completion events in OpenAI spec + + if completion_event: + yield ToolExecutionResult(stream_event=completion_event, sequence_number=sequence_number) + + async def _build_result_messages( + self, + function, + tool_call_id: str, + tool_kwargs: dict, + ctx: ChatCompletionContext, + error_exc: Exception | None, + result: Any, + has_error: bool, + mcp_tool_to_server: dict[str, OpenAIResponseInputToolMCP] | None = None, + ) -> tuple[Any, Any]: + """Build output and input messages from tool execution results.""" + from llama_stack.providers.utils.inference.prompt_adapter import ( + interleaved_content_as_str, + ) + + # Build output message + if mcp_tool_to_server and function.name in mcp_tool_to_server: + from llama_stack.apis.agents.openai_responses import ( + OpenAIResponseOutputMessageMCPCall, + ) + + message = OpenAIResponseOutputMessageMCPCall( + id=tool_call_id, + arguments=function.arguments, + name=function.name, + server_label=mcp_tool_to_server[function.name].server_label, + ) + if error_exc: + message.error = str(error_exc) + elif (result and result.error_code and result.error_code > 0) or (result and result.error_message): + message.error = f"Error (code {result.error_code}): {result.error_message}" + elif result and result.content: + message.output = interleaved_content_as_str(result.content) + else: + if function.name == "web_search": + message = OpenAIResponseOutputMessageWebSearchToolCall( + id=tool_call_id, + status="completed", + ) + if has_error: + message.status = "failed" + elif function.name == "knowledge_search": + message = OpenAIResponseOutputMessageFileSearchToolCall( + id=tool_call_id, + queries=[tool_kwargs.get("query", "")], + status="completed", + ) + if result and result.metadata and "document_ids" in result.metadata: + message.results = [] + for i, doc_id in enumerate(result.metadata["document_ids"]): + text = result.metadata["chunks"][i] if "chunks" in result.metadata else None + score = result.metadata["scores"][i] if "scores" in result.metadata else None + message.results.append( + OpenAIResponseOutputMessageFileSearchToolCallResults( + file_id=doc_id, + filename=doc_id, + text=text, + score=score, + attributes={}, + ) + ) + if has_error: + message.status = "failed" + else: + raise ValueError(f"Unknown tool {function.name} called") + + # Build input message + input_message = None + if result and result.content: + if isinstance(result.content, str): + content = result.content + elif isinstance(result.content, list): + content = [] + for item in result.content: + if isinstance(item, TextContentItem): + part = OpenAIChatCompletionContentPartTextParam(text=item.text) + elif isinstance(item, ImageContentItem): + if item.image.data: + url = f"data:image;base64,{item.image.data}" + else: + url = item.image.url + part = OpenAIChatCompletionContentPartImageParam(image_url=OpenAIImageURL(url=url)) + else: + raise ValueError(f"Unknown result 
content type: {type(item)}") + content.append(part) + else: + raise ValueError(f"Unknown result content type: {type(result.content)}") + input_message = OpenAIToolMessageParam(content=content, tool_call_id=tool_call_id) + else: + text = str(error_exc) if error_exc else "Tool execution failed" + input_message = OpenAIToolMessageParam(content=text, tool_call_id=tool_call_id) + + return message, input_message diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/types.py b/llama_stack/providers/inline/agents/meta_reference/responses/types.py new file mode 100644 index 000000000..89086c262 --- /dev/null +++ b/llama_stack/providers/inline/agents/meta_reference/responses/types.py @@ -0,0 +1,60 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from dataclasses import dataclass + +from openai.types.chat import ChatCompletionToolParam +from pydantic import BaseModel + +from llama_stack.apis.agents.openai_responses import ( + OpenAIResponseInputTool, + OpenAIResponseObjectStream, + OpenAIResponseOutput, +) +from llama_stack.apis.inference import OpenAIChatCompletionToolCall, OpenAIMessageParam, OpenAIResponseFormatParam + + +class ToolExecutionResult(BaseModel): + """Result of streaming tool execution.""" + + stream_event: OpenAIResponseObjectStream | None = None + sequence_number: int + final_output_message: OpenAIResponseOutput | None = None + final_input_message: OpenAIMessageParam | None = None + + +@dataclass +class ChatCompletionResult: + """Result of processing streaming chat completion chunks.""" + + response_id: str + content: list[str] + tool_calls: dict[int, OpenAIChatCompletionToolCall] + created: int + model: str + finish_reason: str + message_item_id: str # For streaming events + tool_call_item_ids: dict[int, str] # For streaming events + content_part_emitted: bool # Tracking state + + @property + def content_text(self) -> str: + """Get joined content as string.""" + return "".join(self.content) + + @property + def has_tool_calls(self) -> bool: + """Check if there are any tool calls.""" + return bool(self.tool_calls) + + +class ChatCompletionContext(BaseModel): + model: str + messages: list[OpenAIMessageParam] + response_tools: list[OpenAIResponseInputTool] | None = None + chat_tools: list[ChatCompletionToolParam] | None = None + temperature: float | None + response_format: OpenAIResponseFormatParam diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/utils.py b/llama_stack/providers/inline/agents/meta_reference/responses/utils.py new file mode 100644 index 000000000..1507a55c8 --- /dev/null +++ b/llama_stack/providers/inline/agents/meta_reference/responses/utils.py @@ -0,0 +1,169 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
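+ +"""Conversion helpers shared by the responses implementation: translate OpenAI Responses API inputs, content parts, and text formats into their Chat Completion equivalents, and check whether a tool call targets a client-side function tool."""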
+ +import uuid + +from llama_stack.apis.agents.openai_responses import ( + OpenAIResponseInput, + OpenAIResponseInputFunctionToolCallOutput, + OpenAIResponseInputMessageContent, + OpenAIResponseInputMessageContentImage, + OpenAIResponseInputMessageContentText, + OpenAIResponseInputTool, + OpenAIResponseMessage, + OpenAIResponseOutputMessageContent, + OpenAIResponseOutputMessageContentOutputText, + OpenAIResponseOutputMessageFunctionToolCall, + OpenAIResponseText, +) +from llama_stack.apis.inference import ( + OpenAIAssistantMessageParam, + OpenAIChatCompletionContentPartImageParam, + OpenAIChatCompletionContentPartParam, + OpenAIChatCompletionContentPartTextParam, + OpenAIChatCompletionToolCall, + OpenAIChatCompletionToolCallFunction, + OpenAIChoice, + OpenAIDeveloperMessageParam, + OpenAIImageURL, + OpenAIJSONSchema, + OpenAIMessageParam, + OpenAIResponseFormatJSONObject, + OpenAIResponseFormatJSONSchema, + OpenAIResponseFormatParam, + OpenAIResponseFormatText, + OpenAISystemMessageParam, + OpenAIToolMessageParam, + OpenAIUserMessageParam, +) + + +async def convert_chat_choice_to_response_message(choice: OpenAIChoice) -> OpenAIResponseMessage: + """Convert an OpenAI Chat Completion choice into an OpenAI Response output message.""" + output_content = "" + if isinstance(choice.message.content, str): + output_content = choice.message.content + elif isinstance(choice.message.content, OpenAIChatCompletionContentPartTextParam): + output_content = choice.message.content.text + else: + raise ValueError( + f"Llama Stack OpenAI Responses does not yet support output content type: {type(choice.message.content)}" + ) + + return OpenAIResponseMessage( + id=f"msg_{uuid.uuid4()}", + content=[OpenAIResponseOutputMessageContentOutputText(text=output_content)], + status="completed", + role="assistant", + ) + + +async def convert_response_content_to_chat_content( + content: (str | list[OpenAIResponseInputMessageContent] | list[OpenAIResponseOutputMessageContent]), +) -> str | list[OpenAIChatCompletionContentPartParam]: + """ + Convert the content parts from an OpenAI Response API request into OpenAI Chat Completion content parts. + + The content schemas of each API look similar, but are not exactly the same. + """ + if isinstance(content, str): + return content + + converted_parts = [] + for content_part in content: + if isinstance(content_part, OpenAIResponseInputMessageContentText): + converted_parts.append(OpenAIChatCompletionContentPartTextParam(text=content_part.text)) + elif isinstance(content_part, OpenAIResponseOutputMessageContentOutputText): + converted_parts.append(OpenAIChatCompletionContentPartTextParam(text=content_part.text)) + elif isinstance(content_part, OpenAIResponseInputMessageContentImage): + if content_part.image_url: + image_url = OpenAIImageURL(url=content_part.image_url, detail=content_part.detail) + converted_parts.append(OpenAIChatCompletionContentPartImageParam(image_url=image_url)) + elif isinstance(content_part, str): + converted_parts.append(OpenAIChatCompletionContentPartTextParam(text=content_part)) + else: + raise ValueError( + f"Llama Stack OpenAI Responses does not yet support content type '{type(content_part)}' in this context" + ) + return converted_parts + + +async def convert_response_input_to_chat_messages( + input: str | list[OpenAIResponseInput], +) -> list[OpenAIMessageParam]: + """ + Convert the input from an OpenAI Response API request into OpenAI Chat Completion messages. 
+ """ + messages: list[OpenAIMessageParam] = [] + if isinstance(input, list): + for input_item in input: + if isinstance(input_item, OpenAIResponseInputFunctionToolCallOutput): + messages.append( + OpenAIToolMessageParam( + content=input_item.output, + tool_call_id=input_item.call_id, + ) + ) + elif isinstance(input_item, OpenAIResponseOutputMessageFunctionToolCall): + tool_call = OpenAIChatCompletionToolCall( + index=0, + id=input_item.call_id, + function=OpenAIChatCompletionToolCallFunction( + name=input_item.name, + arguments=input_item.arguments, + ), + ) + messages.append(OpenAIAssistantMessageParam(tool_calls=[tool_call])) + else: + content = await convert_response_content_to_chat_content(input_item.content) + message_type = await get_message_type_by_role(input_item.role) + if message_type is None: + raise ValueError( + f"Llama Stack OpenAI Responses does not yet support message role '{input_item.role}' in this context" + ) + messages.append(message_type(content=content)) + else: + messages.append(OpenAIUserMessageParam(content=input)) + return messages + + +async def convert_response_text_to_chat_response_format( + text: OpenAIResponseText, +) -> OpenAIResponseFormatParam: + """ + Convert an OpenAI Response text parameter into an OpenAI Chat Completion response format. + """ + if not text.format or text.format["type"] == "text": + return OpenAIResponseFormatText(type="text") + if text.format["type"] == "json_object": + return OpenAIResponseFormatJSONObject() + if text.format["type"] == "json_schema": + return OpenAIResponseFormatJSONSchema( + json_schema=OpenAIJSONSchema(name=text.format["name"], schema=text.format["schema"]) + ) + raise ValueError(f"Unsupported text format: {text.format}") + + +async def get_message_type_by_role(role: str): + role_to_type = { + "user": OpenAIUserMessageParam, + "system": OpenAISystemMessageParam, + "assistant": OpenAIAssistantMessageParam, + "developer": OpenAIDeveloperMessageParam, + } + return role_to_type.get(role) + + +def is_function_tool_call( + tool_call: OpenAIChatCompletionToolCall, + tools: list[OpenAIResponseInputTool], +) -> bool: + if not tool_call.function: + return False + for t in tools: + if t.type == "function" and t.name == tool_call.function.name: + return True + return False diff --git a/llama_stack/providers/utils/inference/openai_compat.py b/llama_stack/providers/utils/inference/openai_compat.py index 9a77c8cc4..6297cc2ed 100644 --- a/llama_stack/providers/utils/inference/openai_compat.py +++ b/llama_stack/providers/utils/inference/openai_compat.py @@ -31,15 +31,15 @@ from openai.types.chat import ( from openai.types.chat import ( ChatCompletionContentPartTextParam as OpenAIChatCompletionContentPartTextParam, ) +from openai.types.chat import ( + ChatCompletionMessageFunctionToolCall as OpenAIChatCompletionMessageFunctionToolCall, +) from openai.types.chat import ( ChatCompletionMessageParam as OpenAIChatCompletionMessage, ) from openai.types.chat import ( ChatCompletionMessageToolCall, ) -from openai.types.chat import ( - ChatCompletionMessageToolCallParam as OpenAIChatCompletionMessageToolCall, -) from openai.types.chat import ( ChatCompletionSystemMessageParam as OpenAIChatCompletionSystemMessage, ) @@ -633,7 +633,7 @@ async def convert_message_to_openai_dict_new( ) elif isinstance(message, CompletionMessage): tool_calls = [ - OpenAIChatCompletionMessageToolCall( + OpenAIChatCompletionMessageFunctionToolCall( id=tool.call_id, function=OpenAIFunction( name=(tool.tool_name if not isinstance(tool.tool_name, BuiltinTool) else 
tool.tool_name.value), @@ -903,7 +903,7 @@ def _convert_openai_request_response_format( def _convert_openai_tool_calls( - tool_calls: list[OpenAIChatCompletionMessageToolCall], + tool_calls: list[OpenAIChatCompletionMessageFunctionToolCall], ) -> list[ToolCall]: """ Convert an OpenAI ChatCompletionMessageToolCall list into a list of ToolCall. diff --git a/llama_stack/ui/.nvmrc b/llama_stack/ui/.nvmrc new file mode 100644 index 000000000..1384ff6a1 --- /dev/null +++ b/llama_stack/ui/.nvmrc @@ -0,0 +1 @@ +22.5.1 diff --git a/llama_stack/ui/.prettierignore b/llama_stack/ui/.prettierignore index 1b8ac8894..b737ae6ed 100644 --- a/llama_stack/ui/.prettierignore +++ b/llama_stack/ui/.prettierignore @@ -1,3 +1,12 @@ # Ignore artifacts: build coverage +.next +node_modules +dist +*.lock +*.log + +# Generated files +*.min.js +*.min.css diff --git a/llama_stack/ui/.prettierrc b/llama_stack/ui/.prettierrc index 0967ef424..059475a24 100644 --- a/llama_stack/ui/.prettierrc +++ b/llama_stack/ui/.prettierrc @@ -1 +1,10 @@ -{} +{ + "semi": true, + "trailingComma": "es5", + "singleQuote": false, + "printWidth": 80, + "tabWidth": 2, + "useTabs": false, + "bracketSpacing": true, + "arrowParens": "avoid" +} diff --git a/llama_stack/ui/app/api/v1/[...path]/route.ts b/llama_stack/ui/app/api/v1/[...path]/route.ts index 1959f9099..51c1f8004 100644 --- a/llama_stack/ui/app/api/v1/[...path]/route.ts +++ b/llama_stack/ui/app/api/v1/[...path]/route.ts @@ -47,7 +47,7 @@ async function proxyRequest(request: NextRequest, method: string) { const responseText = await response.text(); console.log( - `Response from FastAPI: ${response.status} ${response.statusText}`, + `Response from FastAPI: ${response.status} ${response.statusText}` ); // Create response with same status and headers @@ -74,7 +74,7 @@ async function proxyRequest(request: NextRequest, method: string) { backend_url: BACKEND_URL, timestamp: new Date().toISOString(), }, - { status: 500 }, + { status: 500 } ); } } diff --git a/llama_stack/ui/app/auth/signin/page.tsx b/llama_stack/ui/app/auth/signin/page.tsx index c9510fd6b..0ccb4a397 100644 --- a/llama_stack/ui/app/auth/signin/page.tsx +++ b/llama_stack/ui/app/auth/signin/page.tsx @@ -51,9 +51,9 @@ export default function SignInPage() { onClick={() => { console.log("Signing in with GitHub..."); signIn("github", { callbackUrl: "/auth/signin" }).catch( - (error) => { + error => { console.error("Sign in error:", error); - }, + } ); }} className="w-full" diff --git a/llama_stack/ui/app/chat-playground/page.tsx b/llama_stack/ui/app/chat-playground/page.tsx index d8094af85..b8651aca0 100644 --- a/llama_stack/ui/app/chat-playground/page.tsx +++ b/llama_stack/ui/app/chat-playground/page.tsx @@ -29,14 +29,13 @@ export default function ChatPlaygroundPage() { const isModelsLoading = modelsLoading ?? 
true; - useEffect(() => { const fetchModels = async () => { try { setModelsLoading(true); setModelsError(null); const modelList = await client.models.list(); - const llmModels = modelList.filter(model => model.model_type === 'llm'); + const llmModels = modelList.filter(model => model.model_type === "llm"); setModels(llmModels); if (llmModels.length > 0) { setSelectedModel(llmModels[0].identifier); @@ -53,103 +52,122 @@ export default function ChatPlaygroundPage() { }, [client]); const extractTextContent = (content: unknown): string => { - if (typeof content === 'string') { + if (typeof content === "string") { return content; } if (Array.isArray(content)) { return content - .filter(item => item && typeof item === 'object' && 'type' in item && item.type === 'text') - .map(item => (item && typeof item === 'object' && 'text' in item) ? String(item.text) : '') - .join(''); + .filter( + item => + item && + typeof item === "object" && + "type" in item && + item.type === "text" + ) + .map(item => + item && typeof item === "object" && "text" in item + ? String(item.text) + : "" + ) + .join(""); } - if (content && typeof content === 'object' && 'type' in content && content.type === 'text' && 'text' in content) { - return String(content.text) || ''; + if ( + content && + typeof content === "object" && + "type" in content && + content.type === "text" && + "text" in content + ) { + return String(content.text) || ""; } - return ''; + return ""; }; const handleInputChange = (e: React.ChangeEvent) => { setInput(e.target.value); }; -const handleSubmit = async (event?: { preventDefault?: () => void }) => { - event?.preventDefault?.(); - if (!input.trim()) return; + const handleSubmit = async (event?: { preventDefault?: () => void }) => { + event?.preventDefault?.(); + if (!input.trim()) return; - // Add user message to chat - const userMessage: Message = { - id: Date.now().toString(), - role: "user", - content: input.trim(), - createdAt: new Date(), - }; - - setMessages(prev => [...prev, userMessage]); - setInput(""); - - // Use the helper function with the content - await handleSubmitWithContent(userMessage.content); -}; - -const handleSubmitWithContent = async (content: string) => { - setIsGenerating(true); - setError(null); - - try { - const messageParams: CompletionCreateParams["messages"] = [ - ...messages.map(msg => { - const msgContent = typeof msg.content === 'string' ? 
msg.content : extractTextContent(msg.content); - if (msg.role === "user") { - return { role: "user" as const, content: msgContent }; - } else if (msg.role === "assistant") { - return { role: "assistant" as const, content: msgContent }; - } else { - return { role: "system" as const, content: msgContent }; - } - }), - { role: "user" as const, content } - ]; - - const response = await client.chat.completions.create({ - model: selectedModel, - messages: messageParams, - stream: true, - }); - - const assistantMessage: Message = { - id: (Date.now() + 1).toString(), - role: "assistant", - content: "", + // Add user message to chat + const userMessage: Message = { + id: Date.now().toString(), + role: "user", + content: input.trim(), createdAt: new Date(), }; - setMessages(prev => [...prev, assistantMessage]); - let fullContent = ""; - for await (const chunk of response) { - if (chunk.choices && chunk.choices[0]?.delta?.content) { - const deltaContent = chunk.choices[0].delta.content; - fullContent += deltaContent; + setMessages(prev => [...prev, userMessage]); + setInput(""); - flushSync(() => { - setMessages(prev => { - const newMessages = [...prev]; - const lastMessage = newMessages[newMessages.length - 1]; - if (lastMessage.role === "assistant") { - lastMessage.content = fullContent; - } - return newMessages; + // Use the helper function with the content + await handleSubmitWithContent(userMessage.content); + }; + + const handleSubmitWithContent = async (content: string) => { + setIsGenerating(true); + setError(null); + + try { + const messageParams: CompletionCreateParams["messages"] = [ + ...messages.map(msg => { + const msgContent = + typeof msg.content === "string" + ? msg.content + : extractTextContent(msg.content); + if (msg.role === "user") { + return { role: "user" as const, content: msgContent }; + } else if (msg.role === "assistant") { + return { role: "assistant" as const, content: msgContent }; + } else { + return { role: "system" as const, content: msgContent }; + } + }), + { role: "user" as const, content }, + ]; + + const response = await client.chat.completions.create({ + model: selectedModel, + messages: messageParams, + stream: true, + }); + + const assistantMessage: Message = { + id: (Date.now() + 1).toString(), + role: "assistant", + content: "", + createdAt: new Date(), + }; + + setMessages(prev => [...prev, assistantMessage]); + let fullContent = ""; + for await (const chunk of response) { + if (chunk.choices && chunk.choices[0]?.delta?.content) { + const deltaContent = chunk.choices[0].delta.content; + fullContent += deltaContent; + + flushSync(() => { + setMessages(prev => { + const newMessages = [...prev]; + const lastMessage = newMessages[newMessages.length - 1]; + if (lastMessage.role === "assistant") { + lastMessage.content = fullContent; + } + return newMessages; + }); }); - }); + } } + } catch (err) { + console.error("Error sending message:", err); + setError("Failed to send message. Please try again."); + setMessages(prev => prev.slice(0, -1)); + } finally { + setIsGenerating(false); } - } catch (err) { - console.error("Error sending message:", err); - setError("Failed to send message. 
Please try again."); - setMessages(prev => prev.slice(0, -1)); - } finally { - setIsGenerating(false); - } -}; + }; const suggestions = [ "Write a Python function that prints 'Hello, World!'", "Explain step-by-step how to solve this math problem: If x² + 6x + 9 = 25, what is x?", @@ -163,7 +181,7 @@ const handleSubmitWithContent = async (content: string) => { content: message.content, createdAt: new Date(), }; - setMessages(prev => [...prev, newMessage]) + setMessages(prev => [...prev, newMessage]); handleSubmitWithContent(newMessage.content); }; @@ -177,12 +195,20 @@ const handleSubmitWithContent = async (content: string) => {

Chat Playground (Completions)

- - + - {models.map((model) => ( + {models.map(model => ( {model.identifier} diff --git a/llama_stack/ui/app/logs/chat-completions/[id]/page.tsx b/llama_stack/ui/app/logs/chat-completions/[id]/page.tsx index 82aa3496e..e11924f4c 100644 --- a/llama_stack/ui/app/logs/chat-completions/[id]/page.tsx +++ b/llama_stack/ui/app/logs/chat-completions/[id]/page.tsx @@ -33,12 +33,12 @@ export default function ChatCompletionDetailPage() { } catch (err) { console.error( `Error fetching chat completion detail for ID ${id}:`, - err, + err ); setError( err instanceof Error ? err - : new Error("Failed to fetch completion detail"), + : new Error("Failed to fetch completion detail") ); } finally { setIsLoading(false); diff --git a/llama_stack/ui/app/logs/responses/[id]/page.tsx b/llama_stack/ui/app/logs/responses/[id]/page.tsx index 7f4252856..922d35531 100644 --- a/llama_stack/ui/app/logs/responses/[id]/page.tsx +++ b/llama_stack/ui/app/logs/responses/[id]/page.tsx @@ -13,10 +13,10 @@ export default function ResponseDetailPage() { const client = useAuthClient(); const [responseDetail, setResponseDetail] = useState( - null, + null ); const [inputItems, setInputItems] = useState( - null, + null ); const [isLoading, setIsLoading] = useState(true); const [isLoadingInputItems, setIsLoadingInputItems] = useState(true); @@ -25,7 +25,7 @@ export default function ResponseDetailPage() { // Helper function to convert ResponseObject to OpenAIResponse const convertResponseObject = ( - responseData: ResponseObject, + responseData: ResponseObject ): OpenAIResponse => { return { id: responseData.id, @@ -73,12 +73,12 @@ export default function ResponseDetailPage() { } else { console.error( `Error fetching response detail for ID ${id}:`, - responseResult.reason, + responseResult.reason ); setError( responseResult.reason instanceof Error ? responseResult.reason - : new Error("Failed to fetch response detail"), + : new Error("Failed to fetch response detail") ); } @@ -90,18 +90,18 @@ export default function ResponseDetailPage() { } else { console.error( `Error fetching input items for response ID ${id}:`, - inputItemsResult.reason, + inputItemsResult.reason ); setInputItemsError( inputItemsResult.reason instanceof Error ? inputItemsResult.reason - : new Error("Failed to fetch input items"), + : new Error("Failed to fetch input items") ); } } catch (err) { console.error(`Unexpected error fetching data for ID ${id}:`, err); setError( - err instanceof Error ? err : new Error("Unexpected error occurred"), + err instanceof Error ? 
err : new Error("Unexpected error occurred") ); } finally { setIsLoading(false); diff --git a/llama_stack/ui/app/logs/vector-stores/[id]/files/[fileId]/contents/[contentId]/page.tsx b/llama_stack/ui/app/logs/vector-stores/[id]/files/[fileId]/contents/[contentId]/page.tsx index 6896b992a..d58de3085 100644 --- a/llama_stack/ui/app/logs/vector-stores/[id]/files/[fileId]/contents/[contentId]/page.tsx +++ b/llama_stack/ui/app/logs/vector-stores/[id]/files/[fileId]/contents/[contentId]/page.tsx @@ -18,7 +18,10 @@ import { PropertiesCard, PropertyItem, } from "@/components/layout/detail-layout"; -import { PageBreadcrumb, BreadcrumbSegment } from "@/components/layout/page-breadcrumb"; +import { + PageBreadcrumb, + BreadcrumbSegment, +} from "@/components/layout/page-breadcrumb"; export default function ContentDetailPage() { const params = useParams(); @@ -28,13 +31,13 @@ export default function ContentDetailPage() { const contentId = params.contentId as string; const client = useAuthClient(); - const getTextFromContent = (content: any): string => { - if (typeof content === 'string') { + const getTextFromContent = (content: unknown): string => { + if (typeof content === "string") { return content; - } else if (content && content.type === 'text') { + } else if (content && content.type === "text") { return content.text; } - return ''; + return ""; }; const [store, setStore] = useState(null); @@ -44,7 +47,9 @@ export default function ContentDetailPage() { const [error, setError] = useState(null); const [isEditing, setIsEditing] = useState(false); const [editedContent, setEditedContent] = useState(""); - const [editedMetadata, setEditedMetadata] = useState>({}); + const [editedMetadata, setEditedMetadata] = useState>( + {} + ); const [isEditingEmbedding, setIsEditingEmbedding] = useState(false); const [editedEmbedding, setEditedEmbedding] = useState([]); @@ -64,8 +69,13 @@ export default function ContentDetailPage() { setFile(fileResponse as VectorStoreFile); const contentsAPI = new ContentsAPI(client); - const contentsResponse = await contentsAPI.listContents(vectorStoreId, fileId); - const targetContent = contentsResponse.data.find(c => c.id === contentId); + const contentsResponse = await contentsAPI.listContents( + vectorStoreId, + fileId + ); + const targetContent = contentsResponse.data.find( + c => c.id === contentId + ); if (targetContent) { setContent(targetContent); @@ -76,7 +86,9 @@ export default function ContentDetailPage() { throw new Error(`Content ${contentId} not found`); } } catch (err) { - setError(err instanceof Error ? err : new Error("Failed to load content.")); + setError( + err instanceof Error ? 
err : new Error("Failed to load content.") + ); } finally { setIsLoading(false); } @@ -88,7 +100,8 @@ export default function ContentDetailPage() { if (!content) return; try { - const updates: { content?: string; metadata?: Record } = {}; + const updates: { content?: string; metadata?: Record } = + {}; if (editedContent !== getTextFromContent(content.content)) { updates.content = editedContent; @@ -100,25 +113,32 @@ export default function ContentDetailPage() { if (Object.keys(updates).length > 0) { const contentsAPI = new ContentsAPI(client); - const updatedContent = await contentsAPI.updateContent(vectorStoreId, fileId, contentId, updates); + const updatedContent = await contentsAPI.updateContent( + vectorStoreId, + fileId, + contentId, + updates + ); setContent(updatedContent); } setIsEditing(false); } catch (err) { - console.error('Failed to update content:', err); + console.error("Failed to update content:", err); } }; const handleDelete = async () => { - if (!confirm('Are you sure you want to delete this content?')) return; + if (!confirm("Are you sure you want to delete this content?")) return; try { const contentsAPI = new ContentsAPI(client); await contentsAPI.deleteContent(vectorStoreId, fileId, contentId); - router.push(`/logs/vector-stores/${vectorStoreId}/files/${fileId}/contents`); + router.push( + `/logs/vector-stores/${vectorStoreId}/files/${fileId}/contents` + ); } catch (err) { - console.error('Failed to delete content:', err); + console.error("Failed to delete content:", err); } }; @@ -134,10 +154,19 @@ export default function ContentDetailPage() { const breadcrumbSegments: BreadcrumbSegment[] = [ { label: "Vector Stores", href: "/logs/vector-stores" }, - { label: store?.name || vectorStoreId, href: `/logs/vector-stores/${vectorStoreId}` }, + { + label: store?.name || vectorStoreId, + href: `/logs/vector-stores/${vectorStoreId}`, + }, { label: "Files", href: `/logs/vector-stores/${vectorStoreId}` }, - { label: fileId, href: `/logs/vector-stores/${vectorStoreId}/files/${fileId}` }, - { label: "Contents", href: `/logs/vector-stores/${vectorStoreId}/files/${fileId}/contents` }, + { + label: fileId, + href: `/logs/vector-stores/${vectorStoreId}/files/${fileId}`, + }, + { + label: "Contents", + href: `/logs/vector-stores/${vectorStoreId}/files/${fileId}/contents`, + }, { label: contentId }, ]; @@ -186,7 +215,7 @@ export default function ContentDetailPage() { {isEditing ? (