mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-04 04:04:14 +00:00
Some checks failed
Integration Tests / discover-tests (push) Has been skipped
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 7s
Integration Tests / record-tests (push) Has been skipped
Integration Tests / run-tests (push) Has been skipped
Vector IO Integration Tests / test-matrix (3.12, inline::milvus) (push) Failing after 22s
Python Package Build Test / build (3.13) (push) Failing after 16s
Test Llama Stack Build / generate-matrix (push) Successful in 19s
Vector IO Integration Tests / test-matrix (3.13, inline::milvus) (push) Failing after 21s
Vector IO Integration Tests / test-matrix (3.12, inline::sqlite-vec) (push) Failing after 31s
Test Llama Stack Build / build-custom-container-distribution (push) Failing after 32s
Test External API and Providers / test-external (venv) (push) Failing after 32s
Vector IO Integration Tests / test-matrix (3.13, remote::chromadb) (push) Failing after 36s
Vector IO Integration Tests / test-matrix (3.12, remote::chromadb) (push) Failing after 39s
Update ReadTheDocs / update-readthedocs (push) Failing after 31s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 42s
Test Llama Stack Build / build-single-provider (push) Failing after 37s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Failing after 35s
Test Llama Stack Build / build-ubi9-container-distribution (push) Failing after 37s
Vector IO Integration Tests / test-matrix (3.13, remote::pgvector) (push) Failing after 40s
Vector IO Integration Tests / test-matrix (3.12, remote::pgvector) (push) Failing after 42s
Unit Tests / unit-tests (3.12) (push) Failing after 36s
Vector IO Integration Tests / test-matrix (3.13, inline::sqlite-vec) (push) Failing after 40s
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 45s
Test Llama Stack Build / build (push) Failing after 6s
Python Package Build Test / build (3.12) (push) Failing after 1m1s
Unit Tests / unit-tests (3.13) (push) Failing after 1m0s
Vector IO Integration Tests / test-matrix (3.13, inline::faiss) (push) Failing after 1m6s
Vector IO Integration Tests / test-matrix (3.12, inline::faiss) (push) Failing after 1m8s
Pre-commit / pre-commit (push) Successful in 1m50s
What does this PR do? This PR adds support for Direct Preference Optimization (DPO) training via the existing HuggingFace inline provider. It introduces a new DPO training recipe, config schema updates, dataset integration, and end-to-end testing to support preference-based fine-tuning with TRL. Test Plan Added integration test: tests/integration/post_training/test_post_training.py::TestPostTraining::test_preference_optimize Ran tests on both CPU and CUDA environments --------- Co-authored-by: Ubuntu <ubuntu@ip-172-31-43-83.ec2.internal> Co-authored-by: Ashwin Bharambe <ashwin.bharambe@gmail.com>
242 lines
8.2 KiB
Python
242 lines
8.2 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
import logging
|
|
import sys
|
|
import time
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from llama_stack.apis.post_training import (
|
|
DataConfig,
|
|
DatasetFormat,
|
|
DPOAlignmentConfig,
|
|
DPOLossType,
|
|
LoraFinetuningConfig,
|
|
TrainingConfig,
|
|
)
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
skip_because_resource_intensive = pytest.mark.skip(
|
|
reason="""
|
|
Post training tests are extremely resource intensive. They download large models and partly as a result,
|
|
are very slow to run. We cannot run them on every single PR update. CI should be considered
|
|
a scarce resource and properly utilitized.
|
|
"""
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def capture_output(capsys):
|
|
"""Fixture to capture and display output during test execution."""
|
|
yield
|
|
captured = capsys.readouterr()
|
|
if captured.out:
|
|
print("\nCaptured stdout:", captured.out)
|
|
if captured.err:
|
|
print("\nCaptured stderr:", captured.err)
|
|
|
|
|
|
# Force flush stdout to see prints immediately
|
|
sys.stdout.reconfigure(line_buffering=True)
|
|
|
|
# How to run this test:
|
|
#
|
|
# LLAMA_STACK_CONFIG=ci-tests uv run --dev pytest tests/integration/post_training/test_post_training.py
|
|
#
|
|
|
|
|
|
# SFT test
|
|
class TestPostTraining:
|
|
@pytest.mark.integration
|
|
@pytest.mark.parametrize(
|
|
"purpose, source",
|
|
[
|
|
(
|
|
"post-training/messages",
|
|
{
|
|
"type": "uri",
|
|
"uri": "huggingface://datasets/llamastack/simpleqa?split=train",
|
|
},
|
|
),
|
|
],
|
|
)
|
|
@pytest.mark.timeout(360) # 6 minutes timeout
|
|
@skip_because_resource_intensive
|
|
def test_supervised_fine_tune(self, llama_stack_client, purpose, source):
|
|
logger.info("Starting supervised fine-tuning test")
|
|
|
|
# register dataset to train
|
|
dataset = llama_stack_client.datasets.register(
|
|
purpose=purpose,
|
|
source=source,
|
|
)
|
|
logger.info(f"Registered dataset with ID: {dataset.identifier}")
|
|
|
|
algorithm_config = LoraFinetuningConfig(
|
|
type="LoRA",
|
|
lora_attn_modules=["q_proj", "v_proj", "output_proj"],
|
|
apply_lora_to_mlp=True,
|
|
apply_lora_to_output=False,
|
|
rank=8,
|
|
alpha=16,
|
|
)
|
|
|
|
data_config = DataConfig(
|
|
dataset_id=dataset.identifier,
|
|
batch_size=1,
|
|
shuffle=False,
|
|
data_format=DatasetFormat.instruct,
|
|
)
|
|
|
|
# setup training config with minimal settings
|
|
training_config = TrainingConfig(
|
|
n_epochs=1,
|
|
data_config=data_config,
|
|
max_steps_per_epoch=1,
|
|
gradient_accumulation_steps=1,
|
|
)
|
|
|
|
job_uuid = f"test-job{uuid.uuid4()}"
|
|
logger.info(f"Starting training job with UUID: {job_uuid}")
|
|
|
|
# train with HF trl SFTTrainer as the default
|
|
_ = llama_stack_client.post_training.supervised_fine_tune(
|
|
job_uuid=job_uuid,
|
|
model="ibm-granite/granite-3.3-2b-instruct",
|
|
algorithm_config=algorithm_config,
|
|
training_config=training_config,
|
|
hyperparam_search_config={},
|
|
logger_config={},
|
|
checkpoint_dir=None,
|
|
)
|
|
|
|
while True:
|
|
status = llama_stack_client.post_training.job.status(job_uuid=job_uuid)
|
|
if not status:
|
|
logger.error("Job not found")
|
|
break
|
|
|
|
logger.info(f"Current status: {status}")
|
|
assert status.status in ["scheduled", "in_progress", "completed"]
|
|
if status.status == "completed":
|
|
break
|
|
|
|
logger.info("Waiting for job to complete...")
|
|
time.sleep(10) # Increased sleep time to reduce polling frequency
|
|
|
|
artifacts = llama_stack_client.post_training.job.artifacts(job_uuid=job_uuid)
|
|
logger.info(f"Job artifacts: {artifacts}")
|
|
|
|
logger.info(f"Registered dataset with ID: {dataset.identifier}")
|
|
|
|
# TODO: Fix these tests to properly represent the Jobs API in training
|
|
#
|
|
# async def test_get_training_jobs(self, post_training_stack):
|
|
# post_training_impl = post_training_stack
|
|
# jobs_list = await post_training_impl.get_training_jobs()
|
|
# assert isinstance(jobs_list, list)
|
|
# assert jobs_list[0].job_uuid == "1234"
|
|
|
|
#
|
|
# async def test_get_training_job_status(self, post_training_stack):
|
|
# post_training_impl = post_training_stack
|
|
# job_status = await post_training_impl.get_training_job_status("1234")
|
|
# assert isinstance(job_status, PostTrainingJobStatusResponse)
|
|
# assert job_status.job_uuid == "1234"
|
|
# assert job_status.status == JobStatus.completed
|
|
# assert isinstance(job_status.checkpoints[0], Checkpoint)
|
|
|
|
#
|
|
# async def test_get_training_job_artifacts(self, post_training_stack):
|
|
# post_training_impl = post_training_stack
|
|
# job_artifacts = await post_training_impl.get_training_job_artifacts("1234")
|
|
# assert isinstance(job_artifacts, PostTrainingJobArtifactsResponse)
|
|
# assert job_artifacts.job_uuid == "1234"
|
|
# assert isinstance(job_artifacts.checkpoints[0], Checkpoint)
|
|
# assert job_artifacts.checkpoints[0].identifier == "instructlab/granite-7b-lab"
|
|
# assert job_artifacts.checkpoints[0].epoch == 0
|
|
# assert "/.llama/checkpoints/Llama3.2-3B-Instruct-sft-0" in job_artifacts.checkpoints[0].path
|
|
|
|
# DPO test
|
|
@pytest.mark.integration
|
|
@pytest.mark.parametrize(
|
|
"purpose, source",
|
|
[
|
|
(
|
|
"post-training/messages",
|
|
{
|
|
"type": "uri",
|
|
"uri": "huggingface://datasets/trl-internal-testing/hh-rlhf-helpful-base-trl-style?split=train[:20]",
|
|
},
|
|
),
|
|
],
|
|
)
|
|
@pytest.mark.timeout(360)
|
|
def test_preference_optimize(self, llama_stack_client, purpose, source):
|
|
logger.info("Starting DPO preference optimization test")
|
|
|
|
# register preference dataset to train
|
|
dataset = llama_stack_client.datasets.register(
|
|
purpose=purpose,
|
|
source=source,
|
|
)
|
|
logger.info(f"Registered preference dataset with ID: {dataset.identifier}")
|
|
|
|
# DPO algorithm configuration
|
|
algorithm_config = DPOAlignmentConfig(
|
|
beta=0.1,
|
|
loss_type=DPOLossType.sigmoid,
|
|
)
|
|
|
|
data_config = DataConfig(
|
|
dataset_id=dataset.identifier,
|
|
batch_size=1,
|
|
shuffle=False,
|
|
data_format=DatasetFormat.dialog, # DPO datasets often use dialog format
|
|
)
|
|
|
|
# setup training config with minimal settings for DPO
|
|
training_config = TrainingConfig(
|
|
n_epochs=1,
|
|
data_config=data_config,
|
|
max_steps_per_epoch=1, # Just 2 steps for quick testing
|
|
gradient_accumulation_steps=1,
|
|
)
|
|
|
|
job_uuid = f"test-dpo-job-{uuid.uuid4()}"
|
|
logger.info(f"Starting DPO training job with UUID: {job_uuid}")
|
|
|
|
# train with HuggingFace DPO implementation
|
|
_ = llama_stack_client.post_training.preference_optimize(
|
|
job_uuid=job_uuid,
|
|
finetuned_model="distilgpt2", # Much smaller model for faster CI testing
|
|
algorithm_config=algorithm_config,
|
|
training_config=training_config,
|
|
hyperparam_search_config={},
|
|
logger_config={},
|
|
)
|
|
|
|
while True:
|
|
status = llama_stack_client.post_training.job.status(job_uuid=job_uuid)
|
|
if not status:
|
|
logger.error("DPO job not found")
|
|
break
|
|
|
|
logger.info(f"Current DPO status: {status}")
|
|
if status.status == "completed":
|
|
break
|
|
|
|
logger.info("Waiting for DPO job to complete...")
|
|
time.sleep(10) # Increased sleep time to reduce polling frequency
|
|
|
|
artifacts = llama_stack_client.post_training.job.artifacts(job_uuid=job_uuid)
|
|
logger.info(f"DPO job artifacts: {artifacts}")
|