Merge branch 'main' into make-kvstore-optional

commit f62e6cb063
Francisco Arceo authored on 2025-08-05 14:10:30 -04:00 (committed via GitHub)
554 changed files with 63962 additions and 4870 deletions


@ -65,6 +65,8 @@ class ModelsProtocolPrivate(Protocol):
class ShieldsProtocolPrivate(Protocol):
async def register_shield(self, shield: Shield) -> None: ...
async def unregister_shield(self, identifier: str) -> None: ...
class VectorDBsProtocolPrivate(Protocol):
async def register_vector_db(self, vector_db: VectorDB) -> None: ...


@ -6,7 +6,7 @@
from typing import Any
from llama_stack.distribution.datatypes import AccessRule, Api
from llama_stack.core.datatypes import AccessRule, Api
from .config import MetaReferenceAgentsImplConfig


@ -44,6 +44,7 @@ from llama_stack.apis.common.content_types import (
ToolCallDelta,
ToolCallParseStatus,
)
from llama_stack.apis.common.errors import SessionNotFoundError
from llama_stack.apis.inference import (
ChatCompletionResponseEventType,
CompletionMessage,
@ -61,7 +62,7 @@ from llama_stack.apis.inference import (
from llama_stack.apis.safety import Safety
from llama_stack.apis.tools import ToolGroups, ToolInvocationResult, ToolRuntime
from llama_stack.apis.vector_io import VectorIO
from llama_stack.distribution.datatypes import AccessRule
from llama_stack.core.datatypes import AccessRule
from llama_stack.log import get_logger
from llama_stack.models.llama.datatypes import (
BuiltinTool,
@ -214,7 +215,7 @@ class ChatAgent(ShieldRunnerMixin):
is_resume = isinstance(request, AgentTurnResumeRequest)
session_info = await self.storage.get_session_info(request.session_id)
if session_info is None:
raise ValueError(f"Session {request.session_id} not found")
raise SessionNotFoundError(request.session_id)
turns = await self.storage.get_session_turns(request.session_id)
if is_resume and len(turns) == 0:


@ -41,7 +41,7 @@ from llama_stack.apis.inference import (
from llama_stack.apis.safety import Safety
from llama_stack.apis.tools import ToolGroups, ToolRuntime
from llama_stack.apis.vector_io import VectorIO
from llama_stack.distribution.datatypes import AccessRule
from llama_stack.core.datatypes import AccessRule
from llama_stack.providers.utils.kvstore import InmemoryKVStoreImpl, kvstore_impl
from llama_stack.providers.utils.pagination import paginate_records
from llama_stack.providers.utils.responses.responses_store import ResponsesStore
@ -230,8 +230,6 @@ class MetaReferenceAgentsImpl(Agents):
agent = await self._get_agent_impl(agent_id)
session_info = await agent.storage.get_session_info(session_id)
if session_info is None:
raise ValueError(f"Session {session_id} not found")
turns = await agent.storage.get_session_turns(session_id)
if turn_ids:
turns = [turn for turn in turns if turn.turn_id in turn_ids]
@ -244,9 +242,6 @@ class MetaReferenceAgentsImpl(Agents):
async def delete_agents_session(self, agent_id: str, session_id: str) -> None:
agent = await self._get_agent_impl(agent_id)
session_info = await agent.storage.get_session_info(session_id)
if session_info is None:
raise ValueError(f"Session {session_id} not found")
# Delete turns first, then the session
await agent.storage.delete_session_turns(session_id)


@ -10,10 +10,11 @@ import uuid
from datetime import UTC, datetime
from llama_stack.apis.agents import AgentConfig, Session, ToolExecutionStep, Turn
from llama_stack.distribution.access_control.access_control import AccessDeniedError, is_action_allowed
from llama_stack.distribution.access_control.datatypes import AccessRule
from llama_stack.distribution.datatypes import User
from llama_stack.distribution.request_headers import get_authenticated_user
from llama_stack.apis.common.errors import SessionNotFoundError
from llama_stack.core.access_control.access_control import AccessDeniedError, is_action_allowed
from llama_stack.core.access_control.datatypes import AccessRule
from llama_stack.core.datatypes import User
from llama_stack.core.request_headers import get_authenticated_user
from llama_stack.providers.utils.kvstore import KVStore
log = logging.getLogger(__name__)
@ -61,12 +62,12 @@ class AgentPersistence:
)
return session_id
async def get_session_info(self, session_id: str) -> AgentSessionInfo | None:
async def get_session_info(self, session_id: str) -> AgentSessionInfo:
value = await self.kvstore.get(
key=f"session:{self.agent_id}:{session_id}",
)
if not value:
return None
raise SessionNotFoundError(session_id)
session_info = AgentSessionInfo(**json.loads(value))
@ -95,7 +96,7 @@ class AgentPersistence:
async def add_vector_db_to_session(self, session_id: str, vector_db_id: str):
session_info = await self.get_session_if_accessible(session_id)
if session_info is None:
raise ValueError(f"Session {session_id} not found or access denied")
raise SessionNotFoundError(session_id)
session_info.vector_db_id = vector_db_id
await self.kvstore.set(
@ -105,7 +106,7 @@ class AgentPersistence:
async def add_turn_to_session(self, session_id: str, turn: Turn):
if not await self.get_session_if_accessible(session_id):
raise ValueError(f"Session {session_id} not found or access denied")
raise SessionNotFoundError(session_id)
await self.kvstore.set(
key=f"session:{self.agent_id}:{session_id}:{turn.turn_id}",
@ -114,7 +115,7 @@ class AgentPersistence:
async def get_session_turns(self, session_id: str) -> list[Turn]:
if not await self.get_session_if_accessible(session_id):
raise ValueError(f"Session {session_id} not found or access denied")
raise SessionNotFoundError(session_id)
values = await self.kvstore.values_in_range(
start_key=f"session:{self.agent_id}:{session_id}:",
@ -137,7 +138,7 @@ class AgentPersistence:
async def get_session_turn(self, session_id: str, turn_id: str) -> Turn | None:
if not await self.get_session_if_accessible(session_id):
raise ValueError(f"Session {session_id} not found or access denied")
raise SessionNotFoundError(session_id)
value = await self.kvstore.get(
key=f"session:{self.agent_id}:{session_id}:{turn_id}",
@ -148,7 +149,7 @@ class AgentPersistence:
async def set_in_progress_tool_call_step(self, session_id: str, turn_id: str, step: ToolExecutionStep):
if not await self.get_session_if_accessible(session_id):
raise ValueError(f"Session {session_id} not found or access denied")
raise SessionNotFoundError(session_id)
await self.kvstore.set(
key=f"in_progress_tool_call_step:{self.agent_id}:{session_id}:{turn_id}",
@ -166,7 +167,7 @@ class AgentPersistence:
async def set_num_infer_iters_in_turn(self, session_id: str, turn_id: str, num_infer_iters: int):
if not await self.get_session_if_accessible(session_id):
raise ValueError(f"Session {session_id} not found or access denied")
raise SessionNotFoundError(session_id)
await self.kvstore.set(
key=f"num_infer_iters_in_turn:{self.agent_id}:{session_id}:{turn_id}",
@ -218,6 +219,6 @@ class AgentPersistence:
"""
session_info = await self.get_session_info(session_id)
if session_info is None:
raise ValueError(f"Session {session_id} not found")
raise SessionNotFoundError(session_id)
await self.kvstore.delete(key=f"session:{self.agent_id}:{session_id}")
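A consequence worth noting: `get_session_info` now raises a typed error instead of returning `None`, so call sites drop their None-checks. A minimal sketch of the new calling convention (the helper name and the `session_name` attribute access are illustrative assumptions, not repo code):

```python
# Minimal sketch: callers replace `if session_info is None` checks
# with try/except on the typed error introduced by this change.
from llama_stack.apis.common.errors import SessionNotFoundError

async def describe_session(storage, session_id: str) -> str:
    try:
        info = await storage.get_session_info(session_id)
    except SessionNotFoundError:
        return f"session {session_id} does not exist"
    return f"session {session_id}: {info.session_name}"  # attribute assumed
```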


@ -5,7 +5,7 @@
# the root directory of this source tree.
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import MetaReferenceEvalConfig


@ -6,7 +6,7 @@
from typing import Any
from llama_stack.distribution.datatypes import AccessRule, Api
from llama_stack.core.datatypes import AccessRule, Api
from .config import LocalfsFilesImplConfig
from .files import LocalfsFilesImpl


@ -19,7 +19,7 @@ from llama_stack.apis.files import (
OpenAIFileObject,
OpenAIFilePurpose,
)
from llama_stack.distribution.datatypes import AccessRule
from llama_stack.core.datatypes import AccessRule
from llama_stack.providers.utils.sqlstore.api import ColumnDefinition, ColumnType
from llama_stack.providers.utils.sqlstore.authorized_sqlstore import AuthorizedSqlStore
from llama_stack.providers.utils.sqlstore.sqlstore import sqlstore_impl


@ -6,7 +6,7 @@
from pathlib import Path
from llama_stack.distribution.utils.model_utils import model_local_dir
from llama_stack.core.utils.model_utils import model_local_dir
def model_checkpoint_dir(model_id) -> str:


@ -6,7 +6,7 @@
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import HuggingFacePostTrainingConfig


@ -67,6 +67,12 @@ class HuggingFacePostTrainingConfig(BaseModel):
# Can improve data transfer speed to GPU but uses more memory
dataloader_pin_memory: bool = True
# DPO-specific parameters
dpo_beta: float = 0.1
use_reference_model: bool = True
dpo_loss_type: Literal["sigmoid", "hinge", "ipo", "kto_pair"] = "sigmoid"
dpo_output_dir: str = "./checkpoints/dpo"
@classmethod
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
return {"checkpoint_format": "huggingface", "distributed_backend": None, "device": "cpu"}


@ -25,6 +25,9 @@ from llama_stack.providers.inline.post_training.huggingface.config import (
from llama_stack.providers.inline.post_training.huggingface.recipes.finetune_single_device import (
HFFinetuningSingleDevice,
)
from llama_stack.providers.inline.post_training.huggingface.recipes.finetune_single_device_dpo import (
HFDPOAlignmentSingleDevice,
)
from llama_stack.providers.utils.scheduler import JobArtifact, Scheduler
from llama_stack.providers.utils.scheduler import JobStatus as SchedulerJobStatus
from llama_stack.schema_utils import webmethod
@ -36,6 +39,7 @@ class TrainingArtifactType(Enum):
_JOB_TYPE_SUPERVISED_FINE_TUNE = "supervised-fine-tune"
_JOB_TYPE_DPO_TRAINING = "dpo-training"
class HuggingFacePostTrainingImpl:
@ -119,12 +123,37 @@ class HuggingFacePostTrainingImpl:
hyperparam_search_config: dict[str, Any],
logger_config: dict[str, Any],
) -> PostTrainingJob:
raise NotImplementedError("DPO alignment is not implemented yet")
async def handler(on_log_message_cb, on_status_change_cb, on_artifact_collected_cb):
on_log_message_cb("Starting HF DPO alignment")
async def get_training_jobs(self) -> ListPostTrainingJobsResponse:
return ListPostTrainingJobsResponse(
data=[PostTrainingJob(job_uuid=job.id) for job in self._scheduler.get_jobs()]
)
recipe = HFDPOAlignmentSingleDevice(
job_uuid=job_uuid,
datasetio_api=self.datasetio_api,
datasets_api=self.datasets_api,
)
resources_allocated, checkpoints = await recipe.train(
model=finetuned_model,
output_dir=f"{self.config.dpo_output_dir}/{job_uuid}",
job_uuid=job_uuid,
dpo_config=algorithm_config,
config=training_config,
provider_config=self.config,
)
on_artifact_collected_cb(self._resources_stats_to_artifact(resources_allocated))
if checkpoints:
for checkpoint in checkpoints:
artifact = self._checkpoint_to_artifact(checkpoint)
on_artifact_collected_cb(artifact)
else:
on_log_message_cb("Warning: No checkpoints were saved during DPO training")
on_status_change_cb(SchedulerJobStatus.completed)
on_log_message_cb("HF DPO alignment completed")
job_uuid = self._scheduler.schedule(_JOB_TYPE_DPO_TRAINING, job_uuid, handler)
return PostTrainingJob(job_uuid=job_uuid)
@staticmethod
def _get_artifacts_metadata_by_type(job, artifact_type):
@ -174,3 +203,9 @@ class HuggingFacePostTrainingImpl:
async def get_training_job_artifacts(self, job_uuid: str) -> PostTrainingJobArtifactsResponse | None:
job = self._scheduler.get_job(job_uuid)
return PostTrainingJobArtifactsResponse(job_uuid=job_uuid, checkpoints=self._get_checkpoints(job))
@webmethod(route="/post-training/jobs", method="GET")
async def get_training_jobs(self) -> ListPostTrainingJobsResponse:
return ListPostTrainingJobsResponse(
data=[PostTrainingJob(job_uuid=job.id) for job in self._scheduler.get_jobs()]
)
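Assuming the surrounding method is the provider's `preference_optimize` entry point (its parameter tail is visible at the top of this hunk), a hedged sketch of how a caller would schedule a DPO job; concrete values are invented:

```python
# Hedged usage sketch; argument names mirror those used in the handler above.
async def schedule_dpo(impl, dpo_config, training_config):
    # impl: HuggingFacePostTrainingImpl
    # dpo_config: DPOAlignmentConfig (the recipe reads at least `beta` from it)
    # training_config: TrainingConfig
    job = await impl.preference_optimize(
        job_uuid="dpo-job-001",                              # invented id
        finetuned_model="meta-llama/Llama-3.2-1B-Instruct",  # invented model
        algorithm_config=dpo_config,
        training_config=training_config,
        hyperparam_search_config={},
        logger_config={},
    )
    return job.job_uuid
```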


@ -8,30 +8,13 @@ import gc
import json
import logging
import multiprocessing
import os
import signal
import sys
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import psutil
from llama_stack.providers.inline.post_training.common.utils import evacuate_model_from_device
# Set tokenizer parallelism environment variable
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Force PyTorch to use OpenBLAS instead of MKL
os.environ["MKL_THREADING_LAYER"] = "GNU"
os.environ["MKL_SERVICE_FORCE_INTEL"] = "0"
os.environ["MKL_NUM_THREADS"] = "1"
import torch
from datasets import Dataset
from peft import LoraConfig
from transformers import (
AutoConfig,
AutoModelForCausalLM,
AutoTokenizer,
)
@ -45,93 +28,25 @@ from llama_stack.apis.post_training import (
LoraFinetuningConfig,
TrainingConfig,
)
from llama_stack.providers.inline.post_training.common.utils import evacuate_model_from_device
from ..config import HuggingFacePostTrainingConfig
from ..utils import (
calculate_training_steps,
create_checkpoints,
get_memory_stats,
get_save_strategy,
load_model,
load_rows_from_dataset,
setup_environment,
setup_signal_handlers,
setup_torch_device,
split_dataset,
)
logger = logging.getLogger(__name__)
def get_gb(to_convert: int) -> str:
"""Converts memory stats to GB and formats to 2 decimal places.
Args:
to_convert: Memory value in bytes
Returns:
str: Memory value in GB formatted to 2 decimal places
"""
return f"{(to_convert / (1024**3)):.2f}"
def get_memory_stats(device: torch.device) -> dict[str, Any]:
"""Get memory statistics for the given device."""
stats = {
"system_memory": {
"total": get_gb(psutil.virtual_memory().total),
"available": get_gb(psutil.virtual_memory().available),
"used": get_gb(psutil.virtual_memory().used),
"percent": psutil.virtual_memory().percent,
}
}
if device.type == "cuda":
stats["device_memory"] = {
"allocated": get_gb(torch.cuda.memory_allocated(device)),
"reserved": get_gb(torch.cuda.memory_reserved(device)),
"max_allocated": get_gb(torch.cuda.max_memory_allocated(device)),
}
elif device.type == "mps":
# MPS doesn't provide direct memory stats, but we can track system memory
stats["device_memory"] = {
"note": "MPS memory stats not directly available",
"system_memory_used": get_gb(psutil.virtual_memory().used),
}
elif device.type == "cpu":
# For CPU, we track process memory usage
process = psutil.Process()
stats["device_memory"] = {
"process_rss": get_gb(process.memory_info().rss),
"process_vms": get_gb(process.memory_info().vms),
"process_percent": process.memory_percent(),
}
return stats
def setup_torch_device(device_str: str) -> torch.device:
"""Initialize and validate a PyTorch device.
This function handles device initialization and validation for different device types:
- CUDA: Validates CUDA availability and handles device selection
- MPS: Validates MPS availability for Apple Silicon
- CPU: Basic validation
- HPU: Raises error as it's not supported
Args:
device_str: String specifying the device ('cuda', 'cpu', 'mps')
Returns:
torch.device: The initialized and validated device
Raises:
RuntimeError: If device initialization fails or device is not supported
"""
try:
device = torch.device(device_str)
except RuntimeError as e:
raise RuntimeError(f"Error getting Torch Device {str(e)}") from e
# Validate device capabilities
if device.type == "cuda":
if not torch.cuda.is_available():
raise RuntimeError(
f"{device.type}: Torch has no CUDA/ROCm support or could not detect a compatible device."
)
if device.index is None:
device = torch.device(device.type, torch.cuda.current_device())
elif device.type == "mps":
if not torch.backends.mps.is_available():
raise RuntimeError(f"{device.type}: Torch has no MPS support or could not detect a compatible device.")
elif device.type == "hpu":
raise RuntimeError(f"{device.type}: training does not support Intel Gaudi.")
return device
class HFFinetuningSingleDevice:
def __init__(
self,
@ -262,19 +177,6 @@ class HFFinetuningSingleDevice:
remove_columns=ds.column_names,
)
async def _setup_data(self, dataset_id: str) -> list[dict[str, Any]]:
"""Load dataset from llama stack dataset provider"""
try:
all_rows = await self.datasetio_api.iterrows(
dataset_id=dataset_id,
limit=-1,
)
if not isinstance(all_rows.data, list):
raise RuntimeError("Expected dataset data to be a list")
return all_rows.data
except Exception as e:
raise RuntimeError(f"Failed to load dataset: {str(e)}") from e
def _run_training_sync(
self,
model: str,
@ -327,7 +229,7 @@ class HFFinetuningSingleDevice:
# Load dataset
logger.info(f"Loading dataset: {config.data_config.dataset_id}")
rows = await self._setup_data(config.data_config.dataset_id)
rows = await load_rows_from_dataset(self.datasetio_api, config.data_config.dataset_id)
if not self.validate_dataset_format(rows):
raise ValueError("Dataset is missing required fields: input_query, expected_answer, chat_completion_input")
logger.info(f"Loaded {len(rows)} rows from dataset")
@ -369,47 +271,10 @@ class HFFinetuningSingleDevice:
raise ValueError(f"Failed to create dataset: {str(e)}") from e
# Split dataset
logger.info("Splitting dataset into train and validation sets")
train_val_split = ds.train_test_split(test_size=0.1, seed=42)
train_dataset = train_val_split["train"]
eval_dataset = train_val_split["test"]
logger.info(f"Split dataset into {len(train_dataset)} training and {len(eval_dataset)} validation examples")
train_dataset, eval_dataset = split_dataset(ds)
return train_dataset, eval_dataset, tokenizer
def load_model(
self,
model: str,
device: torch.device,
provider_config: HuggingFacePostTrainingConfig,
) -> AutoModelForCausalLM:
"""Load and initialize the model for training.
Args:
model: The model identifier to load
device: The device to load the model onto
provider_config: Provider-specific configuration
Returns:
The loaded and initialized model
Raises:
RuntimeError: If model loading fails
"""
logger.info("Loading the base model")
try:
model_config = AutoConfig.from_pretrained(model, **provider_config.model_specific_config)
model_obj = AutoModelForCausalLM.from_pretrained(
model,
torch_dtype="auto" if device.type != "cpu" else "float32",
quantization_config=None,
config=model_config,
**provider_config.model_specific_config,
)
# Always move model to specified device
model_obj = model_obj.to(device)
logger.info(f"Model loaded and moved to device: {model_obj.device}")
return model_obj
except Exception as e:
raise RuntimeError(f"Failed to load model: {str(e)}") from e
def setup_training_args(
self,
config: TrainingConfig,
@ -439,27 +304,12 @@ class HFFinetuningSingleDevice:
raise ValueError("DataConfig is required for training")
data_config = config.data_config
# Calculate steps
total_steps = steps_per_epoch * config.n_epochs
max_steps = min(config.max_steps_per_epoch, total_steps)
logging_steps = max(1, steps_per_epoch // 50) # Log 50 times per epoch
logger.info("Training configuration:")
logger.info(f"- Steps per epoch: {steps_per_epoch}")
logger.info(f"- Total steps: {total_steps}")
logger.info(f"- Max steps: {max_steps}")
logger.info(f"- Logging steps: {logging_steps}")
# Configure save strategy
save_strategy = "no"
eval_strategy = "no"
if output_dir_path:
save_strategy = "epoch"
eval_strategy = "epoch"
logger.info(f"Will save checkpoints to {output_dir_path}")
# Calculate steps and get save strategy
step_info = calculate_training_steps(steps_per_epoch, config)
save_strategy, eval_strategy = get_save_strategy(output_dir_path)
return SFTConfig(
max_steps=max_steps,
max_steps=step_info["max_steps"],
output_dir=str(output_dir_path) if output_dir_path is not None else None,
num_train_epochs=config.n_epochs,
per_device_train_batch_size=data_config.batch_size,
@ -469,7 +319,7 @@ class HFFinetuningSingleDevice:
use_cpu=True if device.type == "cpu" and not torch.backends.mps.is_available() else False,
save_strategy=save_strategy,
report_to="none",
max_seq_length=provider_config.max_seq_length,
max_length=provider_config.max_seq_length,
gradient_accumulation_steps=config.gradient_accumulation_steps,
gradient_checkpointing=provider_config.gradient_checkpointing,
learning_rate=lr,
@ -483,7 +333,7 @@ class HFFinetuningSingleDevice:
load_best_model_at_end=True if output_dir_path else False,
metric_for_best_model="eval_loss",
greater_is_better=False,
logging_steps=logging_steps,
logging_steps=step_info["logging_steps"],
)
def save_model(
@ -523,13 +373,11 @@ class HFFinetuningSingleDevice:
) -> None:
"""Run the training process with signal handling."""
def signal_handler(signum, frame):
"""Handle termination signals gracefully."""
logger.info(f"Received signal {signum}, initiating graceful shutdown")
sys.exit(0)
# Setup environment variables
setup_environment()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Setup signal handlers
setup_signal_handlers()
# Convert config dicts back to objects
logger.info("Initializing configuration objects")
@ -558,7 +406,7 @@ class HFFinetuningSingleDevice:
)
# Load model
model_obj = self.load_model(model, device, provider_config_obj)
model_obj = load_model(model, device, provider_config_obj)
# Initialize trainer
logger.info("Initializing SFTTrainer")
@ -633,7 +481,7 @@ class HFFinetuningSingleDevice:
# Train in a separate process
logger.info("Starting training in separate process")
try:
# Set multiprocessing start method to 'spawn' for CUDA/MPS compatibility
# Setup multiprocessing for device
if device.type in ["cuda", "mps"]:
multiprocessing.set_start_method("spawn", force=True)
@ -663,37 +511,7 @@ class HFFinetuningSingleDevice:
checkpoints = []
if output_dir_path:
# Get all checkpoint directories and sort them numerically
checkpoint_dirs = sorted(
[d for d in output_dir_path.glob("checkpoint-*") if d.is_dir()],
key=lambda x: int(x.name.split("-")[1]),
)
# Add all checkpoint directories
for epoch_number, checkpoint_dir in enumerate(checkpoint_dirs, start=1):
# Get the creation time of the directory
created_time = datetime.fromtimestamp(os.path.getctime(checkpoint_dir), tz=UTC)
checkpoint = Checkpoint(
identifier=checkpoint_dir.name,
created_at=created_time,
epoch=epoch_number,
post_training_job_id=job_uuid,
path=str(checkpoint_dir),
)
checkpoints.append(checkpoint)
# Add the merged model as a checkpoint
merged_model_path = output_dir_path / "merged_model"
if merged_model_path.exists():
checkpoint = Checkpoint(
identifier=f"{model}-sft-{config.n_epochs}",
created_at=datetime.now(UTC),
epoch=config.n_epochs,
post_training_job_id=job_uuid,
path=str(merged_model_path),
)
checkpoints.append(checkpoint)
checkpoints = create_checkpoints(output_dir_path, job_uuid, model, config, "merged_model")
return memory_stats, checkpoints if checkpoints else None
finally:
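One retained detail deserves a callout: training runs in a child process, and for CUDA/MPS the start method must be "spawn" because device contexts do not survive a fork. A standalone sketch of the pattern with a toy worker (not the real training entry point):

```python
import multiprocessing

def _worker(msg: str) -> None:
    # Stand-in for _run_training_sync; runs in a fresh interpreter under "spawn".
    print(msg)

if __name__ == "__main__":
    # CUDA/MPS state cannot be forked safely; "spawn" starts a clean process.
    multiprocessing.set_start_method("spawn", force=True)
    p = multiprocessing.Process(target=_worker, kwargs={"msg": "training step"})
    p.start()
    p.join()
```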


@ -0,0 +1,485 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import gc
import logging
import multiprocessing
from pathlib import Path
from typing import Any
import torch
from datasets import Dataset
from transformers import (
AutoTokenizer,
)
from trl import DPOConfig, DPOTrainer
from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.datasets import Datasets
from llama_stack.apis.post_training import (
Checkpoint,
DPOAlignmentConfig,
TrainingConfig,
)
from llama_stack.providers.inline.post_training.common.utils import evacuate_model_from_device
from ..config import HuggingFacePostTrainingConfig
from ..utils import (
calculate_training_steps,
create_checkpoints,
get_memory_stats,
get_save_strategy,
load_model,
load_rows_from_dataset,
setup_environment,
setup_signal_handlers,
setup_torch_device,
split_dataset,
)
logger = logging.getLogger(__name__)
class HFDPOAlignmentSingleDevice:
def __init__(
self,
job_uuid: str,
datasetio_api: DatasetIO,
datasets_api: Datasets,
):
self.datasetio_api = datasetio_api
self.datasets_api = datasets_api
self.job_uuid = job_uuid
def validate_dataset_format(self, rows: list[dict]) -> None:
"""Validate that the dataset has the required fields for DPO training."""
required_fields = ["prompt", "chosen", "rejected"]
if not rows:
logger.warning("Dataset is empty")
raise ValueError("Dataset is empty")
for i, row in enumerate(rows):
if not isinstance(row, dict):
logger.warning(f"Row {i} is not a dictionary")
raise ValueError(f"Row {i} is not a dictionary")
for field in required_fields:
if field not in row:
logger.warning(f"Row {i} missing required DPO field: {field}")
raise ValueError(f"Row {i} missing required DPO field: {field}")
# Handle both string and list formats
if field == "prompt":
# Prompt should be a string
if not isinstance(row[field], str):
logger.warning(f"Row {i} field '{field}' is not a string")
raise ValueError(f"Row {i} field '{field}' is not a string")
if not row[field].strip():
logger.warning(f"Row {i} field '{field}' is empty")
raise ValueError(f"Row {i} field '{field}' is empty")
else:
# chosen/rejected can be either strings or lists of messages
if isinstance(row[field], str):
if not row[field].strip():
logger.warning(f"Row {i} field '{field}' is empty")
raise ValueError(f"Row {i} field '{field}' is empty")
elif isinstance(row[field], list):
if not row[field]:
logger.warning(f"Row {i} field '{field}' is empty list")
raise ValueError(f"Row {i} field '{field}' is empty list")
else:
logger.warning(f"Row {i} field '{field}' is neither string nor list")
raise ValueError(f"Row {i} field '{field}' is neither string nor list")
logger.info(f"DPO dataset validation passed: {len(rows)} preference examples")
def _process_dpo_format(self, row: dict) -> tuple[str | None, str | None, str | None]:
"""Process a row in DPO format, handling both string and conversation list formats."""
if all(field in row for field in ["prompt", "chosen", "rejected"]):
prompt = row["prompt"]
# Handle chosen field - convert list to string if needed
if isinstance(row["chosen"], list):
# For conversation format, concatenate messages
chosen = "\n".join(
[msg.get("content", "") if isinstance(msg, dict) else str(msg) for msg in row["chosen"]]
)
else:
chosen = row["chosen"]
# Handle rejected field - convert list to string if needed
if isinstance(row["rejected"], list):
# For conversation format, concatenate messages
rejected = "\n".join(
[msg.get("content", "") if isinstance(msg, dict) else str(msg) for msg in row["rejected"]]
)
else:
rejected = row["rejected"]
return prompt, chosen, rejected
return None, None, None
def _format_text_for_dpo(self, prompt: str, response: str, provider_config: HuggingFacePostTrainingConfig) -> str:
"""Format prompt and response text based on model requirements."""
if hasattr(provider_config, "chat_template") and provider_config.chat_template:
# Use the chat template, supporting both {prompt}/{response} and {input}/{output}
template = provider_config.chat_template
# Try prompt/response first (DPO style)
if "{prompt}" in template and "{response}" in template:
return template.format(prompt=prompt, response=response)
# Fall back to input/output (SFT style)
elif "{input}" in template and "{output}" in template:
return template.format(input=prompt, output=response)
else:
# If template doesn't have expected placeholders, use default
return f"{prompt}\n{response}"
return f"{prompt}\n{response}"
def _create_dataset(
self, rows: list[dict], config: TrainingConfig, provider_config: HuggingFacePostTrainingConfig
) -> Dataset:
"""Create and preprocess the dataset for DPO."""
dpo_examples = []
for row in rows:
prompt, chosen, rejected = self._process_dpo_format(row)
if prompt and chosen and rejected:
# Format the texts
chosen_formatted = self._format_text_for_dpo(prompt, chosen, provider_config)
rejected_formatted = self._format_text_for_dpo(prompt, rejected, provider_config)
dpo_examples.append(
{
"prompt": prompt,
"chosen": chosen_formatted,
"rejected": rejected_formatted,
}
)
if not dpo_examples:
raise ValueError("No valid preference examples found in dataset")
logger.info(f"Created DPO dataset with {len(dpo_examples)} preference pairs")
return Dataset.from_list(dpo_examples)
def _preprocess_dataset(
self, ds: Dataset, tokenizer: AutoTokenizer, provider_config: HuggingFacePostTrainingConfig
) -> Dataset:
"""Preprocess the dataset with tokenizer for DPO."""
# DPOTrainer expects raw text, so we don't tokenize here
# Just return the dataset as is
return ds
def _run_training_sync(
self,
model: str,
provider_config: dict[str, Any],
dpo_config: dict[str, Any],
config: dict[str, Any],
output_dir_path: Path | None,
) -> None:
"""Synchronous wrapper for running DPO training process."""
import asyncio
logger.info("Starting DPO training process with async wrapper")
asyncio.run(
self._run_training(
model=model,
provider_config=provider_config,
dpo_config=dpo_config,
config=config,
output_dir_path=output_dir_path,
)
)
async def load_dataset(
self,
model: str,
config: TrainingConfig,
provider_config: HuggingFacePostTrainingConfig,
) -> tuple[Dataset, Dataset, AutoTokenizer]:
"""Load and prepare the dataset for DPO training."""
# Validate data config
if not config.data_config:
raise ValueError("DataConfig is required for DPO training")
# Load dataset
logger.info(f"Loading dataset: {config.data_config.dataset_id}")
rows = await load_rows_from_dataset(self.datasetio_api, config.data_config.dataset_id)
self.validate_dataset_format(rows)
logger.info(f"Loaded {len(rows)} rows from dataset")
# Initialize tokenizer
logger.info(f"Initializing tokenizer for model: {model}")
try:
tokenizer = AutoTokenizer.from_pretrained(model, **provider_config.model_specific_config)
# Set pad token to eos token if not present
if not tokenizer.pad_token:
tokenizer.pad_token = tokenizer.eos_token
# Set padding side to left for DPO
tokenizer.padding_side = "left"
# Set truncation side to right to keep the beginning of the sequence
tokenizer.truncation_side = "right"
# Set model max length to match provider config
tokenizer.model_max_length = provider_config.max_seq_length
logger.info("Tokenizer initialized successfully for DPO")
except Exception as e:
raise RuntimeError(f"Failed to initialize tokenizer: {str(e)}") from e
# Create and preprocess dataset
logger.info("Creating and preprocessing dataset for DPO")
try:
ds = self._create_dataset(rows, config, provider_config)
ds = self._preprocess_dataset(ds, tokenizer, provider_config)
logger.info(f"Dataset created with {len(ds)} examples")
except Exception as e:
raise ValueError(f"Failed to create dataset: {str(e)}") from e
# Split dataset
train_dataset, eval_dataset = split_dataset(ds)
return train_dataset, eval_dataset, tokenizer
def setup_training_args(
self,
config: TrainingConfig,
provider_config: HuggingFacePostTrainingConfig,
dpo_config: DPOAlignmentConfig,
device: torch.device,
output_dir_path: Path | None,
steps_per_epoch: int,
) -> DPOConfig:
"""Setup DPO training arguments."""
logger.info("Configuring DPO training arguments")
lr = 5e-7 # Lower learning rate for DPO
if config.optimizer_config:
lr = config.optimizer_config.lr
logger.info(f"Using custom learning rate: {lr}")
# Validate data config
if not config.data_config:
raise ValueError("DataConfig is required for training")
data_config = config.data_config
# Calculate steps and get save strategy
step_info = calculate_training_steps(steps_per_epoch, config)
save_strategy, eval_strategy = get_save_strategy(output_dir_path)
logger.info("DPO training configuration:")
logger.info(f"- DPO beta: {dpo_config.beta}")
logger.info(f"- DPO loss type: {provider_config.dpo_loss_type}")
# Calculate max prompt length as half of max sequence length
max_prompt_length = provider_config.max_seq_length // 2
return DPOConfig(
max_steps=step_info["max_steps"],
output_dir=str(output_dir_path) if output_dir_path is not None else None,
num_train_epochs=config.n_epochs,
per_device_train_batch_size=data_config.batch_size,
fp16=device.type == "cuda",
bf16=False, # Causes CPU issues.
eval_strategy=eval_strategy,
use_cpu=True if device.type == "cpu" and not torch.backends.mps.is_available() else False,
save_strategy=save_strategy,
report_to="none",
max_length=provider_config.max_seq_length,
max_prompt_length=max_prompt_length,
gradient_accumulation_steps=config.gradient_accumulation_steps,
gradient_checkpointing=provider_config.gradient_checkpointing,
learning_rate=lr,
warmup_ratio=provider_config.warmup_ratio,
weight_decay=provider_config.weight_decay,
remove_unused_columns=False,
dataloader_pin_memory=provider_config.dataloader_pin_memory,
dataloader_num_workers=provider_config.dataloader_num_workers,
load_best_model_at_end=True if output_dir_path else False,
metric_for_best_model="eval_loss",
greater_is_better=False,
logging_steps=step_info["logging_steps"],
save_total_limit=provider_config.save_total_limit,
# DPO specific parameters
beta=dpo_config.beta,
loss_type=provider_config.dpo_loss_type,
)
def save_model(
self,
trainer: DPOTrainer,
output_dir_path: Path,
) -> None:
"""Save the trained DPO model."""
logger.info("Saving final DPO model")
save_path = output_dir_path / "dpo_model"
logger.info(f"Saving model to {save_path}")
# Save model and tokenizer
trainer.save_model(str(save_path))
async def _run_training(
self,
model: str,
provider_config: dict[str, Any],
dpo_config: dict[str, Any],
config: dict[str, Any],
output_dir_path: Path | None,
) -> None:
"""Run the DPO training process with signal handling."""
# Setup environment variables
setup_environment()
# Setup signal handlers
setup_signal_handlers()
# Convert config dicts back to objects
logger.info("Initializing configuration objects")
provider_config_obj = HuggingFacePostTrainingConfig(**provider_config)
config_obj = TrainingConfig(**config)
dpo_config_obj = DPOAlignmentConfig(**dpo_config)
# Initialize and validate device
device = setup_torch_device(provider_config_obj.device)
logger.info(f"Using device '{device}'")
# Load dataset and tokenizer
train_dataset, eval_dataset, tokenizer = await self.load_dataset(model, config_obj, provider_config_obj)
# Calculate steps per epoch
if not config_obj.data_config:
raise ValueError("DataConfig is required for training")
steps_per_epoch = len(train_dataset) // config_obj.data_config.batch_size
# Setup training arguments
training_args = self.setup_training_args(
config_obj,
provider_config_obj,
dpo_config_obj,
device,
output_dir_path,
steps_per_epoch,
)
# Load model and reference model
model_obj = load_model(model, device, provider_config_obj)
ref_model = None
if provider_config_obj.use_reference_model:
logger.info("Loading separate reference model for DPO")
ref_model = load_model(model, device, provider_config_obj)
else:
logger.info("Using shared reference model for DPO")
# Initialize DPO trainer
logger.info("Initializing DPOTrainer")
trainer = DPOTrainer(
model=model_obj,
ref_model=ref_model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
processing_class=tokenizer,
)
try:
# Train
logger.info("Starting DPO training")
trainer.train()
logger.info("DPO training completed successfully")
# Save final model if output directory is provided
if output_dir_path:
logger.info(f"Saving model to output directory: {output_dir_path}")
self.save_model(trainer, output_dir_path)
logger.info("Model save completed")
finally:
# Clean up resources
logger.info("Cleaning up resources")
if hasattr(trainer, "model"):
evacuate_model_from_device(trainer.model, device.type)
if ref_model:
evacuate_model_from_device(ref_model, device.type)
del trainer
del ref_model
gc.collect()
logger.info("Cleanup completed")
logger.info("DPO training process finishing successfully")
async def train(
self,
model: str,
output_dir: str | None,
job_uuid: str,
dpo_config: DPOAlignmentConfig,
config: TrainingConfig,
provider_config: HuggingFacePostTrainingConfig,
) -> tuple[dict[str, Any], list[Checkpoint] | None]:
"""Train a model using HuggingFace's DPOTrainer"""
# Initialize and validate device
device = setup_torch_device(provider_config.device)
logger.info(f"Using device '{device}'")
output_dir_path = None
if output_dir:
output_dir_path = Path(output_dir)
# Track memory stats
memory_stats = {
"initial": get_memory_stats(device),
"after_training": None,
"final": None,
}
# Validate data config
if not config.data_config:
raise ValueError("DataConfig is required for training")
# Train in a separate process
logger.info("Starting DPO training in separate process")
try:
# Setup multiprocessing for device
if device.type in ["cuda", "mps"]:
multiprocessing.set_start_method("spawn", force=True)
process = multiprocessing.Process(
target=self._run_training_sync,
kwargs={
"model": model,
"provider_config": provider_config.model_dump(),
"dpo_config": dpo_config.model_dump(),
"config": config.model_dump(),
"output_dir_path": output_dir_path,
},
)
process.start()
# Monitor the process
while process.is_alive():
process.join(timeout=1) # Check every second
if not process.is_alive():
break
# Get the return code
if process.exitcode != 0:
raise RuntimeError(f"DPO training failed with exit code {process.exitcode}")
memory_stats["after_training"] = get_memory_stats(device)
checkpoints = []
if output_dir_path:
checkpoints = create_checkpoints(output_dir_path, job_uuid, model, config, "dpo_model")
return memory_stats, checkpoints if checkpoints else None
finally:
memory_stats["final"] = get_memory_stats(device)
gc.collect()
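To make the expected data shape concrete, a sketch of rows that would pass `validate_dataset_format` above; all values are invented:

```python
# `prompt` must be a non-empty string; `chosen`/`rejected` may be plain
# strings or lists of messages (joined with "\n" by _process_dpo_format).
rows = [
    {
        "prompt": "What is the capital of France?",
        "chosen": "The capital of France is Paris.",
        "rejected": "France has no capital.",
    },
    {
        "prompt": "Summarize photosynthesis in one sentence.",
        "chosen": [{"role": "assistant", "content": "Plants turn light into chemical energy."}],
        "rejected": [{"role": "assistant", "content": "Photosynthesis is how animals breathe."}],
    },
]
```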


@ -0,0 +1,269 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import logging
import os
import signal
import sys
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import psutil
import torch
from datasets import Dataset
from transformers import AutoConfig, AutoModelForCausalLM
from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.post_training import Checkpoint, TrainingConfig
from .config import HuggingFacePostTrainingConfig
logger = logging.getLogger(__name__)
def setup_environment():
"""Setup common environment variables for training."""
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["MKL_THREADING_LAYER"] = "GNU"
os.environ["MKL_SERVICE_FORCE_INTEL"] = "0"
os.environ["MKL_NUM_THREADS"] = "1"
def bytes_to_gb(to_convert: int) -> str:
"""Converts memory stats to GB and formats to 2 decimal places.
Args:
to_convert: Memory value in bytes
Returns:
str: Memory value in GB formatted to 2 decimal places
"""
return f"{(to_convert / (1024**3)):.2f}"
def get_memory_stats(device: torch.device) -> dict[str, Any]:
"""Get memory statistics for the given device."""
stats = {
"system_memory": {
"total": bytes_to_gb(psutil.virtual_memory().total),
"available": bytes_to_gb(psutil.virtual_memory().available),
"used": bytes_to_gb(psutil.virtual_memory().used),
"percent": psutil.virtual_memory().percent,
}
}
if device.type == "cuda":
stats["device_memory"] = {
"allocated": bytes_to_gb(torch.cuda.memory_allocated(device)),
"reserved": bytes_to_gb(torch.cuda.memory_reserved(device)),
"max_allocated": bytes_to_gb(torch.cuda.max_memory_allocated(device)),
}
elif device.type == "mps":
# MPS doesn't provide direct memory stats, but we can track system memory
stats["device_memory"] = {
"note": "MPS memory stats not directly available",
"system_memory_used": bytes_to_gb(psutil.virtual_memory().used),
}
elif device.type == "cpu":
# For CPU, we track process memory usage
process = psutil.Process()
stats["device_memory"] = {
"process_rss": bytes_to_gb(process.memory_info().rss),
"process_vms": bytes_to_gb(process.memory_info().vms),
"process_percent": process.memory_percent(),
}
return stats
def setup_torch_device(device_str: str) -> torch.device:
"""Initialize and validate a PyTorch device.
This function handles device initialization and validation for different device types:
- CUDA: Validates CUDA availability and handles device selection
- MPS: Validates MPS availability for Apple Silicon
- CPU: Basic validation
- HPU: Raises error as it's not supported
Args:
device_str: String specifying the device ('cuda', 'cpu', 'mps')
Returns:
torch.device: The initialized and validated device
Raises:
RuntimeError: If device initialization fails or device is not supported
"""
try:
device = torch.device(device_str)
except RuntimeError as e:
raise RuntimeError(f"Error getting Torch Device {str(e)}") from e
# Validate device capabilities
if device.type == "cuda":
if not torch.cuda.is_available():
raise RuntimeError(
f"{device.type}: Torch has no CUDA/ROCm support or could not detect a compatible device."
)
if device.index is None:
device = torch.device(device.type, torch.cuda.current_device())
elif device.type == "mps":
if not torch.backends.mps.is_available():
raise RuntimeError(f"{device.type}: Torch has no MPS support or could not detect a compatible device.")
elif device.type == "hpu":
raise RuntimeError(f"{device.type}: training does not support Intel Gaudi.")
return device
async def load_rows_from_dataset(datasetio_api: DatasetIO, dataset_id: str) -> list[dict[str, Any]]:
"""Load dataset from llama stack dataset provider"""
try:
all_rows = await datasetio_api.iterrows(
dataset_id=dataset_id,
limit=-1,
)
if not isinstance(all_rows.data, list):
raise RuntimeError("Expected dataset data to be a list")
return all_rows.data
except Exception as e:
raise RuntimeError(f"Failed to load dataset: {str(e)}") from e
def load_model(
model: str,
device: torch.device,
provider_config: HuggingFacePostTrainingConfig,
) -> AutoModelForCausalLM:
"""Load and initialize the model for training.
Args:
model: The model identifier to load
device: The device to load the model onto
provider_config: Provider-specific configuration
Returns:
The loaded and initialized model
Raises:
RuntimeError: If model loading fails
"""
logger.info("Loading the base model")
try:
model_config = AutoConfig.from_pretrained(model, **provider_config.model_specific_config)
model_obj = AutoModelForCausalLM.from_pretrained(
model,
torch_dtype="auto" if device.type != "cpu" else "float32",
quantization_config=None,
config=model_config,
**provider_config.model_specific_config,
)
# Always move model to specified device
model_obj = model_obj.to(device)
logger.info(f"Model loaded and moved to device: {model_obj.device}")
return model_obj
except Exception as e:
raise RuntimeError(f"Failed to load model: {str(e)}") from e
def split_dataset(ds: Dataset) -> tuple[Dataset, Dataset]:
"""Split dataset into train and validation sets.
Args:
ds: Dataset to split
Returns:
tuple: (train_dataset, eval_dataset)
"""
logger.info("Splitting dataset into train and validation sets")
train_val_split = ds.train_test_split(test_size=0.1, seed=42)
train_dataset = train_val_split["train"]
eval_dataset = train_val_split["test"]
logger.info(f"Split dataset into {len(train_dataset)} training and {len(eval_dataset)} validation examples")
return train_dataset, eval_dataset
def setup_signal_handlers():
"""Setup signal handlers for graceful shutdown."""
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, initiating graceful shutdown")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def calculate_training_steps(steps_per_epoch: int, config: TrainingConfig) -> dict[str, int]:
"""Calculate training steps and logging configuration.
Args:
steps_per_epoch: Number of training steps per epoch
config: Training configuration
Returns:
dict: Dictionary with calculated step values
"""
total_steps = steps_per_epoch * config.n_epochs
max_steps = min(config.max_steps_per_epoch, total_steps)
logging_steps = max(1, steps_per_epoch // 50) # Log 50 times per epoch
logger.info("Training configuration:")
logger.info(f"- Steps per epoch: {steps_per_epoch}")
logger.info(f"- Total steps: {total_steps}")
logger.info(f"- Max steps: {max_steps}")
logger.info(f"- Logging steps: {logging_steps}")
return {"total_steps": total_steps, "max_steps": max_steps, "logging_steps": logging_steps}
def get_save_strategy(output_dir_path: Path | None) -> tuple[str, str]:
"""Get save and evaluation strategy based on output directory.
Args:
output_dir_path: Optional path to save the model
Returns:
tuple: (save_strategy, eval_strategy)
"""
if output_dir_path:
logger.info(f"Will save checkpoints to {output_dir_path}")
return "epoch", "epoch"
return "no", "no"
def create_checkpoints(
output_dir_path: Path, job_uuid: str, model: str, config: TrainingConfig, final_model_name: str
) -> list[Checkpoint]:
"""Create checkpoint objects from training output.
Args:
output_dir_path: Path to the training output directory
job_uuid: Unique identifier for the training job
model: Model identifier
config: Training configuration
final_model_name: Name of the final model directory ("merged_model" for SFT, "dpo_model" for DPO)
Returns:
List of Checkpoint objects
"""
checkpoints = []
# Add checkpoint directories
checkpoint_dirs = sorted(
[d for d in output_dir_path.glob("checkpoint-*") if d.is_dir()],
key=lambda x: int(x.name.split("-")[1]),
)
for epoch_number, checkpoint_dir in enumerate(checkpoint_dirs, start=1):
created_time = datetime.fromtimestamp(os.path.getctime(checkpoint_dir), tz=UTC)
checkpoint = Checkpoint(
identifier=checkpoint_dir.name,
created_at=created_time,
epoch=epoch_number,
post_training_job_id=job_uuid,
path=str(checkpoint_dir),
)
checkpoints.append(checkpoint)
# Add final model
final_model_path = output_dir_path / final_model_name
if final_model_path.exists():
training_type = "sft" if final_model_name == "merged_model" else "dpo"
checkpoint = Checkpoint(
identifier=f"{model}-{training_type}-{config.n_epochs}",
created_at=datetime.now(UTC),
epoch=config.n_epochs,
post_training_job_id=job_uuid,
path=str(final_model_path),
)
checkpoints.append(checkpoint)
return checkpoints
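A worked example of the `calculate_training_steps` arithmetic, with invented values:

```python
# Mirrors the arithmetic in calculate_training_steps() above.
steps_per_epoch = 100
n_epochs = 3
max_steps_per_epoch = 250

total_steps = steps_per_epoch * n_epochs            # 300
max_steps = min(max_steps_per_epoch, total_steps)   # 250
logging_steps = max(1, steps_per_epoch // 50)       # 2 -> ~50 log lines per epoch
assert (total_steps, max_steps, logging_steps) == (300, 250, 2)
```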


@ -6,7 +6,7 @@
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import TorchtunePostTrainingConfig


@ -43,8 +43,8 @@ from llama_stack.apis.post_training import (
QATFinetuningConfig,
TrainingConfig,
)
from llama_stack.distribution.utils.config_dirs import DEFAULT_CHECKPOINT_DIR
from llama_stack.distribution.utils.model_utils import model_local_dir
from llama_stack.core.utils.config_dirs import DEFAULT_CHECKPOINT_DIR
from llama_stack.core.utils.model_utils import model_local_dir
from llama_stack.models.llama.sku_list import resolve_model
from llama_stack.providers.inline.post_training.common.utils import evacuate_model_from_device
from llama_stack.providers.inline.post_training.torchtune.common import utils


@ -21,7 +21,7 @@ from llama_stack.apis.safety import (
ViolationLevel,
)
from llama_stack.apis.shields import Shield
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from llama_stack.models.llama.datatypes import Role
from llama_stack.models.llama.sku_types import CoreModelId
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
@ -150,6 +150,11 @@ class LlamaGuardSafetyImpl(Safety, ShieldsProtocolPrivate):
if not model_id:
raise ValueError("Llama Guard shield must have a model id")
async def unregister_shield(self, identifier: str) -> None:
# LlamaGuard doesn't need to do anything special for unregistration
# The routing table handles the removal from the registry
pass
async def run_shield(
self,
shield_id: str,


@ -18,7 +18,7 @@ from llama_stack.apis.safety import (
ViolationLevel,
)
from llama_stack.apis.shields import Shield
from llama_stack.distribution.utils.model_utils import model_local_dir
from llama_stack.core.utils.model_utils import model_local_dir
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
from llama_stack.providers.utils.inference.prompt_adapter import (
interleaved_content_as_str,
@ -46,6 +46,9 @@ class PromptGuardSafetyImpl(Safety, ShieldsProtocolPrivate):
if shield.provider_resource_id != PROMPT_GUARD_MODEL:
raise ValueError(f"Only {PROMPT_GUARD_MODEL} is supported for Prompt Guard. ")
async def unregister_shield(self, identifier: str) -> None:
pass
async def run_shield(
self,
shield_id: str,


@ -5,7 +5,7 @@
# the root directory of this source tree.
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import BasicScoringConfig


@ -14,7 +14,7 @@ from llama_stack.apis.scoring import (
ScoringResult,
)
from llama_stack.apis.scoring_functions import ScoringFn, ScoringFnParams
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from llama_stack.providers.datatypes import ScoringFunctionsProtocolPrivate
from llama_stack.providers.utils.common.data_schema_validator import (
get_valid_schemas,


@ -7,7 +7,7 @@ from typing import Any
from pydantic import BaseModel
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import BraintrustScoringConfig


@ -29,8 +29,8 @@ from llama_stack.apis.scoring import (
ScoringResultRow,
)
from llama_stack.apis.scoring_functions import ScoringFn, ScoringFnParams
from llama_stack.distribution.datatypes import Api
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.datatypes import Api
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import ScoringFunctionsProtocolPrivate
from llama_stack.providers.utils.common.data_schema_validator import (
get_valid_schemas,


@ -5,7 +5,7 @@
# the root directory of this source tree.
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import LlmAsJudgeScoringConfig


@ -15,7 +15,7 @@ from llama_stack.apis.scoring import (
ScoringResult,
)
from llama_stack.apis.scoring_functions import ScoringFn, ScoringFnParams
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from llama_stack.providers.datatypes import ScoringFunctionsProtocolPrivate
from llama_stack.providers.utils.common.data_schema_validator import (
get_valid_schemas,


@ -6,7 +6,7 @@
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import TelemetryConfig, TelemetrySink


@ -9,7 +9,7 @@ from typing import Any
from pydantic import BaseModel, Field, field_validator
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
class TelemetrySink(StrEnum):


@ -36,7 +36,7 @@ from llama_stack.apis.telemetry import (
Trace,
UnstructuredLogEvent,
)
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
ConsoleSpanProcessor,
)


@ -15,6 +15,7 @@ import faiss
import numpy as np
from numpy.typing import NDArray
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
@ -159,8 +160,11 @@ class FaissIndex(EmbeddingIndex):
for d, i in zip(distances[0], indices[0], strict=False):
if i < 0:
continue
score = 1.0 / float(d) if d != 0 else float("inf")
if score < score_threshold:
continue
chunks.append(self.chunk_by_index[int(i)])
scores.append(1.0 / float(d) if d != 0 else float("inf"))
scores.append(score)
return QueryChunksResponse(chunks=chunks, scores=scores)
@ -285,7 +289,7 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
) -> QueryChunksResponse:
index = self.cache.get(vector_db_id)
if index is None:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
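The FAISS change above converts an L2 distance `d` into a similarity score `1/d` (infinite when `d == 0`) and drops hits below `score_threshold`. A worked example with invented distances:

```python
# Mirrors the filtering added above: keep hits with score >= score_threshold.
score_threshold = 2.0
distances = [0.0, 0.25, 1.0, 4.0]
scores = [float("inf") if d == 0 else 1.0 / d for d in distances]
# scores == [inf, 4.0, 1.0, 0.25]
kept = [s for s in scores if s >= score_threshold]
assert kept == [float("inf"), 4.0]
```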


@ -4,14 +4,18 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.providers.datatypes import Api, ProviderSpec
from typing import Any
from llama_stack.providers.datatypes import Api
from .config import QdrantVectorIOConfig
async def get_adapter_impl(config: QdrantVectorIOConfig, deps: dict[Api, ProviderSpec]):
async def get_provider_impl(config: QdrantVectorIOConfig, deps: dict[Api, Any]):
from llama_stack.providers.remote.vector_io.qdrant.qdrant import QdrantVectorIOAdapter
impl = QdrantVectorIOAdapter(config, deps[Api.inference])
assert isinstance(config, QdrantVectorIOConfig), f"Unexpected config type: {type(config)}"
files_api = deps.get(Api.files)
impl = QdrantVectorIOAdapter(config, deps[Api.inference], files_api)
await impl.initialize()
return impl


@ -9,15 +9,23 @@ from typing import Any
from pydantic import BaseModel
from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
from llama_stack.schema_utils import json_schema_type
@json_schema_type
class QdrantVectorIOConfig(BaseModel):
path: str
kvstore: KVStoreConfig
@classmethod
def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]:
return {
"path": "${env.QDRANT_PATH:=~/.llama/" + __distro_dir__ + "}/" + "qdrant.db",
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__, db_name="qdrant_registry.db"
),
}
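Roughly, the new sample config expands to something like the dict below; the kvstore sub-dict shape follows `SqliteKVStoreConfig.sample_run_config` and is an assumption, as is the distro name:

```python
# Approximate expansion for __distro_dir__ == "my-distro" (shape assumed).
sample = {
    "path": "${env.QDRANT_PATH:=~/.llama/my-distro}/qdrant.db",
    "kvstore": {
        "type": "sqlite",
        "db_path": "${env.SQLITE_STORE_DIR:=~/.llama/my-distro}/qdrant_registry.db",
    },
}
```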


@ -15,6 +15,7 @@ import numpy as np
import sqlite_vec
from numpy.typing import NDArray
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference
from llama_stack.apis.vector_dbs import VectorDB
@ -508,11 +509,11 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
return self.cache[vector_db_id]
if self.vector_db_store is None:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
vector_db = self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
index = VectorDBWithIndex(
vector_db=vector_db,
@ -537,7 +538,7 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
# The VectorDBWithIndex helper is expected to compute embeddings via the inference_api
# and then call our index's add_chunks.
await index.insert_chunks(chunks)
@ -547,14 +548,14 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
"""Delete a chunk from a sqlite_vec index."""
index = await self._get_and_cache_vector_db_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")
raise VectorStoreNotFoundError(store_id)
for chunk_id in chunk_ids:
# Use the index's delete_chunk method
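With the switch from `ValueError` to `VectorStoreNotFoundError`, callers can catch a typed error uniformly across providers. A minimal sketch (the adapter object and helper are illustrative):

```python
from llama_stack.apis.common.errors import VectorStoreNotFoundError

async def safe_query(adapter, vector_db_id: str, query):
    try:
        return await adapter.query_chunks(vector_db_id, query, params=None)
    except VectorStoreNotFoundError:
        # Now distinguishable from unrelated ValueErrors raised while querying.
        return None
```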


@ -460,6 +460,7 @@ See [Weaviate's documentation](https://weaviate.io/developers/weaviate) for more
module="llama_stack.providers.inline.vector_io.qdrant",
config_class="llama_stack.providers.inline.vector_io.qdrant.QdrantVectorIOConfig",
api_dependencies=[Api.inference],
optional_api_dependencies=[Api.files],
description=r"""
[Qdrant](https://qdrant.tech/documentation/) is an inline and remote vector database provider for Llama Stack. It
allows you to store and query vectors directly in memory.
@ -516,6 +517,7 @@ Please refer to the inline provider documentation.
""",
),
api_dependencies=[Api.inference],
optional_api_dependencies=[Api.files],
),
remote_provider_spec(
Api.vector_io,


@ -20,7 +20,7 @@ This provider enables dataset management using NVIDIA's NeMo Customizer service.
Build the NVIDIA environment:
```bash
llama stack build --template nvidia --image-type conda
llama stack build --distro nvidia --image-type venv
```
### Basic Usage using the LlamaStack Python Client
@ -34,7 +34,7 @@ os.environ["NVIDIA_API_KEY"] = "your-api-key"
os.environ["NVIDIA_CUSTOMIZER_URL"] = "http://nemo.test"
os.environ["NVIDIA_DATASET_NAMESPACE"] = "default"
os.environ["NVIDIA_PROJECT_ID"] = "test-project"
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient
from llama_stack.core.library_client import LlamaStackAsLibraryClient
client = LlamaStackAsLibraryClient("nvidia")
client.initialize()

View file

@ -5,7 +5,7 @@
# the root directory of this source tree.
from typing import Any
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
from .config import NVIDIAEvalConfig

View file

@ -39,7 +39,7 @@ from llama_stack.apis.inference import (
ToolDefinition,
ToolPromptFormat,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.log import get_logger
from llama_stack.providers.utils.inference.model_registry import (
ModelRegistryHelper,

View file

@ -32,7 +32,7 @@ class LlamaCompatInferenceAdapter(OpenAIMixin, LiteLLMOpenAIMixin):
LiteLLMOpenAIMixin.__init__(
self,
model_entries=MODEL_ENTRIES,
litellm_provider_name="llama",
litellm_provider_name="meta_llama",
api_key_from_config=config.api_key,
provider_data_api_key_field="llama_api_key",
openai_compat_api_base=config.openai_compat_api_base,

View file

@ -18,7 +18,7 @@ This provider enables running inference using NVIDIA NIM.
Build the NVIDIA environment:
```bash
llama stack build --template nvidia --image-type conda
llama stack build --distro nvidia --image-type venv
```
### Basic Usage using the LlamaStack Python Client
@ -33,7 +33,7 @@ os.environ["NVIDIA_API_KEY"] = (
)
os.environ["NVIDIA_BASE_URL"] = "http://nim.test" # NIM URL
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient
from llama_stack.core.library_client import LlamaStackAsLibraryClient
client = LlamaStackAsLibraryClient("nvidia")
client.initialize()

View file

@ -24,9 +24,19 @@ class OpenAIConfig(BaseModel):
default=None,
description="API key for OpenAI models",
)
base_url: str = Field(
default="https://api.openai.com/v1",
description="Base URL for OpenAI API",
)
@classmethod
def sample_run_config(cls, api_key: str = "${env.OPENAI_API_KEY:=}", **kwargs) -> dict[str, Any]:
def sample_run_config(
cls,
api_key: str = "${env.OPENAI_API_KEY:=}",
base_url: str = "${env.OPENAI_BASE_URL:=https://api.openai.com/v1}",
**kwargs,
) -> dict[str, Any]:
return {
"api_key": api_key,
"base_url": base_url,
}
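The sample configs in this commit lean on `${env.NAME:=default}` placeholders, with empty defaults allowed (as in `${env.OPENAI_API_KEY:=}`). A rough standalone equivalent of that substitution rule, not the stack's actual resolver:

```python
import os
import re

# Hypothetical stand-in for the stack's substitution; it only handles the
# "${env.NAME:=default}" form used in the sample configs of this commit.
_ENV_PATTERN = re.compile(r"\$\{env\.([A-Za-z_][A-Za-z0-9_]*):=([^}]*)\}")


def resolve_env_placeholders(value: str) -> str:
    def repl(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        return os.environ.get(name, default)

    return _ENV_PATTERN.sub(repl, value)


# With OPENAI_BASE_URL unset, the embedded default wins:
print(resolve_env_placeholders("${env.OPENAI_BASE_URL:=https://api.openai.com/v1}"))
```

Resolving this at config-load time is what lets the new `base_url` field point at any OpenAI-compatible endpoint without code changes.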

View file

@ -65,9 +65,9 @@ class OpenAIInferenceAdapter(OpenAIMixin, LiteLLMOpenAIMixin):
"""
Get the OpenAI API base URL.
Returns the standard OpenAI API base URL for direct OpenAI API calls.
Returns the OpenAI API base URL from the configuration.
"""
return "https://api.openai.com/v1"
return self.config.base_url
async def initialize(self) -> None:
await super().initialize()

View file

@ -34,7 +34,7 @@ from llama_stack.apis.inference import (
ToolPromptFormat,
)
from llama_stack.apis.models import Model
from llama_stack.distribution.library_client import convert_pydantic_to_json_value, convert_to_pydantic
from llama_stack.core.library_client import convert_pydantic_to_json_value, convert_to_pydantic
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
from llama_stack.providers.utils.inference.openai_compat import prepare_openai_completion_params

View file

@ -4,178 +4,13 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import json
from collections.abc import Iterable
import requests
from openai.types.chat import (
ChatCompletionAssistantMessageParam as OpenAIChatCompletionAssistantMessage,
)
from openai.types.chat import (
ChatCompletionContentPartImageParam as OpenAIChatCompletionContentPartImageParam,
)
from openai.types.chat import (
ChatCompletionContentPartParam as OpenAIChatCompletionContentPartParam,
)
from openai.types.chat import (
ChatCompletionContentPartTextParam as OpenAIChatCompletionContentPartTextParam,
)
from openai.types.chat import (
ChatCompletionMessageParam as OpenAIChatCompletionMessage,
)
from openai.types.chat import (
ChatCompletionMessageToolCallParam as OpenAIChatCompletionMessageToolCall,
)
from openai.types.chat import (
ChatCompletionSystemMessageParam as OpenAIChatCompletionSystemMessage,
)
from openai.types.chat import (
ChatCompletionToolMessageParam as OpenAIChatCompletionToolMessage,
)
from openai.types.chat import (
ChatCompletionUserMessageParam as OpenAIChatCompletionUserMessage,
)
from openai.types.chat.chat_completion_content_part_image_param import (
ImageURL as OpenAIImageURL,
)
from openai.types.chat.chat_completion_message_tool_call_param import (
Function as OpenAIFunction,
)
from llama_stack.apis.common.content_types import (
ImageContentItem,
InterleavedContent,
TextContentItem,
)
from llama_stack.apis.inference import (
ChatCompletionRequest,
CompletionMessage,
JsonSchemaResponseFormat,
Message,
SystemMessage,
ToolChoice,
ToolResponseMessage,
UserMessage,
)
from llama_stack.apis.models import Model
from llama_stack.log import get_logger
from llama_stack.models.llama.datatypes import BuiltinTool
from llama_stack.providers.utils.inference.litellm_openai_mixin import LiteLLMOpenAIMixin
from llama_stack.providers.utils.inference.openai_compat import (
convert_tooldef_to_openai_tool,
get_sampling_options,
)
from llama_stack.providers.utils.inference.prompt_adapter import convert_image_content_to_url
from .config import SambaNovaImplConfig
from .models import MODEL_ENTRIES
logger = get_logger(name=__name__, category="inference")
async def convert_message_to_openai_dict_with_b64_images(
message: Message | dict,
) -> OpenAIChatCompletionMessage:
"""
Convert a Message to an OpenAI API-compatible dictionary.
"""
# users can supply a dict instead of a Message object, we'll
# convert it to a Message object and proceed with some type safety.
if isinstance(message, dict):
if "role" not in message:
raise ValueError("role is required in message")
if message["role"] == "user":
message = UserMessage(**message)
elif message["role"] == "assistant":
message = CompletionMessage(**message)
elif message["role"] == "tool":
message = ToolResponseMessage(**message)
elif message["role"] == "system":
message = SystemMessage(**message)
else:
raise ValueError(f"Unsupported message role: {message['role']}")
# Map Llama Stack spec to OpenAI spec -
# str -> str
# {"type": "text", "text": ...} -> {"type": "text", "text": ...}
# {"type": "image", "image": {"url": {"uri": ...}}} -> {"type": "image_url", "image_url": {"url": ...}}
# {"type": "image", "image": {"data": ...}} -> {"type": "image_url", "image_url": {"url": "data:image/?;base64,..."}}
# List[...] -> List[...]
async def _convert_message_content(
content: InterleavedContent,
) -> str | Iterable[OpenAIChatCompletionContentPartParam]:
async def impl(
content_: InterleavedContent,
) -> str | OpenAIChatCompletionContentPartParam | list[OpenAIChatCompletionContentPartParam]:
# Llama Stack and OpenAI spec match for str and text input
if isinstance(content_, str):
return content_
elif isinstance(content_, TextContentItem):
return OpenAIChatCompletionContentPartTextParam(
type="text",
text=content_.text,
)
elif isinstance(content_, ImageContentItem):
return OpenAIChatCompletionContentPartImageParam(
type="image_url",
image_url=OpenAIImageURL(url=await convert_image_content_to_url(content_, download=True)),
)
elif isinstance(content_, list):
return [await impl(item) for item in content_]
else:
raise ValueError(f"Unsupported content type: {type(content_)}")
ret = await impl(content)
# OpenAI*Message expects a str or list
if isinstance(ret, str) or isinstance(ret, list):
return ret
else:
return [ret]
out: OpenAIChatCompletionMessage = None
if isinstance(message, UserMessage):
out = OpenAIChatCompletionUserMessage(
role="user",
content=await _convert_message_content(message.content),
)
elif isinstance(message, CompletionMessage):
out = OpenAIChatCompletionAssistantMessage(
role="assistant",
content=await _convert_message_content(message.content),
tool_calls=[
OpenAIChatCompletionMessageToolCall(
id=tool.call_id,
function=OpenAIFunction(
name=tool.tool_name if not isinstance(tool.tool_name, BuiltinTool) else tool.tool_name.value,
arguments=json.dumps(tool.arguments),
),
type="function",
)
for tool in message.tool_calls
]
or None,
)
elif isinstance(message, ToolResponseMessage):
out = OpenAIChatCompletionToolMessage(
role="tool",
tool_call_id=message.call_id,
content=await _convert_message_content(message.content),
)
elif isinstance(message, SystemMessage):
out = OpenAIChatCompletionSystemMessage(
role="system",
content=await _convert_message_content(message.content),
)
else:
raise ValueError(f"Unsupported message type: {type(message)}")
return out
class SambaNovaInferenceAdapter(LiteLLMOpenAIMixin):
_config: SambaNovaImplConfig
def __init__(self, config: SambaNovaImplConfig):
self.config = config
self.environment_available_models = []
@ -185,89 +20,7 @@ class SambaNovaInferenceAdapter(LiteLLMOpenAIMixin):
litellm_provider_name="sambanova",
api_key_from_config=self.config.api_key.get_secret_value() if self.config.api_key else None,
provider_data_api_key_field="sambanova_api_key",
openai_compat_api_base=self.config.url,
download_images=True, # SambaNova requires base64 image encoding
json_schema_strict=False, # SambaNova doesn't support strict=True yet
)
def _get_api_key(self) -> str:
config_api_key = self.config.api_key if self.config.api_key else None
if config_api_key:
return config_api_key.get_secret_value()
else:
provider_data = self.get_request_provider_data()
if provider_data is None or not provider_data.sambanova_api_key:
raise ValueError(
'Pass Sambanova API Key in the header X-LlamaStack-Provider-Data as { "sambanova_api_key": <your api key> }'
)
return provider_data.sambanova_api_key
async def _get_params(self, request: ChatCompletionRequest) -> dict:
input_dict = {}
input_dict["messages"] = [await convert_message_to_openai_dict_with_b64_images(m) for m in request.messages]
if fmt := request.response_format:
if not isinstance(fmt, JsonSchemaResponseFormat):
raise ValueError(
f"Unsupported response format: {type(fmt)}. Only JsonSchemaResponseFormat is supported."
)
fmt = fmt.json_schema
name = fmt["title"]
del fmt["title"]
fmt["additionalProperties"] = False
# Apply additionalProperties: False recursively to all objects
fmt = self._add_additional_properties_recursive(fmt)
input_dict["response_format"] = {
"type": "json_schema",
"json_schema": {
"name": name,
"schema": fmt,
"strict": False,
},
}
if request.tools:
input_dict["tools"] = [convert_tooldef_to_openai_tool(tool) for tool in request.tools]
if request.tool_config.tool_choice:
input_dict["tool_choice"] = (
request.tool_config.tool_choice.value
if isinstance(request.tool_config.tool_choice, ToolChoice)
else request.tool_config.tool_choice
)
provider_data = self.get_request_provider_data()
key_field = self.provider_data_api_key_field
if provider_data and getattr(provider_data, key_field, None):
api_key = getattr(provider_data, key_field)
else:
api_key = self._get_api_key()
return {
"model": request.model,
"api_key": api_key,
"api_base": self.config.url,
**input_dict,
"stream": request.stream,
**get_sampling_options(request.sampling_params),
}
async def register_model(self, model: Model) -> Model:
model_id = self.get_provider_model_id(model.provider_resource_id)
list_models_url = self.config.url + "/models"
if len(self.environment_available_models) == 0:
try:
response = requests.get(list_models_url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Request to {list_models_url} failed") from e
self.environment_available_models = [model.get("id") for model in response.json().get("data", {})]
if model_id.split("sambanova/")[-1] not in self.environment_available_models:
logger.warning(f"Model {model_id} not available in {list_models_url}")
return model
async def initialize(self):
await super().initialize()
async def shutdown(self):
await super().shutdown()
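All of the hand-rolled conversion above disappears because `LiteLLMOpenAIMixin` now absorbs the two SambaNova quirks as flags: `download_images=True` (inline images as base64 data URLs) and `json_schema_strict=False` (emit `"strict": false` in the response format). A standalone sketch of the download-and-inline step, with hypothetical names and `httpx` assumed as the HTTP client:

```python
import base64

import httpx  # assumed HTTP client; any async-capable one works


async def image_url_to_data_url(url: str, mime_type: str = "image/png") -> str:
    """Fetch an image and inline it as a base64 data URL, for providers
    (like SambaNova) that reject plain http(s) image URLs."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        resp.raise_for_status()
    encoded = base64.b64encode(resp.content).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"
```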

View file

@ -38,7 +38,7 @@ from llama_stack.apis.inference import (
ToolDefinition,
ToolPromptFormat,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.log import get_logger
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
from llama_stack.providers.utils.inference.openai_compat import (

View file

@ -22,7 +22,7 @@ This provider enables fine-tuning of LLMs using NVIDIA's NeMo Customizer service
Build the NVIDIA environment:
```bash
llama stack build --template nvidia --image-type conda
llama stack build --distro nvidia --image-type venv
```
### Basic Usage using the LlamaStack Python Client
@ -40,7 +40,7 @@ os.environ["NVIDIA_DATASET_NAMESPACE"] = "default"
os.environ["NVIDIA_PROJECT_ID"] = "test-project"
os.environ["NVIDIA_OUTPUT_MODEL_DIR"] = "test-example-model@v1"
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient
from llama_stack.core.library_client import LlamaStackAsLibraryClient
client = LlamaStackAsLibraryClient("nvidia")
client.initialize()

View file

@ -52,6 +52,9 @@ class BedrockSafetyAdapter(Safety, ShieldsProtocolPrivate):
f"Shield {shield.provider_resource_id} with version {shield.params['guardrailVersion']} not found in Bedrock"
)
async def unregister_shield(self, identifier: str) -> None:
pass
async def run_shield(
self, shield_id: str, messages: list[Message], params: dict[str, Any] = None
) -> RunShieldResponse:

View file

@ -19,7 +19,7 @@ This provider enables safety checks and guardrails for LLM interactions using NV
Build the NVIDIA environment:
```bash
llama stack build --template nvidia --image-type conda
llama stack build --distro nvidia --image-type venv
```
### Basic Usage using the LlamaStack Python Client
@ -32,7 +32,7 @@ import os
os.environ["NVIDIA_API_KEY"] = "your-api-key"
os.environ["NVIDIA_GUARDRAILS_URL"] = "http://guardrails.test"
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient
from llama_stack.core.library_client import LlamaStackAsLibraryClient
client = LlamaStackAsLibraryClient("nvidia")
client.initialize()

View file

@ -40,6 +40,9 @@ class NVIDIASafetyAdapter(Safety, ShieldsProtocolPrivate):
if not shield.provider_resource_id:
raise ValueError("Shield model not provided.")
async def unregister_shield(self, identifier: str) -> None:
pass
async def run_shield(
self, shield_id: str, messages: list[Message], params: dict[str, Any] | None = None
) -> RunShieldResponse:

View file

@ -19,7 +19,7 @@ from llama_stack.apis.safety import (
ViolationLevel,
)
from llama_stack.apis.shields import Shield
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
from llama_stack.providers.utils.inference.openai_compat import convert_message_to_openai_dict_new
@ -68,6 +68,9 @@ class SambaNovaSafetyAdapter(Safety, ShieldsProtocolPrivate, NeedsRequestProvide
):
logger.warning(f"Shield {shield.provider_resource_id} not available in {list_models_url}")
async def unregister_shield(self, identifier: str) -> None:
pass
async def run_shield(
self, shield_id: str, messages: list[Message], params: dict[str, Any] | None = None
) -> RunShieldResponse:

View file

@ -18,7 +18,7 @@ from llama_stack.apis.tools import (
ToolParameter,
ToolRuntime,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import ToolGroupsProtocolPrivate
from .config import BingSearchToolConfig

View file

@ -17,7 +17,7 @@ from llama_stack.apis.tools import (
ToolParameter,
ToolRuntime,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.models.llama.datatypes import BuiltinTool
from llama_stack.providers.datatypes import ToolGroupsProtocolPrivate

View file

@ -15,7 +15,7 @@ from llama_stack.apis.tools import (
ToolInvocationResult,
ToolRuntime,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import ToolGroupsProtocolPrivate
from llama_stack.providers.utils.tools.mcp import invoke_mcp_tool, list_mcp_tools

View file

@ -18,7 +18,7 @@ from llama_stack.apis.tools import (
ToolParameter,
ToolRuntime,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import ToolGroupsProtocolPrivate
from .config import TavilySearchToolConfig

View file

@ -18,7 +18,7 @@ from llama_stack.apis.tools import (
ToolParameter,
ToolRuntime,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import ToolGroupsProtocolPrivate
from .config import WolframAlphaToolConfig

View file

@ -8,7 +8,6 @@ import asyncio
import json
import logging
import os
import re
from typing import Any
from numpy.typing import NDArray
@ -20,6 +19,7 @@ except ImportError:
Function = None
FunctionType = None
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
@ -37,6 +37,7 @@ from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
from .config import MilvusVectorIOConfig as RemoteMilvusVectorIOConfig
@ -50,14 +51,6 @@ OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:milvus:{VERSION
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:milvus:{VERSION}::"
def sanitize_collection_name(name: str) -> str:
"""
Sanitize collection name to ensure it only contains numbers, letters, and underscores.
Any other characters are replaced with underscores.
"""
return re.sub(r"[^a-zA-Z0-9_]", "_", name)
class MilvusIndex(EmbeddingIndex):
def __init__(
self, client: MilvusClient, collection_name: str, consistency_level="Strong", kvstore: KVStore | None = None
@ -366,11 +359,11 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
return self.cache[vector_db_id]
if self.vector_db_store is None:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
index = VectorDBWithIndex(
vector_db=vector_db,
@ -393,7 +386,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
await index.insert_chunks(chunks)
@ -405,7 +398,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
if params and params.get("mode") == "keyword":
# Check if this is inline Milvus (Milvus-Lite)
@ -421,7 +414,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
"""Delete a chunk from a milvus vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")
raise VectorStoreNotFoundError(store_id)
for chunk_id in chunk_ids:
# Use the index's delete_chunk method

View file

@ -13,6 +13,7 @@ from psycopg2 import sql
from psycopg2.extras import Json, execute_values
from pydantic import BaseModel, TypeAdapter
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files.files import Files
from llama_stack.apis.inference import InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
@ -131,8 +132,11 @@ class PGVectorIndex(EmbeddingIndex):
chunks = []
scores = []
for doc, dist in results:
score = 1.0 / float(dist) if dist != 0 else float("inf")
if score < score_threshold:
continue
chunks.append(Chunk(**doc))
scores.append(1.0 / float(dist) if dist != 0 else float("inf"))
scores.append(score)
return QueryChunksResponse(chunks=chunks, scores=scores)
@ -275,7 +279,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
"""Delete a chunk from a PostgreSQL vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")
raise VectorStoreNotFoundError(store_id)
for chunk_id in chunk_ids:
# Use the index's delete_chunk method
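This PGVector hunk and the Weaviate one further down make the same correction: convert the raw distance to a score first, then skip results below `score_threshold` before appending, so `chunks` and `scores` can never drift out of alignment. The logic in isolation:

```python
def filter_by_score(results: list[tuple[dict, float]], score_threshold: float):
    """Distance -> score, then threshold, mirroring the hunk above.

    Smaller distance means more similar, so score = 1/distance
    (infinity for an exact, zero-distance match).
    """
    chunks: list[dict] = []
    scores: list[float] = []
    for doc, dist in results:
        score = 1.0 / float(dist) if dist != 0 else float("inf")
        if score < score_threshold:
            continue  # drop low-similarity results entirely
        chunks.append(doc)
        scores.append(score)
    return chunks, scores


# With a threshold of 2.0, only distances under 0.5 survive.
chunks, scores = filter_by_score([({"id": "a"}, 0.25), ({"id": "b"}, 1.0)], 2.0)
assert [c["id"] for c in chunks] == ["a"] and scores == [4.0]
```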

View file

@ -12,6 +12,7 @@ from .config import QdrantVectorIOConfig
async def get_adapter_impl(config: QdrantVectorIOConfig, deps: dict[Api, ProviderSpec]):
from .qdrant import QdrantVectorIOAdapter
impl = QdrantVectorIOAdapter(config, deps[Api.inference])
files_api = deps.get(Api.files)
impl = QdrantVectorIOAdapter(config, deps[Api.inference], files_api)
await impl.initialize()
return impl

View file

@ -8,6 +8,10 @@ from typing import Any
from pydantic import BaseModel
from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
from llama_stack.schema_utils import json_schema_type
@ -23,9 +27,14 @@ class QdrantVectorIOConfig(BaseModel):
prefix: str | None = None
timeout: int | None = None
host: str | None = None
kvstore: KVStoreConfig
@classmethod
def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]:
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
return {
"api_key": "${env.QDRANT_API_KEY}",
"api_key": "${env.QDRANT_API_KEY:=}",
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="qdrant_registry.db",
),
}
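Because `kvstore` is now a field on the config model, the adapter below has to keep it out of the kwargs splatted into `AsyncQdrantClient` (see the `model_dump(..., exclude={"kvstore"})` call in its `initialize`). The pydantic mechanics, with an illustrative stand-in model:

```python
from pydantic import BaseModel


class ClientConfig(BaseModel):
    """Illustrative stand-in for QdrantVectorIOConfig: client kwargs plus
    one stack-internal field."""

    location: str | None = None
    api_key: str | None = None
    kvstore: dict | None = None  # not an AsyncQdrantClient argument


cfg = ClientConfig(location=":memory:", kvstore={"type": "sqlite"})
# exclude_none drops unset optionals; exclude={"kvstore"} keeps the
# stack-internal field out of the client kwargs.
kwargs = cfg.model_dump(exclude_none=True, exclude={"kvstore"})
assert kwargs == {"location": ":memory:"}
```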

View file

@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
import logging
import uuid
from typing import Any
@ -12,25 +13,21 @@ from numpy.typing import NDArray
from qdrant_client import AsyncQdrantClient, models
from qdrant_client.models import PointStruct
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import (
Chunk,
QueryChunksResponse,
SearchRankingOptions,
VectorIO,
VectorStoreChunkingStrategy,
VectorStoreDeleteResponse,
VectorStoreFileContentsResponse,
VectorStoreFileObject,
VectorStoreFileStatus,
VectorStoreListFilesResponse,
VectorStoreListResponse,
VectorStoreObject,
VectorStoreSearchResponsePage,
)
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
@ -41,6 +38,10 @@ from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig
log = logging.getLogger(__name__)
CHUNK_ID_KEY = "_chunk_id"
# KV store prefixes for vector databases
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:qdrant:{VERSION}::"
def convert_id(_id: str) -> str:
"""
@ -58,6 +59,11 @@ class QdrantIndex(EmbeddingIndex):
self.client = client
self.collection_name = collection_name
async def initialize(self) -> None:
# Qdrant collections are created on demand in add_chunks, so there is nothing to initialize here.
pass
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
@ -83,7 +89,15 @@ class QdrantIndex(EmbeddingIndex):
await self.client.upsert(collection_name=self.collection_name, points=points)
async def delete_chunk(self, chunk_id: str) -> None:
raise NotImplementedError("delete_chunk is not supported in qdrant")
"""Remove a chunk from the Qdrant collection."""
try:
await self.client.delete(
collection_name=self.collection_name,
points_selector=models.PointIdsList(points=[convert_id(chunk_id)]),
)
except Exception as e:
log.error(f"Error deleting chunk {chunk_id} from Qdrant collection {self.collection_name}: {e}")
raise
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
results = (
@ -135,17 +149,41 @@ class QdrantIndex(EmbeddingIndex):
await self.client.delete_collection(collection_name=self.collection_name)
class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
def __init__(
self, config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig, inference_api: Api.inference
self,
config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig,
inference_api: Api.inference,
files_api: Files | None = None,
) -> None:
self.config = config
self.client: AsyncQdrantClient = None
self.cache = {}
self.inference_api = inference_api
self.files_api = files_api
self.vector_db_store = None
self.kvstore: KVStore | None = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self._qdrant_lock = asyncio.Lock()
async def initialize(self) -> None:
self.client = AsyncQdrantClient(**self.config.model_dump(exclude_none=True))
client_config = self.config.model_dump(exclude_none=True, exclude={"kvstore"})
self.client = AsyncQdrantClient(**client_config)
self.kvstore = await kvstore_impl(self.config.kvstore)
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
for vector_db_data in stored_vector_dbs:
vector_db = VectorDB.model_validate_json(vector_db_data)
index = VectorDBWithIndex(
vector_db,
QdrantIndex(self.client, vector_db.identifier),
self.inference_api,
)
self.cache[vector_db.identifier] = index
self.openai_vector_stores = await self._load_openai_vector_stores()
async def shutdown(self) -> None:
await self.client.close()
@ -154,6 +192,10 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
self,
vector_db: VectorDB,
) -> None:
assert self.kvstore is not None
key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
await self.kvstore.set(key=key, value=vector_db.model_dump_json())
index = VectorDBWithIndex(
vector_db=vector_db,
index=QdrantIndex(self.client, vector_db.identifier),
@ -167,13 +209,19 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
assert self.kvstore is not None
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_db_id}")
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
if self.vector_db_store is None:
raise ValueError(f"Vector DB not found {vector_db_id}")
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
index = VectorDBWithIndex(
vector_db=vector_db,
@ -191,7 +239,7 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
await index.insert_chunks(chunks)
@ -203,65 +251,10 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
async def openai_create_vector_store(
self,
name: str,
file_ids: list[str] | None = None,
expires_after: dict[str, Any] | None = None,
chunking_strategy: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
embedding_model: str | None = None,
embedding_dimension: int | None = 384,
provider_id: str | None = None,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_list_vector_stores(
self,
limit: int | None = 20,
order: str | None = "desc",
after: str | None = None,
before: str | None = None,
) -> VectorStoreListResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_retrieve_vector_store(
self,
vector_store_id: str,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_update_vector_store(
self,
vector_store_id: str,
name: str | None = None,
expires_after: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_delete_vector_store(
self,
vector_store_id: str,
) -> VectorStoreDeleteResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_search_vector_store(
self,
vector_store_id: str,
query: str | list[str],
filters: dict[str, Any] | None = None,
max_num_results: int | None = 10,
ranking_options: SearchRankingOptions | None = None,
rewrite_query: bool | None = False,
search_mode: str | None = "vector",
) -> VectorStoreSearchResponsePage:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_attach_file_to_vector_store(
self,
vector_store_id: str,
@ -269,47 +262,14 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
attributes: dict[str, Any] | None = None,
chunking_strategy: VectorStoreChunkingStrategy | None = None,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_list_files_in_vector_store(
self,
vector_store_id: str,
limit: int | None = 20,
order: str | None = "desc",
after: str | None = None,
before: str | None = None,
filter: VectorStoreFileStatus | None = None,
) -> VectorStoreListFilesResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_retrieve_vector_store_file(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_update_vector_store_file(
self,
vector_store_id: str,
file_id: str,
attributes: dict[str, Any] | None = None,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_delete_vector_store_file(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
# Qdrant doesn't allow multiple clients to access the same storage path simultaneously.
async with self._qdrant_lock:
return await super().openai_attach_file_to_vector_store(vector_store_id, file_id, attributes, chunking_strategy)
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
"""Delete chunks from a Qdrant vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")
for chunk_id in chunk_ids:
await index.index.delete_chunk(chunk_id)
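Taken together, these hunks give the Qdrant adapter a persistent registry: each registered vector DB is serialized under `vector_dbs:qdrant:v3::<identifier>` and rehydrated on startup via a key-range scan. Stripped of Qdrant specifics, the pattern looks roughly like this, with `InMemoryKVStore` standing in for the stack's `KVStore` interface:

```python
import asyncio

from pydantic import BaseModel

PREFIX = "vector_dbs:qdrant:v3::"


class VectorDBRecord(BaseModel):
    """Illustrative stand-in for the stack's VectorDB resource."""

    identifier: str
    embedding_model: str


class InMemoryKVStore:
    """Minimal stand-in for the stack's KVStore interface."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def set(self, key: str, value: str) -> None:
        self._data[key] = value

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)

    async def values_in_range(self, start: str, end: str) -> list[str]:
        return [v for k, v in sorted(self._data.items()) if start <= k <= end]


async def register(kv: InMemoryKVStore, db: VectorDBRecord) -> None:
    # Mirror of register_vector_db: persist the record under a prefixed key.
    await kv.set(f"{PREFIX}{db.identifier}", db.model_dump_json())


async def load_all(kv: InMemoryKVStore) -> list[VectorDBRecord]:
    # Mirror of initialize: "\xff" sorts after printable suffixes, so the
    # range [PREFIX, PREFIX + "\xff"] scans every key with that prefix.
    raw = await kv.values_in_range(PREFIX, f"{PREFIX}\xff")
    return [VectorDBRecord.model_validate_json(r) for r in raw]


async def main() -> None:
    kv = InMemoryKVStore()
    await register(kv, VectorDBRecord(identifier="docs", embedding_model="all-MiniLM-L6-v2"))
    print([db.identifier for db in await load_all(kv)])  # ['docs']


asyncio.run(main())
```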

View file

@ -12,6 +12,6 @@ from .config import WeaviateVectorIOConfig
async def get_adapter_impl(config: WeaviateVectorIOConfig, deps: dict[Api, ProviderSpec]):
from .weaviate import WeaviateVectorIOAdapter
impl = WeaviateVectorIOAdapter(config, deps[Api.inference])
impl = WeaviateVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files, None))
await impl.initialize()
return impl

View file

@ -12,18 +12,24 @@ from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
from llama_stack.schema_utils import json_schema_type
class WeaviateRequestProviderData(BaseModel):
weaviate_api_key: str
weaviate_cluster_url: str
@json_schema_type
class WeaviateVectorIOConfig(BaseModel):
weaviate_api_key: str | None = Field(description="The API key for the Weaviate instance", default=None)
weaviate_cluster_url: str | None = Field(description="The URL of the Weaviate cluster", default="localhost:8080")
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)
class WeaviateVectorIOConfig(BaseModel):
@classmethod
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
def sample_run_config(
cls,
__distro_dir__: str,
**kwargs: Any,
) -> dict[str, Any]:
return {
"weaviate_api_key": None,
"weaviate_cluster_url": "${env.WEAVIATE_CLUSTER_URL:=localhost:8080}",
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="weaviate_registry.db",

View file

@ -14,19 +14,24 @@ from weaviate.classes.init import Auth
from weaviate.classes.query import Filter
from llama_stack.apis.common.content_types import InterleavedContent
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files.files import Files
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import (
OpenAIVectorStoreMixin,
)
from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
from .config import WeaviateRequestProviderData, WeaviateVectorIOConfig
from .config import WeaviateVectorIOConfig
log = logging.getLogger(__name__)
@ -39,11 +44,19 @@ OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_conten
class WeaviateIndex(EmbeddingIndex):
def __init__(self, client: weaviate.Client, collection_name: str, kvstore: KVStore | None = None):
def __init__(
self,
client: weaviate.Client,
collection_name: str,
kvstore: KVStore | None = None,
):
self.client = client
self.collection_name = collection_name
self.collection_name = sanitize_collection_name(collection_name, weaviate_format=True)
self.kvstore = kvstore
async def initialize(self):
pass
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
@ -67,10 +80,13 @@ class WeaviateIndex(EmbeddingIndex):
collection.data.insert_many(data_objects)
async def delete_chunk(self, chunk_id: str) -> None:
raise NotImplementedError("delete_chunk is not supported in Chroma")
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
collection = self.client.collections.get(sanitized_collection_name)
collection.data.delete_many(where=Filter.by_property("id").contains_any([chunk_id]))
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
collection = self.client.collections.get(self.collection_name)
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
collection = self.client.collections.get(sanitized_collection_name)
results = collection.query.near_vector(
near_vector=embedding.tolist(),
@ -89,13 +105,26 @@ class WeaviateIndex(EmbeddingIndex):
log.exception(f"Failed to parse document: {chunk_json}")
continue
score = 1.0 / doc.metadata.distance if doc.metadata.distance != 0 else float("inf")
if score < score_threshold:
continue
chunks.append(chunk)
scores.append(1.0 / doc.metadata.distance if doc.metadata.distance != 0 else float("inf"))
scores.append(score)
return QueryChunksResponse(chunks=chunks, scores=scores)
async def delete(self, chunk_ids: list[str]) -> None:
collection = self.client.collections.get(self.collection_name)
async def delete(self, chunk_ids: list[str] | None = None) -> None:
"""
Delete chunks by IDs if provided, otherwise drop the entire collection.
"""
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
if chunk_ids is None:
# Drop entire collection if it exists
if self.client.collections.exists(sanitized_collection_name):
self.client.collections.delete(sanitized_collection_name)
return
collection = self.client.collections.get(sanitized_collection_name)
collection.data.delete_many(where=Filter.by_property("id").contains_any(chunk_ids))
async def query_keyword(
@ -119,6 +148,7 @@ class WeaviateIndex(EmbeddingIndex):
class WeaviateVectorIOAdapter(
OpenAIVectorStoreMixin,
VectorIO,
NeedsRequestProviderData,
VectorDBsProtocolPrivate,
@ -140,42 +170,56 @@ class WeaviateVectorIOAdapter(
self.metadata_collection_name = "openai_vector_stores_metadata"
def _get_client(self) -> weaviate.Client:
provider_data = self.get_request_provider_data()
assert provider_data is not None, "Request provider data must be set"
assert isinstance(provider_data, WeaviateRequestProviderData)
key = f"{provider_data.weaviate_cluster_url}::{provider_data.weaviate_api_key}"
if key in self.client_cache:
return self.client_cache[key]
client = weaviate.connect_to_weaviate_cloud(
cluster_url=provider_data.weaviate_cluster_url,
auth_credentials=Auth.api_key(provider_data.weaviate_api_key),
)
if "localhost" in self.config.weaviate_cluster_url:
log.info("using Weaviate locally in container")
host, port = self.config.weaviate_cluster_url.split(":")
key = "local_test"
client = weaviate.connect_to_local(
host=host,
port=port,
)
else:
log.info("Using Weaviate remote cluster with URL")
key = f"{self.config.weaviate_cluster_url}::{self.config.weaviate_api_key}"
if key in self.client_cache:
return self.client_cache[key]
client = weaviate.connect_to_weaviate_cloud(
cluster_url=self.config.weaviate_cluster_url,
auth_credentials=Auth.api_key(self.config.weaviate_api_key),
)
self.client_cache[key] = client
return client
async def initialize(self) -> None:
"""Set up KV store and load existing vector DBs and OpenAI vector stores."""
# Initialize KV store for metadata
self.kvstore = await kvstore_impl(self.config.kvstore)
# Initialize KV store for metadata if configured
if self.config.kvstore is not None:
self.kvstore = await kvstore_impl(self.config.kvstore)
else:
self.kvstore = None
log.info("No kvstore configured, registry will not persist across restarts")
# Load existing vector DB definitions
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(client=client, collection_name=vector_db.identifier, kvstore=self.kvstore)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=idx,
inference_api=self.inference_api,
)
if self.kvstore is not None:
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(
client=client,
collection_name=vector_db.identifier,
kvstore=self.kvstore,
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=idx,
inference_api=self.inference_api,
)
# Load OpenAI vector stores metadata into cache
await self.initialize_openai_vector_stores()
# Load OpenAI vector stores metadata into cache
await self.initialize_openai_vector_stores()
async def shutdown(self) -> None:
for client in self.client_cache.values():
@ -186,11 +230,11 @@ class WeaviateVectorIOAdapter(
vector_db: VectorDB,
) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
# Create collection if it doesn't exist
if not client.collections.exists(vector_db.identifier):
if not client.collections.exists(sanitized_collection_name):
client.collections.create(
name=vector_db.identifier,
name=sanitized_collection_name,
vectorizer_config=wvc.config.Configure.Vectorizer.none(),
properties=[
wvc.config.Property(
@ -200,30 +244,41 @@ class WeaviateVectorIOAdapter(
],
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
self.cache[sanitized_collection_name] = VectorDBWithIndex(
vector_db,
WeaviateIndex(client=client, collection_name=vector_db.identifier),
WeaviateIndex(client=client, collection_name=sanitized_collection_name),
self.inference_api,
)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def unregister_vector_db(self, vector_db_id: str) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if sanitized_collection_name not in self.cache or client.collections.exists(sanitized_collection_name) is False:
log.warning(f"Vector DB {sanitized_collection_name} not found")
return
client.collections.delete(sanitized_collection_name)
await self.cache[sanitized_collection_name].index.delete()
del self.cache[sanitized_collection_name]
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if sanitized_collection_name in self.cache:
return self.cache[sanitized_collection_name]
vector_db = await self.vector_db_store.get_vector_db(sanitized_collection_name)
if not vector_db:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
client = self._get_client()
if not client.collections.exists(vector_db.identifier):
raise ValueError(f"Collection with name `{vector_db.identifier}` not found")
raise ValueError(f"Collection with name `{sanitized_collection_name}` not found")
index = VectorDBWithIndex(
vector_db=vector_db,
index=WeaviateIndex(client=client, collection_name=vector_db.identifier),
index=WeaviateIndex(client=client, collection_name=sanitized_collection_name),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[sanitized_collection_name] = index
return index
async def insert_chunks(
@ -232,9 +287,10 @@ class WeaviateVectorIOAdapter(
chunks: list[Chunk],
ttl_seconds: int | None = None,
) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
await index.insert_chunks(chunks)
@ -244,29 +300,17 @@ class WeaviateVectorIOAdapter(
query: InterleavedContent,
params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
# OpenAI Vector Stores File operations are not supported in Weaviate
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {sanitized_collection_name} not found")
await index.delete(chunk_ids)
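`_get_client` now picks a connection path from the configured URL instead of per-request provider data: anything containing `localhost` gets the v4 client's local helper, everything else the cloud helper with API-key auth. The branch in isolation, assuming the `weaviate-client` v4 package:

```python
import weaviate
from weaviate.classes.init import Auth


def connect(cluster_url: str, api_key: str | None = None) -> weaviate.WeaviateClient:
    """Sketch of the branch above, not the adapter itself."""
    if "localhost" in cluster_url:
        # Local container: no auth, URL carries "host:port".
        host, port = cluster_url.split(":")
        return weaviate.connect_to_local(host=host, port=int(port))
    # Remote cluster: authenticate with the configured API key.
    return weaviate.connect_to_weaviate_cloud(
        cluster_url=cluster_url,
        auth_credentials=Auth.api_key(api_key),
    )
```

Caching clients by URL and key, as the adapter does, avoids re-handshaking on every request.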

View file

@ -12,7 +12,7 @@ from llama_stack.apis.common.type_system import (
CompletionInputType,
StringType,
)
from llama_stack.distribution.datatypes import Api
from llama_stack.core.datatypes import Api
class ColumnName(Enum):

View file

@ -10,8 +10,8 @@ from llama_stack.apis.inference import (
OpenAIMessageParam,
Order,
)
from llama_stack.distribution.datatypes import AccessRule
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore

View file

@ -38,7 +38,7 @@ from llama_stack.apis.inference import (
ToolDefinition,
ToolPromptFormat,
)
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.log import get_logger
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
from llama_stack.providers.utils.inference.openai_compat import (
@ -72,13 +72,28 @@ class LiteLLMOpenAIMixin(
api_key_from_config: str | None,
provider_data_api_key_field: str,
openai_compat_api_base: str | None = None,
download_images: bool = False,
json_schema_strict: bool = True,
):
"""
Initialize the LiteLLMOpenAIMixin.
:param model_entries: The model entries to register.
:param api_key_from_config: The API key to use from the config.
:param provider_data_api_key_field: The field in the provider data that contains the API key.
:param litellm_provider_name: The name of the provider, used for model lookups.
:param openai_compat_api_base: The base URL for OpenAI compatibility, or None if not using OpenAI compatibility.
:param download_images: Whether to download images and convert to base64 for message conversion.
:param json_schema_strict: Whether to use strict mode for JSON schema validation.
"""
ModelRegistryHelper.__init__(self, model_entries)
self.litellm_provider_name = litellm_provider_name
self.api_key_from_config = api_key_from_config
self.provider_data_api_key_field = provider_data_api_key_field
self.api_base = openai_compat_api_base
self.download_images = download_images
self.json_schema_strict = json_schema_strict
if openai_compat_api_base:
self.is_openai_compat = True
@ -143,9 +158,8 @@ class LiteLLMOpenAIMixin(
params["model"] = self.get_litellm_model_name(params["model"])
logger.debug(f"params to litellm (openai compat): {params}")
# unfortunately, we need to use synchronous litellm.completion here because litellm
# caches various httpx.client objects in a non-eventloop aware manner
response = litellm.completion(**params)
# see https://docs.litellm.ai/docs/completion/stream#async-completion
response = await litellm.acompletion(**params)
if stream:
return self._stream_chat_completion(response)
else:
@ -155,7 +169,7 @@ class LiteLLMOpenAIMixin(
self, response: litellm.ModelResponse
) -> AsyncIterator[ChatCompletionResponseStreamChunk]:
async def _stream_generator():
for chunk in response:
async for chunk in response:
yield chunk
async for chunk in convert_openai_chat_completion_stream(
@ -197,7 +211,9 @@ class LiteLLMOpenAIMixin(
async def _get_params(self, request: ChatCompletionRequest) -> dict:
input_dict = {}
input_dict["messages"] = [await convert_message_to_openai_dict_new(m) for m in request.messages]
input_dict["messages"] = [
await convert_message_to_openai_dict_new(m, download_images=self.download_images) for m in request.messages
]
if fmt := request.response_format:
if not isinstance(fmt, JsonSchemaResponseFormat):
raise ValueError(
@ -217,7 +233,7 @@ class LiteLLMOpenAIMixin(
"json_schema": {
"name": name,
"schema": fmt,
"strict": True,
"strict": self.json_schema_strict,
},
}
if request.tools:
@ -245,6 +261,12 @@ class LiteLLMOpenAIMixin(
api_key = getattr(provider_data, key_field)
else:
api_key = self.api_key_from_config
if not api_key:
raise ValueError(
"API key is not set. Please provide a valid API key in the "
"provider data header, e.g. x-llamastack-provider-data: "
f'{{"{key_field}": "<API_KEY>"}}, or in the provider config.'
)
return api_key
async def embeddings(
@ -428,3 +450,17 @@ class LiteLLMOpenAIMixin(
logprobs: LogProbConfig | None = None,
):
raise NotImplementedError("Batch chat completion is not supported for OpenAI Compat")
async def check_model_availability(self, model: str) -> bool:
"""
Check if a specific model is available via LiteLLM for the current
provider (self.litellm_provider_name).
:param model: The model identifier to check.
:return: True if the model is available dynamically, False otherwise.
"""
if self.litellm_provider_name not in litellm.models_by_provider:
logger.error(f"Provider {self.litellm_provider_name} is not registered in litellm.")
return False
return model in litellm.models_by_provider[self.litellm_provider_name]
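Three changes land in this mixin: chat completions move from synchronous `litellm.completion` to `await litellm.acompletion` (with streamed chunks consumed via `async for`), `strict` in the JSON-schema response format becomes provider-configurable, and `check_model_availability` consults `litellm.models_by_provider`. A minimal sketch of the async streaming call; the model name and key are placeholders:

```python
import asyncio

import litellm


async def stream_chat() -> None:
    # acompletion returns an async iterator of chunks when stream=True; see
    # https://docs.litellm.ai/docs/completion/stream#async-completion
    response = await litellm.acompletion(
        model="openai/gpt-4o-mini",  # placeholder model
        messages=[{"role": "user", "content": "Say hi"}],
        stream=True,
        api_key="sk-placeholder",  # placeholder key
    )
    async for chunk in response:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)


asyncio.run(stream_chat())
```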

View file

@ -564,6 +564,7 @@ class UnparseableToolCall(BaseModel):
async def convert_message_to_openai_dict_new(
message: Message | dict,
download_images: bool = False,
) -> OpenAIChatCompletionMessage:
"""
Convert a Message to an OpenAI API-compatible dictionary.
@ -607,7 +608,9 @@ async def convert_message_to_openai_dict_new(
elif isinstance(content_, ImageContentItem):
return OpenAIChatCompletionContentPartImageParam(
type="image_url",
image_url=OpenAIImageURL(url=await convert_image_content_to_url(content_)),
image_url=OpenAIImageURL(
url=await convert_image_content_to_url(content_, download=download_images)
),
)
elif isinstance(content_, list):
return [await impl(item) for item in content_]

View file

@ -10,7 +10,7 @@ from typing import Annotated, Literal
from pydantic import BaseModel, Field, field_validator
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
class KVStoreType(Enum):

View file

@ -13,6 +13,7 @@ import uuid
from abc import ABC, abstractmethod
from typing import Any
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files, OpenAIFileObject
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import (
@ -322,7 +323,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreObject:
"""Retrieves a vector store."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
store_info = self.openai_vector_stores[vector_store_id]
return VectorStoreObject(**store_info)
@ -336,7 +337,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreObject:
"""Modifies a vector store."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
store_info = self.openai_vector_stores[vector_store_id].copy()
@ -365,7 +366,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreDeleteResponse:
"""Delete a vector store."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
# Delete from persistent storage (provider-specific)
await self._delete_openai_vector_store_from_storage(vector_store_id)
@ -403,7 +404,7 @@ class OpenAIVectorStoreMixin(ABC):
raise ValueError(f"search_mode must be one of {valid_modes}, got {search_mode}")
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
if isinstance(query, list):
search_query = " ".join(query)
@ -432,10 +433,6 @@ class OpenAIVectorStoreMixin(ABC):
# Convert response to OpenAI format
data = []
for chunk, score in zip(response.chunks, response.scores, strict=False):
# Apply score based filtering
if score < score_threshold:
continue
# Apply filters if provided
if filters:
# Simple metadata filtering
@ -556,7 +553,7 @@ class OpenAIVectorStoreMixin(ABC):
chunking_strategy: VectorStoreChunkingStrategy | None = None,
) -> VectorStoreFileObject:
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
attributes = attributes or {}
chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
@ -661,7 +658,7 @@ class OpenAIVectorStoreMixin(ABC):
order = order or "desc"
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
store_info = self.openai_vector_stores[vector_store_id]
@ -709,7 +706,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreFileObject:
"""Retrieves a vector store file."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
store_info = self.openai_vector_stores[vector_store_id]
if file_id not in store_info["file_ids"]:
@ -725,7 +722,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreFileContentsResponse:
"""Retrieves the contents of a vector store file."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
@ -748,7 +745,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreFileObject:
"""Updates a vector store file."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
store_info = self.openai_vector_stores[vector_store_id]
if file_id not in store_info["file_ids"]:
@ -766,7 +763,7 @@ class OpenAIVectorStoreMixin(ABC):
) -> VectorStoreFileDeleteResponse:
"""Deletes a vector store file."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
raise VectorStoreNotFoundError(vector_store_id)
dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
chunks = [Chunk.model_validate(c) for c in dict_chunks]

View file

@ -30,7 +30,7 @@ from llama_stack.providers.datatypes import Api
from llama_stack.providers.utils.inference.prompt_adapter import (
interleaved_content_as_str,
)
from llama_stack.providers.utils.vector_io.chunk_utils import generate_chunk_id
from llama_stack.providers.utils.vector_io.vector_utils import generate_chunk_id
log = logging.getLogger(__name__)

View file

@ -14,8 +14,8 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObject,
OpenAIResponseObjectWithInput,
)
from llama_stack.distribution.datatypes import AccessRule
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore

View file

@ -7,11 +7,11 @@
from collections.abc import Mapping
from typing import Any, Literal
from llama_stack.distribution.access_control.access_control import default_policy, is_action_allowed
from llama_stack.distribution.access_control.conditions import ProtectedResource
from llama_stack.distribution.access_control.datatypes import AccessRule, Action, Scope
from llama_stack.distribution.datatypes import User
from llama_stack.distribution.request_headers import get_authenticated_user
from llama_stack.core.access_control.access_control import default_policy, is_action_allowed
from llama_stack.core.access_control.conditions import ProtectedResource
from llama_stack.core.access_control.datatypes import AccessRule, Action, Scope
from llama_stack.core.datatypes import User
from llama_stack.core.request_headers import get_authenticated_user
from llama_stack.log import get_logger
from .api import ColumnDefinition, ColumnType, PaginatedResponse, SqlStore

View file

@ -11,7 +11,7 @@ from typing import Annotated, Literal
from pydantic import BaseModel, Field
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from .api import SqlStore

View file

@ -22,7 +22,7 @@ from llama_stack.apis.tools import (
ToolInvocationResult,
ToolParameter,
)
from llama_stack.distribution.datatypes import AuthenticationRequiredError
from llama_stack.core.datatypes import AuthenticationRequiredError
from llama_stack.log import get_logger
from llama_stack.providers.utils.tools.ttl_dict import TTLDict

View file

@ -5,6 +5,7 @@
# the root directory of this source tree.
import hashlib
import re
import uuid
@ -19,3 +20,20 @@ def generate_chunk_id(document_id: str, chunk_text: str, chunk_window: str | Non
if chunk_window:
hash_input += f":{chunk_window}".encode()
return str(uuid.UUID(hashlib.md5(hash_input, usedforsecurity=False).hexdigest()))
def proper_case(s: str) -> str:
"""Convert a string to proper case (first letter uppercase, rest lowercase)."""
return s[0].upper() + s[1:].lower() if s else s
def sanitize_collection_name(name: str, weaviate_format=False) -> str:
"""
Sanitize collection name to ensure it only contains numbers, letters, and underscores.
Any other characters are replaced with underscores.
"""
if not weaviate_format:
s = re.sub(r"[^a-zA-Z0-9_]", "_", name)
else:
s = proper_case(re.sub(r"[^a-zA-Z0-9]", "", name))
return s
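`sanitize_collection_name` is promoted from a Milvus-local helper into this shared module (both the Milvus and Weaviate hunks above now import it) and gains a Weaviate mode, since Weaviate class names must be alphanumeric and start with an uppercase letter. Expected behavior:

```python
# Assuming the helpers above are importable from the shared module shown
# in the hunks earlier in this commit:
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name

# Default mode keeps underscores and replaces anything else with "_".
assert sanitize_collection_name("my-db.name") == "my_db_name"

# Weaviate mode strips all non-alphanumerics, then proper-cases the result.
assert sanitize_collection_name("my-db.name", weaviate_format=True) == "Mydbname"
```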