mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-17 20:39:47 +00:00
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> chore: Enable keyword search for Milvus inline (#3073) With https://github.com/milvus-io/milvus-lite/pull/294 - Milvus Lite supports keyword search using BM25. While introducing keyword search we had explicitly disabled it for inline milvus. This PR removes the need for the check, and enables `inline::milvus` for tests. <!-- If resolving an issue, uncomment and update the line below --> <!-- Closes #[issue-number] --> Run llama stack with `inline::milvus` enabled: ``` pytest tests/integration/vector_io/test_openai_vector_stores.py::test_openai_vector_store_search_modes --stack-config=http://localhost:8321 --embedding-model=all-MiniLM-L6-v2 -v ``` ``` INFO 2025-08-07 17:06:20,932 tests.integration.conftest:64 tests: Setting DISABLE_CODE_SANDBOX=1 for macOS =========================================================================================== test session starts ============================================================================================ platform darwin -- Python 3.12.11, pytest-7.4.4, pluggy-1.5.0 -- /Users/vnarsing/miniconda3/envs/stack-client/bin/python cachedir: .pytest_cache metadata: {'Python': '3.12.11', 'Platform': 'macOS-14.7.6-arm64-arm-64bit', 'Packages': {'pytest': '7.4.4', 'pluggy': '1.5.0'}, 'Plugins': {'asyncio': '0.23.8', 'cov': '6.0.0', 'timeout': '2.2.0', 'socket': '0.7.0', 'html': '3.1.1', 'langsmith': '0.3.39', 'anyio': '4.8.0', 'metadata': '3.0.0'}} rootdir: /Users/vnarsing/go/src/github/meta-llama/llama-stack configfile: pyproject.toml plugins: asyncio-0.23.8, cov-6.0.0, timeout-2.2.0, socket-0.7.0, html-3.1.1, langsmith-0.3.39, anyio-4.8.0, metadata-3.0.0 asyncio: mode=Mode.AUTO collected 3 items tests/integration/vector_io/test_openai_vector_stores.py::test_openai_vector_store_search_modes[None-None-all-MiniLM-L6-v2-None-384-vector] PASSED [ 33%] 
tests/integration/vector_io/test_openai_vector_stores.py::test_openai_vector_store_search_modes[None-None-all-MiniLM-L6-v2-None-384-keyword] PASSED [ 66%] tests/integration/vector_io/test_openai_vector_stores.py::test_openai_vector_store_search_modes[None-None-all-MiniLM-L6-v2-None-384-hybrid] PASSED [100%] ============================================================================================ 3 passed in 4.75s ============================================================================================= ``` Signed-off-by: Varsha Prasad Narsing <varshaprasad96@gmail.com> Co-authored-by: Francisco Arceo <arceofrancisco@gmail.com> chore: Fixup main pre commit (#3204) build: Bump version to 0.2.18 chore: Faster npm pre-commit (#3206) Adds npm to pre-commit.yml installation and caches ui Removes node installation during pre-commit. <!-- If resolving an issue, uncomment and update the line below --> <!-- Closes #[issue-number] --> <!-- Describe the tests you ran to verify your changes with result summaries. *Provide clear instructions so the plan can be easily re-executed.* --> Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> checking in for tonight, wip moving to agents api Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> remove log Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> updated Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> fix: disable ui-prettier & ui-eslint (#3207) chore(pre-commit): add pre-commit hook to enforce llama_stack logger usage (#3061) This PR adds a step in pre-commit to enforce using the `llama_stack` logger. Currently, various parts of the codebase use different loggers. As a custom `llama_stack` logger exists and is already used in the codebase, it is better to standardize on it. 
Signed-off-by: Mustafa Elbehery <melbeher@redhat.com> Co-authored-by: Matthew Farrellee <matt@cs.wisc.edu> fix: fix ```openai_embeddings``` for asymmetric embedding NIMs (#3205) NVIDIA asymmetric embedding models (e.g., `nvidia/llama-3.2-nv-embedqa-1b-v2`) require an `input_type` parameter not present in the standard OpenAI embeddings API. This PR adds the `input_type="query"` as default and updates the documentation to suggest using the `embedding` API for passage embeddings. <!-- If resolving an issue, uncomment and update the line below --> Resolves #2892 ``` pytest -s -v tests/integration/inference/test_openai_embeddings.py --stack-config="inference=nvidia" --embedding-model="nvidia/llama-3.2-nv-embedqa-1b-v2" --env NVIDIA_API_KEY={nvidia_api_key} --env NVIDIA_BASE_URL="https://integrate.api.nvidia.com" ``` cleaning up Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> updating session manager to cache messages locally Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> fix linter Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> more cleanup Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
269 lines
9.6 KiB
Python
269 lines
9.6 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
import os
|
|
import signal
|
|
import sys
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import psutil
|
|
import torch
|
|
from datasets import Dataset
|
|
from transformers import AutoConfig, AutoModelForCausalLM
|
|
|
|
from llama_stack.apis.datasetio import DatasetIO
|
|
from llama_stack.apis.post_training import Checkpoint, TrainingConfig
|
|
from llama_stack.log import get_logger
|
|
|
|
from .config import HuggingFacePostTrainingConfig
|
|
|
|
# Module-level logger shared by every helper in this file; obtained from
# llama_stack.log.get_logger under the "post_training" category.
logger = get_logger(name=__name__, category="post_training")
|
|
|
|
|
|
def setup_environment():
    """Export the environment variables shared by all training runs.

    Writes the tokenizer-parallelism and MKL settings below into
    ``os.environ`` of the current process, overwriting any existing values.
    """
    training_env = {
        "TOKENIZERS_PARALLELISM": "false",
        "MKL_THREADING_LAYER": "GNU",
        "MKL_SERVICE_FORCE_INTEL": "0",
        "MKL_NUM_THREADS": "1",
    }
    os.environ.update(training_env)
|
|
|
|
|
|
def bytes_to_gb(to_convert: int) -> str:
    """Render a byte count as gibibytes with two decimal places.

    Args:
        to_convert: Memory value in bytes

    Returns:
        str: The value divided by 1024**3, formatted with two decimals
    """
    bytes_per_gb = 1024**3
    gigabytes = to_convert / bytes_per_gb
    return format(gigabytes, ".2f")
|
|
|
|
|
|
def get_memory_stats(device: torch.device) -> dict[str, Any]:
    """Get memory statistics for the given device.

    Args:
        device: The torch device whose memory usage should be reported

    Returns:
        dict: Always contains a ``"system_memory"`` entry (total, available,
            used as GB strings, plus the raw ``percent`` value). For ``cuda``,
            ``mps`` and ``cpu`` devices a ``"device_memory"`` entry is added;
            any other device type gets no ``"device_memory"`` entry.
    """
    # Take a single snapshot so that total/available/used/percent all describe
    # the same instant instead of being sampled by separate psutil calls.
    system_memory = psutil.virtual_memory()

    stats: dict[str, Any] = {
        "system_memory": {
            "total": bytes_to_gb(system_memory.total),
            "available": bytes_to_gb(system_memory.available),
            "used": bytes_to_gb(system_memory.used),
            "percent": system_memory.percent,
        }
    }

    if device.type == "cuda":
        stats["device_memory"] = {
            "allocated": bytes_to_gb(torch.cuda.memory_allocated(device)),
            "reserved": bytes_to_gb(torch.cuda.memory_reserved(device)),
            "max_allocated": bytes_to_gb(torch.cuda.max_memory_allocated(device)),
        }
    elif device.type == "mps":
        # MPS doesn't provide direct memory stats, but we can track system memory
        stats["device_memory"] = {
            "note": "MPS memory stats not directly available",
            "system_memory_used": bytes_to_gb(system_memory.used),
        }
    elif device.type == "cpu":
        # For CPU, we track process memory usage
        process = psutil.Process()
        # One memory_info() call keeps rss and vms mutually consistent.
        process_memory = process.memory_info()
        stats["device_memory"] = {
            "process_rss": bytes_to_gb(process_memory.rss),
            "process_vms": bytes_to_gb(process_memory.vms),
            "process_percent": process.memory_percent(),
        }

    return stats
|
|
|
|
|
|
def setup_torch_device(device_str: str) -> torch.device:
    """Initialize and validate a PyTorch device.

    This function handles device initialization and validation for different device types:
    - CUDA: Validates CUDA availability, resolves a missing index to the
      current CUDA device, and rejects an explicit index that is out of range
    - MPS: Validates MPS availability for Apple Silicon
    - CPU: Basic validation
    - HPU: Raises error as it's not supported

    Args:
        device_str: String specifying the device ('cuda', 'cpu', 'mps')

    Returns:
        torch.device: The initialized and validated device

    Raises:
        RuntimeError: If device initialization fails or device is not supported
    """
    try:
        device = torch.device(device_str)
    except RuntimeError as e:
        raise RuntimeError(f"Error getting Torch Device {str(e)}") from e

    # Validate device capabilities
    if device.type == "cuda":
        if not torch.cuda.is_available():
            raise RuntimeError(
                f"{device.type}: Torch has no CUDA/ROCm support or could not detect a compatible device."
            )
        if device.index is None:
            # Pin the bare "cuda" string to a concrete device index.
            device = torch.device(device.type, torch.cuda.current_device())
        else:
            # Fail here rather than later, when the model is moved to a
            # device ordinal that does not exist.
            device_count = torch.cuda.device_count()
            if device.index >= device_count:
                raise RuntimeError(
                    f"{device.type}: device index {device.index} is out of range; "
                    f"only {device_count} device(s) detected."
                )
    elif device.type == "mps":
        if not torch.backends.mps.is_available():
            raise RuntimeError(f"{device.type}: Torch has no MPS support or could not detect a compatible device.")
    elif device.type == "hpu":
        raise RuntimeError(f"{device.type}: training does not support Intel Gaudi.")

    return device
|
|
|
|
|
|
async def load_rows_from_dataset(datasetio_api: DatasetIO, dataset_id: str) -> list[dict[str, Any]]:
    """Fetch every row of a dataset from the llama stack dataset provider.

    Args:
        datasetio_api: DatasetIO implementation used to read the rows
        dataset_id: Identifier of the dataset to load

    Returns:
        The ``data`` list of the ``iterrows`` response (requested with ``limit=-1``)

    Raises:
        RuntimeError: If the provider call fails or the returned data is not a list
    """
    try:
        response = await datasetio_api.iterrows(dataset_id=dataset_id, limit=-1)
        if isinstance(response.data, list):
            return response.data
        # Raised inside the try on purpose: it is re-wrapped by the handler below.
        raise RuntimeError("Expected dataset data to be a list")
    except Exception as exc:
        raise RuntimeError(f"Failed to load dataset: {str(exc)}") from exc
|
|
|
|
|
|
def load_model(
    model: str,
    device: torch.device,
    provider_config: HuggingFacePostTrainingConfig,
) -> AutoModelForCausalLM:
    """Instantiate the base causal-LM and place it on the target device.

    Args:
        model: The model identifier to load
        device: The device to load the model onto
        provider_config: Provider-specific configuration; its
            ``model_specific_config`` is forwarded as keyword arguments to
            both the config and the model loader

    Returns:
        The loaded model, moved to ``device``

    Raises:
        RuntimeError: If any step of loading or moving the model fails
    """
    logger.info("Loading the base model")
    try:
        hf_config = AutoConfig.from_pretrained(model, **provider_config.model_specific_config)
        # CPU loads use float32; every other device type passes "auto".
        dtype = "float32" if device.type == "cpu" else "auto"
        loaded = AutoModelForCausalLM.from_pretrained(
            model,
            torch_dtype=dtype,
            quantization_config=None,
            config=hf_config,
            **provider_config.model_specific_config,
        )
        # Always move model to specified device
        loaded = loaded.to(device)
        logger.info(f"Model loaded and moved to device: {loaded.device}")
        return loaded
    except Exception as exc:
        raise RuntimeError(f"Failed to load model: {str(exc)}") from exc
|
|
|
|
|
|
def split_dataset(ds: Dataset) -> tuple[Dataset, Dataset]:
    """Partition a dataset into training and validation subsets.

    The split holds out 10% of the rows (``test_size=0.1``) with a fixed
    seed of 42.

    Args:
        ds: Dataset to split

    Returns:
        tuple: (train_dataset, eval_dataset)
    """
    logger.info("Splitting dataset into train and validation sets")
    partitions = ds.train_test_split(test_size=0.1, seed=42)
    train_dataset, eval_dataset = partitions["train"], partitions["test"]
    logger.info(f"Split dataset into {len(train_dataset)} training and {len(eval_dataset)} validation examples")
    return train_dataset, eval_dataset
|
|
|
|
|
|
def setup_signal_handlers():
    """Install handlers that exit the process on SIGTERM and SIGINT.

    Both signals share one handler, which logs the signal number and then
    calls ``sys.exit(0)``.
    """

    def _shutdown(signum, frame):
        logger.info(f"Received signal {signum}, initiating graceful shutdown")
        sys.exit(0)

    for handled_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(handled_signal, _shutdown)
|
|
|
|
|
|
def calculate_training_steps(steps_per_epoch: int, config: TrainingConfig) -> dict[str, int]:
    """Derive step counts and the logging interval for a training run.

    Args:
        steps_per_epoch: Number of training steps per epoch
        config: Training configuration; ``n_epochs`` and
            ``max_steps_per_epoch`` are read from it

    Returns:
        dict: ``total_steps`` (steps_per_epoch * n_epochs), ``max_steps``
            (the smaller of ``config.max_steps_per_epoch`` and total_steps)
            and ``logging_steps`` (at least 1)
    """
    total = steps_per_epoch * config.n_epochs
    step_counts = {
        "total_steps": total,
        "max_steps": min(config.max_steps_per_epoch, total),
        # Log 50 times per epoch, but never less often than every step
        "logging_steps": max(1, steps_per_epoch // 50),
    }

    logger.info("Training configuration:")
    logger.info(f"- Steps per epoch: {steps_per_epoch}")
    logger.info(f"- Total steps: {step_counts['total_steps']}")
    logger.info(f"- Max steps: {step_counts['max_steps']}")
    logger.info(f"- Logging steps: {step_counts['logging_steps']}")

    return step_counts
|
|
|
|
|
|
def get_save_strategy(output_dir_path: Path | None) -> tuple[str, str]:
|
|
"""Get save and evaluation strategy based on output directory.
|
|
Args:
|
|
output_dir_path: Optional path to save the model
|
|
Returns:
|
|
tuple: (save_strategy, eval_strategy)
|
|
"""
|
|
if output_dir_path:
|
|
logger.info(f"Will save checkpoints to {output_dir_path}")
|
|
return "epoch", "epoch"
|
|
return "no", "no"
|
|
|
|
|
|
def _checkpoint_step(checkpoint_dir: Path) -> int | None:
    """Return the number following the first "-" in the directory name, or None if it is not an integer."""
    try:
        return int(checkpoint_dir.name.split("-")[1])
    except ValueError:
        return None


def create_checkpoints(
    output_dir_path: Path, job_uuid: str, model: str, config: TrainingConfig, final_model_name: str
) -> list[Checkpoint]:
    """Create checkpoint objects from training output.

    Directories named ``checkpoint-<number>`` inside ``output_dir_path`` are
    ordered by their number and assigned epochs 1, 2, ... in that order.
    Directories that match ``checkpoint-*`` but carry a non-numeric suffix
    are skipped with a warning instead of aborting the whole collection.

    Args:
        output_dir_path: Path to the training output directory
        job_uuid: Unique identifier for the training job
        model: Model identifier
        config: Training configuration
        final_model_name: Name of the final model directory ("merged_model" for SFT, "dpo_model" for DPO)

    Returns:
        List of Checkpoint objects: one per numbered checkpoint directory,
        followed by one for the final model directory if it exists
    """
    checkpoints = []

    # Collect (step, directory) pairs for the numbered checkpoint directories
    numbered_dirs = []
    for candidate in output_dir_path.glob("checkpoint-*"):
        if not candidate.is_dir():
            continue
        step = _checkpoint_step(candidate)
        if step is None:
            logger.warning(f"Skipping checkpoint directory with non-numeric suffix: {candidate}")
            continue
        numbered_dirs.append((step, candidate))
    # Numeric (not lexicographic) order, so checkpoint-1000 sorts after checkpoint-200
    numbered_dirs.sort(key=lambda pair: pair[0])

    for epoch_number, (_, checkpoint_dir) in enumerate(numbered_dirs, start=1):
        created_time = datetime.fromtimestamp(os.path.getctime(checkpoint_dir), tz=UTC)
        checkpoint = Checkpoint(
            identifier=checkpoint_dir.name,
            created_at=created_time,
            epoch=epoch_number,
            post_training_job_id=job_uuid,
            path=str(checkpoint_dir),
        )
        checkpoints.append(checkpoint)

    # Add final model
    final_model_path = output_dir_path / final_model_name
    if final_model_path.exists():
        # Any final model name other than "merged_model" is labelled "dpo"
        training_type = "sft" if final_model_name == "merged_model" else "dpo"
        checkpoint = Checkpoint(
            identifier=f"{model}-{training_type}-{config.n_epochs}",
            created_at=datetime.now(UTC),
            epoch=config.n_epochs,
            post_training_job_id=job_uuid,
            path=str(final_model_path),
        )
        checkpoints.append(checkpoint)

    return checkpoints
|