chore: remove dead code (#3729)

# What does this PR do?
Removing some dead code, found by vulture and checked by claude that
there are no references or imports for these


## Test Plan
CI
This commit is contained in:
slekkala1 2025-10-07 20:26:02 -07:00 committed by GitHub
parent b6e9f41041
commit 1ac320b7e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 8 additions and 359 deletions

View file

@ -11,19 +11,13 @@
# top-level folder for each specific model found within the models/ directory at
# the top-level of this source tree.
import json
import textwrap
from pathlib import Path
from pydantic import BaseModel, Field
from llama_stack.models.llama.datatypes import (
RawContent,
RawMediaItem,
RawMessage,
RawTextItem,
StopReason,
ToolCall,
ToolPromptFormat,
)
from llama_stack.models.llama.llama4.tokenizer import Tokenizer
@ -175,25 +169,6 @@ def llama3_1_builtin_code_interpreter_dialog(tool_prompt_format=ToolPromptFormat
return messages
def llama3_1_builtin_tool_call_with_image_dialog(
tool_prompt_format=ToolPromptFormat.json,
):
this_dir = Path(__file__).parent
with open(this_dir / "llama3/dog.jpg", "rb") as f:
img = f.read()
interface = LLama31Interface(tool_prompt_format)
messages = interface.system_messages(**system_message_builtin_tools_only())
messages += interface.user_message(content=[RawMediaItem(data=img), RawTextItem(text="What is this dog breed?")])
messages += interface.assistant_response_messages(
"Based on the description of the dog in the image, it appears to be a small breed dog, possibly a terrier mix",
StopReason.end_of_turn,
)
messages += interface.user_message("Search the web for some food recommendations for the indentified breed")
return messages
def llama3_1_custom_tool_call_dialog(tool_prompt_format=ToolPromptFormat.json):
interface = LLama31Interface(tool_prompt_format)
@ -202,35 +177,6 @@ def llama3_1_custom_tool_call_dialog(tool_prompt_format=ToolPromptFormat.json):
return messages
def llama3_1_e2e_tool_call_dialog(tool_prompt_format=ToolPromptFormat.json):
tool_response = json.dumps(["great song1", "awesome song2", "cool song3"])
interface = LLama31Interface(tool_prompt_format)
messages = interface.system_messages(**system_message_custom_tools_only())
messages += interface.user_message(content="Use tools to get latest trending songs")
messages.append(
RawMessage(
role="assistant",
content="",
stop_reason=StopReason.end_of_message,
tool_calls=[
ToolCall(
call_id="call_id",
tool_name="trending_songs",
arguments={"n": "10", "genre": "latest"},
)
],
),
)
messages.append(
RawMessage(
role="assistant",
content=tool_response,
)
)
return messages
def llama3_2_user_assistant_conversation():
return UseCase(
title="User and assistant conversation",

View file

@ -7,8 +7,6 @@
import copy
import json
import re
import secrets
import string
import uuid
import warnings
from collections.abc import AsyncGenerator
@ -84,11 +82,6 @@ from llama_stack.providers.utils.telemetry import tracing
from .persistence import AgentPersistence
from .safety import SafetyException, ShieldRunnerMixin
def make_random_string(length: int = 8):
return "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(length))
TOOLS_ATTACHMENT_KEY_REGEX = re.compile(r"__tools_attachment__=(\{.*?\})")
MEMORY_QUERY_TOOL = "knowledge_search"
WEB_SEARCH_TOOL = "web_search"

View file

@ -8,8 +8,6 @@ import asyncio
import base64
import io
import mimetypes
import secrets
import string
from typing import Any
import httpx
@ -52,10 +50,6 @@ from .context_retriever import generate_rag_query
log = get_logger(name=__name__, category="tool_runtime")
def make_random_string(length: int = 8):
return "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(length))
async def raw_data_from_doc(doc: RAGDocument) -> tuple[bytes, str]:
"""Get raw binary data and mime type from a RAGDocument for file upload."""
if isinstance(doc.content, URL):

View file

@ -1,217 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import warnings
from collections.abc import AsyncGenerator
from typing import Any
from openai import AsyncStream
from openai.types.chat.chat_completion import (
Choice as OpenAIChoice,
)
from openai.types.completion import Completion as OpenAICompletion
from openai.types.completion_choice import Logprobs as OpenAICompletionLogprobs
from llama_stack.apis.inference import (
ChatCompletionRequest,
CompletionRequest,
CompletionResponse,
CompletionResponseStreamChunk,
GreedySamplingStrategy,
JsonSchemaResponseFormat,
TokenLogProbs,
TopKSamplingStrategy,
TopPSamplingStrategy,
)
from llama_stack.providers.utils.inference.openai_compat import (
_convert_openai_finish_reason,
convert_message_to_openai_dict_new,
convert_tooldef_to_openai_tool,
)
async def convert_chat_completion_request(
request: ChatCompletionRequest,
n: int = 1,
) -> dict:
"""
Convert a ChatCompletionRequest to an OpenAI API-compatible dictionary.
"""
# model -> model
# messages -> messages
# sampling_params TODO(mattf): review strategy
# strategy=greedy -> nvext.top_k = -1, temperature = temperature
# strategy=top_p -> nvext.top_k = -1, top_p = top_p
# strategy=top_k -> nvext.top_k = top_k
# temperature -> temperature
# top_p -> top_p
# top_k -> nvext.top_k
# max_tokens -> max_tokens
# repetition_penalty -> nvext.repetition_penalty
# response_format -> GrammarResponseFormat TODO(mf)
# response_format -> JsonSchemaResponseFormat: response_format = "json_object" & nvext["guided_json"] = json_schema
# tools -> tools
# tool_choice ("auto", "required") -> tool_choice
# tool_prompt_format -> TBD
# stream -> stream
# logprobs -> logprobs
if request.response_format and not isinstance(request.response_format, JsonSchemaResponseFormat):
raise ValueError(
f"Unsupported response format: {request.response_format}. Only JsonSchemaResponseFormat is supported."
)
nvext = {}
payload: dict[str, Any] = dict(
model=request.model,
messages=[await convert_message_to_openai_dict_new(message) for message in request.messages],
stream=request.stream,
n=n,
extra_body=dict(nvext=nvext),
extra_headers={
b"User-Agent": b"llama-stack: nvidia-inference-adapter",
},
)
if request.response_format:
# server bug - setting guided_json changes the behavior of response_format resulting in an error
# payload.update(response_format="json_object")
nvext.update(guided_json=request.response_format.json_schema)
if request.tools:
payload.update(tools=[convert_tooldef_to_openai_tool(tool) for tool in request.tools])
if request.tool_config.tool_choice:
payload.update(
tool_choice=request.tool_config.tool_choice.value
) # we cannot include tool_choice w/o tools, server will complain
if request.logprobs:
payload.update(logprobs=True)
payload.update(top_logprobs=request.logprobs.top_k)
if request.sampling_params:
nvext.update(repetition_penalty=request.sampling_params.repetition_penalty)
if request.sampling_params.max_tokens:
payload.update(max_tokens=request.sampling_params.max_tokens)
strategy = request.sampling_params.strategy
if isinstance(strategy, TopPSamplingStrategy):
nvext.update(top_k=-1)
payload.update(top_p=strategy.top_p)
payload.update(temperature=strategy.temperature)
elif isinstance(strategy, TopKSamplingStrategy):
if strategy.top_k != -1 and strategy.top_k < 1:
warnings.warn("top_k must be -1 or >= 1", stacklevel=2)
nvext.update(top_k=strategy.top_k)
elif isinstance(strategy, GreedySamplingStrategy):
nvext.update(top_k=-1)
else:
raise ValueError(f"Unsupported sampling strategy: {strategy}")
return payload
def convert_completion_request(
request: CompletionRequest,
n: int = 1,
) -> dict:
"""
Convert a ChatCompletionRequest to an OpenAI API-compatible dictionary.
"""
# model -> model
# prompt -> prompt
# sampling_params TODO(mattf): review strategy
# strategy=greedy -> nvext.top_k = -1, temperature = temperature
# strategy=top_p -> nvext.top_k = -1, top_p = top_p
# strategy=top_k -> nvext.top_k = top_k
# temperature -> temperature
# top_p -> top_p
# top_k -> nvext.top_k
# max_tokens -> max_tokens
# repetition_penalty -> nvext.repetition_penalty
# response_format -> nvext.guided_json
# stream -> stream
# logprobs.top_k -> logprobs
nvext = {}
payload: dict[str, Any] = dict(
model=request.model,
prompt=request.content,
stream=request.stream,
extra_body=dict(nvext=nvext),
extra_headers={
b"User-Agent": b"llama-stack: nvidia-inference-adapter",
},
n=n,
)
if request.response_format:
# this is not openai compliant, it is a nim extension
nvext.update(guided_json=request.response_format.json_schema)
if request.logprobs:
payload.update(logprobs=request.logprobs.top_k)
if request.sampling_params:
nvext.update(repetition_penalty=request.sampling_params.repetition_penalty)
if request.sampling_params.max_tokens:
payload.update(max_tokens=request.sampling_params.max_tokens)
if request.sampling_params.strategy == "top_p":
nvext.update(top_k=-1)
payload.update(top_p=request.sampling_params.top_p)
elif request.sampling_params.strategy == "top_k":
if request.sampling_params.top_k != -1 and request.sampling_params.top_k < 1:
warnings.warn("top_k must be -1 or >= 1", stacklevel=2)
nvext.update(top_k=request.sampling_params.top_k)
elif request.sampling_params.strategy == "greedy":
nvext.update(top_k=-1)
payload.update(temperature=request.sampling_params.temperature)
return payload
def _convert_openai_completion_logprobs(
logprobs: OpenAICompletionLogprobs | None,
) -> list[TokenLogProbs] | None:
"""
Convert an OpenAI CompletionLogprobs into a list of TokenLogProbs.
"""
if not logprobs:
return None
return [TokenLogProbs(logprobs_by_token=logprobs) for logprobs in logprobs.top_logprobs]
def convert_openai_completion_choice(
choice: OpenAIChoice,
) -> CompletionResponse:
"""
Convert an OpenAI Completion Choice into a CompletionResponse.
"""
return CompletionResponse(
content=choice.text,
stop_reason=_convert_openai_finish_reason(choice.finish_reason),
logprobs=_convert_openai_completion_logprobs(choice.logprobs),
)
async def convert_openai_completion_stream(
stream: AsyncStream[OpenAICompletion],
) -> AsyncGenerator[CompletionResponse, None]:
"""
Convert a stream of OpenAI Completions into a stream
of ChatCompletionResponseStreamChunks.
"""
async for chunk in stream:
choice = chunk.choices[0]
yield CompletionResponseStreamChunk(
delta=choice.text,
stop_reason=_convert_openai_finish_reason(choice.finish_reason),
logprobs=_convert_openai_completion_logprobs(choice.logprobs),
)

View file

@ -4,53 +4,8 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import httpx
from llama_stack.log import get_logger
from . import NVIDIAConfig
logger = get_logger(name=__name__, category="inference::nvidia")
def _is_nvidia_hosted(config: NVIDIAConfig) -> bool:
return "integrate.api.nvidia.com" in config.url
async def _get_health(url: str) -> tuple[bool, bool]:
"""
Query {url}/v1/health/{live,ready} to check if the server is running and ready
Args:
url (str): URL of the server
Returns:
Tuple[bool, bool]: (is_live, is_ready)
"""
async with httpx.AsyncClient() as client:
live = await client.get(f"{url}/v1/health/live")
ready = await client.get(f"{url}/v1/health/ready")
return live.status_code == 200, ready.status_code == 200
async def check_health(config: NVIDIAConfig) -> None:
"""
Check if the server is running and ready
Args:
url (str): URL of the server
Raises:
RuntimeError: If the server is not running or ready
"""
if not _is_nvidia_hosted(config):
logger.info("Checking NVIDIA NIM health...")
try:
is_live, is_ready = await _get_health(config.url)
if not is_live:
raise ConnectionError("NVIDIA NIM is not running")
if not is_ready:
raise ConnectionError("NVIDIA NIM is not ready")
# TODO(mf): should we wait for the server to be ready?
except httpx.ConnectError as e:
raise ConnectionError(f"Failed to connect to NVIDIA NIM: {e}") from e

View file

@ -296,15 +296,14 @@ class OpenAIVectorStoreMixin(ABC):
async def shutdown(self) -> None:
"""Clean up mixin resources including background tasks."""
# Cancel any running file batch tasks gracefully
if hasattr(self, "_file_batch_tasks"):
tasks_to_cancel = list(self._file_batch_tasks.items())
for _, task in tasks_to_cancel:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
tasks_to_cancel = list(self._file_batch_tasks.items())
for _, task in tasks_to_cancel:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@abstractmethod
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:

View file

@ -20,7 +20,6 @@ from pydantic import BaseModel
from llama_stack.apis.common.content_types import (
URL,
InterleavedContent,
TextContentItem,
)
from llama_stack.apis.tools import RAGDocument
from llama_stack.apis.vector_dbs import VectorDB
@ -129,26 +128,6 @@ def content_from_data_and_mime_type(data: bytes | str, mime_type: str | None, en
return ""
def concat_interleaved_content(content: list[InterleavedContent]) -> InterleavedContent:
"""concatenate interleaved content into a single list. ensure that 'str's are converted to TextContentItem when in a list"""
ret = []
def _process(c):
if isinstance(c, str):
ret.append(TextContentItem(text=c))
elif isinstance(c, list):
for item in c:
_process(item)
else:
ret.append(c)
for c in content:
_process(c)
return ret
async def content_from_doc(doc: RAGDocument) -> str:
if isinstance(doc.content, URL):
if doc.content.uri.startswith("data:"):