diff --git a/llama_stack/models/llama/prompt_format.py b/llama_stack/models/llama/prompt_format.py
index 6191df61a..16e4068d7 100644
--- a/llama_stack/models/llama/prompt_format.py
+++ b/llama_stack/models/llama/prompt_format.py
@@ -11,19 +11,13 @@
 # top-level folder for each specific model found within the models/ directory at
 # the top-level of this source tree.
 
-import json
 import textwrap
-from pathlib import Path
 
 from pydantic import BaseModel, Field
 
 from llama_stack.models.llama.datatypes import (
     RawContent,
-    RawMediaItem,
     RawMessage,
-    RawTextItem,
-    StopReason,
-    ToolCall,
     ToolPromptFormat,
 )
 from llama_stack.models.llama.llama4.tokenizer import Tokenizer
@@ -175,25 +169,6 @@ def llama3_1_builtin_code_interpreter_dialog(tool_prompt_format=ToolPromptFormat
     return messages
 
 
-def llama3_1_builtin_tool_call_with_image_dialog(
-    tool_prompt_format=ToolPromptFormat.json,
-):
-    this_dir = Path(__file__).parent
-    with open(this_dir / "llama3/dog.jpg", "rb") as f:
-        img = f.read()
-
-    interface = LLama31Interface(tool_prompt_format)
-
-    messages = interface.system_messages(**system_message_builtin_tools_only())
-    messages += interface.user_message(content=[RawMediaItem(data=img), RawTextItem(text="What is this dog breed?")])
-    messages += interface.assistant_response_messages(
-        "Based on the description of the dog in the image, it appears to be a small breed dog, possibly a terrier mix",
-        StopReason.end_of_turn,
-    )
-    messages += interface.user_message("Search the web for some food recommendations for the indentified breed")
-    return messages
-
-
 def llama3_1_custom_tool_call_dialog(tool_prompt_format=ToolPromptFormat.json):
     interface = LLama31Interface(tool_prompt_format)
 
@@ -202,35 +177,6 @@ def llama3_1_custom_tool_call_dialog(tool_prompt_format=ToolPromptFormat.json):
     return messages
 
 
-def llama3_1_e2e_tool_call_dialog(tool_prompt_format=ToolPromptFormat.json):
-    tool_response = json.dumps(["great song1", "awesome song2", "cool song3"])
-    interface = LLama31Interface(tool_prompt_format)
-
-    messages = interface.system_messages(**system_message_custom_tools_only())
-    messages += interface.user_message(content="Use tools to get latest trending songs")
-    messages.append(
-        RawMessage(
-            role="assistant",
-            content="",
-            stop_reason=StopReason.end_of_message,
-            tool_calls=[
-                ToolCall(
-                    call_id="call_id",
-                    tool_name="trending_songs",
-                    arguments={"n": "10", "genre": "latest"},
-                )
-            ],
-        ),
-    )
-    messages.append(
-        RawMessage(
-            role="assistant",
-            content=tool_response,
-        )
-    )
-    return messages
-
-
 def llama3_2_user_assistant_conversation():
     return UseCase(
         title="User and assistant conversation",
diff --git a/llama_stack/providers/inline/agents/meta_reference/agent_instance.py b/llama_stack/providers/inline/agents/meta_reference/agent_instance.py
index c2ce9aa7b..b17c720e9 100644
--- a/llama_stack/providers/inline/agents/meta_reference/agent_instance.py
+++ b/llama_stack/providers/inline/agents/meta_reference/agent_instance.py
@@ -7,8 +7,6 @@
 import copy
 import json
 import re
-import secrets
-import string
 import uuid
 import warnings
 from collections.abc import AsyncGenerator
@@ -84,11 +82,6 @@ from llama_stack.providers.utils.telemetry import tracing
 from .persistence import AgentPersistence
 from .safety import SafetyException, ShieldRunnerMixin
 
-
-def make_random_string(length: int = 8):
-    return "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(length))
-
-
 TOOLS_ATTACHMENT_KEY_REGEX = re.compile(r"__tools_attachment__=(\{.*?\})")
 MEMORY_QUERY_TOOL = "knowledge_search"
 WEB_SEARCH_TOOL = "web_search"
diff --git a/llama_stack/providers/inline/tool_runtime/rag/memory.py b/llama_stack/providers/inline/tool_runtime/rag/memory.py
index aac86a056..3ccfd0bcb 100644
--- a/llama_stack/providers/inline/tool_runtime/rag/memory.py
+++ b/llama_stack/providers/inline/tool_runtime/rag/memory.py
@@ -8,8 +8,6 @@ import asyncio
 import base64
 import io
 import mimetypes
-import secrets
-import string
 from typing import Any
 
 import httpx
@@ -52,10 +50,6 @@ from .context_retriever import generate_rag_query
 log = get_logger(name=__name__, category="tool_runtime")
 
 
-def make_random_string(length: int = 8):
-    return "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(length))
-
-
 async def raw_data_from_doc(doc: RAGDocument) -> tuple[bytes, str]:
     """Get raw binary data and mime type from a RAGDocument for file upload."""
     if isinstance(doc.content, URL):
diff --git a/llama_stack/providers/remote/inference/nvidia/openai_utils.py b/llama_stack/providers/remote/inference/nvidia/openai_utils.py
deleted file mode 100644
index 0b0d7fcf3..000000000
--- a/llama_stack/providers/remote/inference/nvidia/openai_utils.py
+++ /dev/null
@@ -1,217 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-import warnings
-from collections.abc import AsyncGenerator
-from typing import Any
-
-from openai import AsyncStream
-from openai.types.chat.chat_completion import (
-    Choice as OpenAIChoice,
-)
-from openai.types.completion import Completion as OpenAICompletion
-from openai.types.completion_choice import Logprobs as OpenAICompletionLogprobs
-
-from llama_stack.apis.inference import (
-    ChatCompletionRequest,
-    CompletionRequest,
-    CompletionResponse,
-    CompletionResponseStreamChunk,
-    GreedySamplingStrategy,
-    JsonSchemaResponseFormat,
-    TokenLogProbs,
-    TopKSamplingStrategy,
-    TopPSamplingStrategy,
-)
-from llama_stack.providers.utils.inference.openai_compat import (
-    _convert_openai_finish_reason,
-    convert_message_to_openai_dict_new,
-    convert_tooldef_to_openai_tool,
-)
-
-
-async def convert_chat_completion_request(
-    request: ChatCompletionRequest,
-    n: int = 1,
-) -> dict:
-    """
-    Convert a ChatCompletionRequest to an OpenAI API-compatible dictionary.
-    """
-    # model -> model
-    # messages -> messages
-    # sampling_params TODO(mattf): review strategy
-    # strategy=greedy -> nvext.top_k = -1, temperature = temperature
-    # strategy=top_p -> nvext.top_k = -1, top_p = top_p
-    # strategy=top_k -> nvext.top_k = top_k
-    # temperature -> temperature
-    # top_p -> top_p
-    # top_k -> nvext.top_k
-    # max_tokens -> max_tokens
-    # repetition_penalty -> nvext.repetition_penalty
-    # response_format -> GrammarResponseFormat TODO(mf)
-    # response_format -> JsonSchemaResponseFormat: response_format = "json_object" & nvext["guided_json"] = json_schema
-    # tools -> tools
-    # tool_choice ("auto", "required") -> tool_choice
-    # tool_prompt_format -> TBD
-    # stream -> stream
-    # logprobs -> logprobs
-
-    if request.response_format and not isinstance(request.response_format, JsonSchemaResponseFormat):
-        raise ValueError(
-            f"Unsupported response format: {request.response_format}. Only JsonSchemaResponseFormat is supported."
-        )
-
-    nvext = {}
-    payload: dict[str, Any] = dict(
-        model=request.model,
-        messages=[await convert_message_to_openai_dict_new(message) for message in request.messages],
-        stream=request.stream,
-        n=n,
-        extra_body=dict(nvext=nvext),
-        extra_headers={
-            b"User-Agent": b"llama-stack: nvidia-inference-adapter",
-        },
-    )
-
-    if request.response_format:
-        # server bug - setting guided_json changes the behavior of response_format resulting in an error
-        # payload.update(response_format="json_object")
-        nvext.update(guided_json=request.response_format.json_schema)
-
-    if request.tools:
-        payload.update(tools=[convert_tooldef_to_openai_tool(tool) for tool in request.tools])
-        if request.tool_config.tool_choice:
-            payload.update(
-                tool_choice=request.tool_config.tool_choice.value
-            )  # we cannot include tool_choice w/o tools, server will complain
-
-    if request.logprobs:
-        payload.update(logprobs=True)
-        payload.update(top_logprobs=request.logprobs.top_k)
-
-    if request.sampling_params:
-        nvext.update(repetition_penalty=request.sampling_params.repetition_penalty)
-
-        if request.sampling_params.max_tokens:
-            payload.update(max_tokens=request.sampling_params.max_tokens)
-
-        strategy = request.sampling_params.strategy
-        if isinstance(strategy, TopPSamplingStrategy):
-            nvext.update(top_k=-1)
-            payload.update(top_p=strategy.top_p)
-            payload.update(temperature=strategy.temperature)
-        elif isinstance(strategy, TopKSamplingStrategy):
-            if strategy.top_k != -1 and strategy.top_k < 1:
-                warnings.warn("top_k must be -1 or >= 1", stacklevel=2)
-            nvext.update(top_k=strategy.top_k)
-        elif isinstance(strategy, GreedySamplingStrategy):
-            nvext.update(top_k=-1)
-        else:
-            raise ValueError(f"Unsupported sampling strategy: {strategy}")
-
-    return payload
-
-
-def convert_completion_request(
-    request: CompletionRequest,
-    n: int = 1,
-) -> dict:
-    """
-    Convert a ChatCompletionRequest to an OpenAI API-compatible dictionary.
- """ - # model -> model - # prompt -> prompt - # sampling_params TODO(mattf): review strategy - # strategy=greedy -> nvext.top_k = -1, temperature = temperature - # strategy=top_p -> nvext.top_k = -1, top_p = top_p - # strategy=top_k -> nvext.top_k = top_k - # temperature -> temperature - # top_p -> top_p - # top_k -> nvext.top_k - # max_tokens -> max_tokens - # repetition_penalty -> nvext.repetition_penalty - # response_format -> nvext.guided_json - # stream -> stream - # logprobs.top_k -> logprobs - - nvext = {} - payload: dict[str, Any] = dict( - model=request.model, - prompt=request.content, - stream=request.stream, - extra_body=dict(nvext=nvext), - extra_headers={ - b"User-Agent": b"llama-stack: nvidia-inference-adapter", - }, - n=n, - ) - - if request.response_format: - # this is not openai compliant, it is a nim extension - nvext.update(guided_json=request.response_format.json_schema) - - if request.logprobs: - payload.update(logprobs=request.logprobs.top_k) - - if request.sampling_params: - nvext.update(repetition_penalty=request.sampling_params.repetition_penalty) - - if request.sampling_params.max_tokens: - payload.update(max_tokens=request.sampling_params.max_tokens) - - if request.sampling_params.strategy == "top_p": - nvext.update(top_k=-1) - payload.update(top_p=request.sampling_params.top_p) - elif request.sampling_params.strategy == "top_k": - if request.sampling_params.top_k != -1 and request.sampling_params.top_k < 1: - warnings.warn("top_k must be -1 or >= 1", stacklevel=2) - nvext.update(top_k=request.sampling_params.top_k) - elif request.sampling_params.strategy == "greedy": - nvext.update(top_k=-1) - payload.update(temperature=request.sampling_params.temperature) - - return payload - - -def _convert_openai_completion_logprobs( - logprobs: OpenAICompletionLogprobs | None, -) -> list[TokenLogProbs] | None: - """ - Convert an OpenAI CompletionLogprobs into a list of TokenLogProbs. - """ - if not logprobs: - return None - - return [TokenLogProbs(logprobs_by_token=logprobs) for logprobs in logprobs.top_logprobs] - - -def convert_openai_completion_choice( - choice: OpenAIChoice, -) -> CompletionResponse: - """ - Convert an OpenAI Completion Choice into a CompletionResponse. - """ - return CompletionResponse( - content=choice.text, - stop_reason=_convert_openai_finish_reason(choice.finish_reason), - logprobs=_convert_openai_completion_logprobs(choice.logprobs), - ) - - -async def convert_openai_completion_stream( - stream: AsyncStream[OpenAICompletion], -) -> AsyncGenerator[CompletionResponse, None]: - """ - Convert a stream of OpenAI Completions into a stream - of ChatCompletionResponseStreamChunks. - """ - async for chunk in stream: - choice = chunk.choices[0] - yield CompletionResponseStreamChunk( - delta=choice.text, - stop_reason=_convert_openai_finish_reason(choice.finish_reason), - logprobs=_convert_openai_completion_logprobs(choice.logprobs), - ) diff --git a/llama_stack/providers/remote/inference/nvidia/utils.py b/llama_stack/providers/remote/inference/nvidia/utils.py index b8431e859..46ee939d9 100644 --- a/llama_stack/providers/remote/inference/nvidia/utils.py +++ b/llama_stack/providers/remote/inference/nvidia/utils.py @@ -4,53 +4,8 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -import httpx - -from llama_stack.log import get_logger - from . 
import NVIDIAConfig -logger = get_logger(name=__name__, category="inference::nvidia") - def _is_nvidia_hosted(config: NVIDIAConfig) -> bool: return "integrate.api.nvidia.com" in config.url - - -async def _get_health(url: str) -> tuple[bool, bool]: - """ - Query {url}/v1/health/{live,ready} to check if the server is running and ready - - Args: - url (str): URL of the server - - Returns: - Tuple[bool, bool]: (is_live, is_ready) - """ - async with httpx.AsyncClient() as client: - live = await client.get(f"{url}/v1/health/live") - ready = await client.get(f"{url}/v1/health/ready") - return live.status_code == 200, ready.status_code == 200 - - -async def check_health(config: NVIDIAConfig) -> None: - """ - Check if the server is running and ready - - Args: - url (str): URL of the server - - Raises: - RuntimeError: If the server is not running or ready - """ - if not _is_nvidia_hosted(config): - logger.info("Checking NVIDIA NIM health...") - try: - is_live, is_ready = await _get_health(config.url) - if not is_live: - raise ConnectionError("NVIDIA NIM is not running") - if not is_ready: - raise ConnectionError("NVIDIA NIM is not ready") - # TODO(mf): should we wait for the server to be ready? - except httpx.ConnectError as e: - raise ConnectionError(f"Failed to connect to NVIDIA NIM: {e}") from e diff --git a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index 2a5177f93..c179eba6c 100644 --- a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -296,15 +296,14 @@ class OpenAIVectorStoreMixin(ABC): async def shutdown(self) -> None: """Clean up mixin resources including background tasks.""" # Cancel any running file batch tasks gracefully - if hasattr(self, "_file_batch_tasks"): - tasks_to_cancel = list(self._file_batch_tasks.items()) - for _, task in tasks_to_cancel: - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass + tasks_to_cancel = list(self._file_batch_tasks.items()) + for _, task in tasks_to_cancel: + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass @abstractmethod async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: diff --git a/llama_stack/providers/utils/memory/vector_store.py b/llama_stack/providers/utils/memory/vector_store.py index 857fbe910..c0534a875 100644 --- a/llama_stack/providers/utils/memory/vector_store.py +++ b/llama_stack/providers/utils/memory/vector_store.py @@ -20,7 +20,6 @@ from pydantic import BaseModel from llama_stack.apis.common.content_types import ( URL, InterleavedContent, - TextContentItem, ) from llama_stack.apis.tools import RAGDocument from llama_stack.apis.vector_dbs import VectorDB @@ -129,26 +128,6 @@ def content_from_data_and_mime_type(data: bytes | str, mime_type: str | None, en return "" -def concat_interleaved_content(content: list[InterleavedContent]) -> InterleavedContent: - """concatenate interleaved content into a single list. 
ensure that 'str's are converted to TextContentItem when in a list""" - - ret = [] - - def _process(c): - if isinstance(c, str): - ret.append(TextContentItem(text=c)) - elif isinstance(c, list): - for item in c: - _process(item) - else: - ret.append(c) - - for c in content: - _process(c) - - return ret - - async def content_from_doc(doc: RAGDocument) -> str: if isinstance(doc.content, URL): if doc.content.uri.startswith("data:"):