This notebook contains a Llama Stack implementation of a common end-to-end workflow for customizing and evaluating LLMs using NVIDIA.

## Prerequisites
- Please refer to <TODO: Add docs link> to set up the NVIDIA platform.

## Setup


In [1]:
import os

# Base URLs of the NVIDIA services used in this notebook
NDS_URL = "https://datastore.int.aire.nvidia.com"
NEMO_URL = "https://nmp.int.aire.nvidia.com"
NIM_URL = "https://nim.int.aire.nvidia.com"

# Identifiers exported below for the customizer
USER_ID = "llama-stack-user"
NAMESPACE = "default"
PROJECT_ID = "test-project"
CUSTOMIZED_MODEL_DIR = "jg-test-llama-stack@v1"

# Export every setting as an environment variable in one step, grouped by service
os.environ.update(
    {
        # Inference env vars
        "NVIDIA_BASE_URL": NIM_URL,
        # Customizer env vars
        "NVIDIA_CUSTOMIZER_URL": NEMO_URL,
        "NVIDIA_USER_ID": USER_ID,
        "NVIDIA_DATASET_NAMESPACE": NAMESPACE,
        "NVIDIA_PROJECT_ID": PROJECT_ID,
        "NVIDIA_OUTPUT_MODEL_DIR": CUSTOMIZED_MODEL_DIR,
        # Guardrails env vars
        "GUARDRAILS_SERVICE_URL": NEMO_URL,
        # Evaluator env vars
        "NVIDIA_EVALUATOR_URL": NEMO_URL,
    }
)


In [2]:
import asyncio
import json
import os
import pprint
from time import sleep, time
from typing import Dict

# import aiohttp
# import requests
# from huggingface_hub import HfApi

# os.environ["HF_ENDPOINT"] = f"{NDS_URL}/v1/hf"
# os.environ["HF_TOKEN"] = "token"

# hf_api = HfApi(endpoint=os.environ.get("HF_ENDPOINT"), token=os.environ.get("HF_TOKEN"))

## Set Up Llama Stack Client
Begin by importing the necessary components from Llama Stack's client library:

In [None]:
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient

# Build the library-mode Llama Stack client for the "nvidia" configuration and initialize it
client = LlamaStackAsLibraryClient("nvidia")
client.initialize()

In [4]:
# Helper functions for waiting on jobs
from llama_stack.apis.common.job_types import JobStatus

def wait_customization_job(job_id: str, polling_interval: int = 10, timeout: int = 6000):
    """Poll a customization (post-training) job until it is no longer pending.

    Args:
        job_id: UUID of the job to watch (passed as `job_uuid` to the status call).
        polling_interval: Seconds to sleep between two status requests.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The last status reported for the job, exactly as received from the client.

    Raises:
        RuntimeError: If more than `timeout` seconds have elapsed after a poll.
    """
    # The status may arrive either as a `JobStatus` member or as its plain string
    # value; comparing on the underlying value makes both forms match.
    pending_statuses = (JobStatus.scheduled.value, JobStatus.in_progress.value)

    def is_pending(status) -> bool:
        return getattr(status, "value", status) in pending_statuses

    start_time = time()

    response = client.post_training.job.status(job_uuid=job_id)
    job_status = response.status

    print(f"Job status: {job_status} after {time() - start_time} seconds.")

    while is_pending(job_status):
        sleep(polling_interval)
        response = client.post_training.job.status(job_uuid=job_id)
        job_status = response.status

        print(f"Job status: {job_status} after {time() - start_time} seconds.")

        if time() - start_time > timeout:
            raise RuntimeError(f"Customization Job {job_id} took more than {timeout} seconds.")

    return job_status

def wait_eval_job(benchmark_id: str, job_id: str, polling_interval: int = 10, timeout: int = 6000):
    """Poll an evaluation job until it is no longer pending.

    Args:
        benchmark_id: ID of the benchmark the job was launched for.
        job_id: ID of the evaluation job to watch.
        polling_interval: Seconds to sleep between two status requests.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The last object returned by `client.eval.jobs.status` for this job
        (callers read its `.status` attribute).

    Raises:
        RuntimeError: If more than `timeout` seconds have elapsed after a poll.
    """
    pending_statuses = (JobStatus.scheduled.value, JobStatus.in_progress.value)

    def is_pending(job) -> bool:
        # The status call returns a job object, not a bare status: look at its
        # `.status` field, and compare on the underlying value so that both
        # `JobStatus` members and plain strings are recognised.
        current = getattr(job, "status", job)
        return getattr(current, "value", current) in pending_statuses

    start_time = time()
    job_status = client.eval.jobs.status(benchmark_id=benchmark_id, job_id=job_id)

    print(f"Job status: {job_status} after {time() - start_time} seconds.")

    while is_pending(job_status):
        sleep(polling_interval)
        job_status = client.eval.jobs.status(benchmark_id=benchmark_id, job_id=job_id)

        print(f"Job status: {job_status} after {time() - start_time} seconds.")

        if time() - start_time > timeout:
            raise RuntimeError(f"Evaluation Job {job_id} took more than {timeout} seconds.")

    return job_status


## TODO: Upload Dataset Using the HuggingFace Client

In [6]:
# Dataset name and the namespace it is placed under
sample_squad_test_dataset_name = "jg-llama-stack-sample-squad-data"
namespace = "default"

# The repository ID has the form "<namespace>/<dataset name>"
repo_id = "/".join([namespace, sample_squad_test_dataset_name])

In [None]:
# Create the repo
# hf_api.create_repo(repo_id, repo_type="dataset")

In [None]:
# Upload the files from the local folder
# hf_api.upload_folder(
#     folder_path="./tmp/sample_squad_data/training",
#     path_in_repo="training",
#     repo_id=repo_id,
#     repo_type="dataset",
# )
# hf_api.upload_folder(
#     folder_path="./tmp/sample_squad_data/validation",
#     path_in_repo="validation",
#     repo_id=repo_id,
#     repo_type="dataset",
# )
# hf_api.upload_folder(
#     folder_path="./tmp/sample_squad_data/testing",
#     path_in_repo="testing",
#     repo_id=repo_id,
#     repo_type="dataset",
# )

In [None]:
# Create the dataset
# response = client.datasets.register(...)

In [None]:
# Check the files URL
# response = client.datasets.retrieve(repo_id)
# dataset = response.model_dump()
# assert dataset["source"]["uri"] == f"hf://datasets/{repo_id}"

## Inference

In [None]:
import json
import pprint

# Read the testing split: a JSON Lines file holding one JSON object per line.
# The encoding is given explicitly so decoding does not depend on the platform default.
with open("./tmp/sample_squad_data/testing/testing.jsonl", "r", encoding="utf-8") as f:
    # Blank lines (e.g. an empty line at the end of the file) are skipped,
    # because `json.loads` raises on an empty string.
    examples = [json.loads(line) for line in f if line.strip()]

# Get the user prompt from the last example
sample_prompt = examples[-1]["prompt"]
pprint.pprint(sample_prompt)

In [None]:
# Test inference: send the sample prompt as a single user message
user_message = {"role": "user", "content": sample_prompt}
inference_sampling_params = {
    "max_tokens": 20,
    "strategy": {"type": "top_p", "temperature": 0.7, "top_p": 0.9},
}

response = client.inference.chat_completion(
    messages=[user_message],
    model_id="meta/llama-3.1-8b-instruct",
    sampling_params=inference_sampling_params,
)
print(f"Inference response: {response.completion_message.content}")

## Evaluation
TODO: Implement this section after Evaluator integration is done.

In [7]:
# ID used to register the benchmark and to launch and query evaluation jobs against it
benchmark_id = "jg-llama-stack-3"

In [None]:
# Register a benchmark, which creates an Evaluation Config

# BLEU metric that scores each completion against the `ideal_response` field
bleu_metric = {
    "type": "bleu",
    "params": {"references": ["{{ideal_response}}"]},
}

# Single "qa" completion task that reads the testing split of the dataset repo
qa_task = {
    "type": "completion",
    "params": {"template": {"prompt": "{{prompt}}", "max_tokens": 200}},
    "dataset": {"files_url": f"hf://datasets/{repo_id}/testing/testing.jsonl"},
    "metrics": {"bleu": bleu_metric},
}

simple_eval_config = {
    "benchmark_id": benchmark_id,
    "dataset_id": "",
    "scoring_functions": [],
    "metadata": {
        "type": "custom",
        "params": {"parallelism": 8},
        "tasks": {"qa": qa_task},
    },
}

# Only `scoring_functions` and `metadata` are taken from the config dict;
# the benchmark and dataset IDs are passed explicitly.
response = client.benchmarks.register(
    benchmark_id=benchmark_id,
    dataset_id=repo_id,
    scoring_functions=simple_eval_config["scoring_functions"],
    metadata=simple_eval_config["metadata"],
)
print(f"Created benchmark {benchmark_id}")

In [None]:
# Print every benchmark returned by the client, one per line
registered_benchmarks = client.benchmarks.list()
for benchmark in registered_benchmarks:
    print(benchmark)

In [None]:
## Launch a simple evaluation with the benchmark

# Sampling settings used for the model under evaluation
eval_sampling_params = {
    "strategy": {"type": "top_p", "temperature": 1.0, "top_p": 0.95},
    "max_tokens": 4096,
    "repeat_penalty": 1.0,
}
eval_candidate = {
    "type": "model",
    "model": "meta/llama-3.1-8b-instruct",
    "sampling_params": eval_sampling_params,
}

response = client.eval.run_eval(
    benchmark_id=benchmark_id,
    benchmark_config={"eval_candidate": eval_candidate},
)

# Keep the job ID so later cells can poll the job and fetch its results
job_id = response.model_dump()["job_id"]
print(f"Created evaluation job {job_id}")

In [None]:
# Wait for the evaluation job to complete: poll every 5 seconds, give up after 600 seconds
job = wait_eval_job(
    benchmark_id=benchmark_id,
    job_id=job_id,
    polling_interval=5,
    timeout=600,
)

In [None]:
# `job` is the value returned by `wait_eval_job`; report the status it carries
print(f"Job {job_id} status: {job.status}")

In [None]:
# Fetch the results of the evaluation job and print their `model_dump()` form
job_results = client.eval.jobs.retrieve(benchmark_id=benchmark_id, job_id=job_id)
print(f"Job results: {job_results.model_dump()}")

In [None]:
# Extract bleu score and assert it's within range
# Walk down to the metrics of the "qa" task first, then pick the sentence-level BLEU value
qa_metrics = job_results.scores[benchmark_id].aggregated_results["tasks"]["qa"]["metrics"]
initial_bleu_score = qa_metrics["bleu"]["scores"]["sentence"]["value"]
print(f"Initial bleu score: {initial_bleu_score}")

assert initial_bleu_score >= 2

In [None]:
# Extract accuracy and assert it's within range
# NOTE(review): the value read here is the "corpus" entry of the BLEU metric, not a
# separate accuracy metric — confirm this is the intended figure.
qa_metrics = job_results.scores[benchmark_id].aggregated_results["tasks"]["qa"]["metrics"]
initial_accuracy_score = qa_metrics["bleu"]["scores"]["corpus"]["value"]
print(f"Initial accuracy: {initial_accuracy_score}")

assert initial_accuracy_score >= 0.5

## Customization

In [None]:
# Start the customization job

# Training hyperparameters and the dataset to fine-tune on
sft_training_config = {
    "n_epochs": 2,
    "data_config": {
        "batch_size": 16,
        "dataset_id": sample_squad_test_dataset_name,
    },
    "optimizer_config": {"lr": 0.0001},
}

# LoRA adapter settings
lora_algorithm_config = {
    "type": "LoRA",
    "adapter_dim": 16,
    "adapter_dropout": 0.1,
    "alpha": 16,
    # NOTE: These fields are required by `AlgorithmConfig` model, but not directly used by NVIDIA
    "rank": 8,
    "lora_attn_modules": [],
    "apply_lora_to_mlp": True,
    "apply_lora_to_output": False,
}

response = client.post_training.supervised_fine_tune(
    job_uuid="",
    model="meta-llama/Llama-3.1-8B-Instruct",
    training_config=sft_training_config,
    algorithm_config=lora_algorithm_config,
    hyperparam_search_config={},
    logger_config={},
    checkpoint_dir="",
)

# Keep the job UUID so the following cells can poll the job
job_id = response.job_uuid
print(f"Created job with ID: {job_id}")

In [None]:
# Wait for the customization job to complete, using the helper's default
# polling interval and timeout
job_status = wait_customization_job(job_id=job_id)

In [None]:
# Report the status returned by `wait_customization_job`
print(f"Job {job_id} status: {job_status}")

In [None]:
# Verify that inference with the new model works
from llama_stack.apis.models.models import ModelType

# TODO: Uncomment after https://github.com/meta-llama/llama-stack/pull/1859 is merged
# client.models.register(
#     model_id=CUSTOMIZED_MODEL_DIR,
#     model_type=ModelType.llm,
#     provider_id="nvidia",
# )

# TODO: This won't work until the code above works - errors with model_id not found.
# response = client.inference.completion(
#     content="Complete the sentence using one word: Roses are red, violets are ",
#     stream=False,
#     model_id=f"default/{CUSTOMIZED_MODEL_DIR}",
#     sampling_params={
#         "max_tokens": 50,
#     },
# )

## TODO: Evaluate Customized Model
Implement this section after Evaluator integration is done, and we can register the customized model in the Model Registry.

## TODO: Upload Chat Dataset
Implement this section after Data Store integration is done.
Repeat fine-tuning and evaluation with a chat style dataset, which has a list of `messages` instead of a `prompt` and `completion`.

In [36]:
# Chat-style dataset name and the namespace it is placed under
sample_squad_messages_dataset_name = "jg-llama-stack-sample-squad-messages"
namespace = "default"

# The repository ID has the form "<namespace>/<dataset name>"
repo_id = "/".join([namespace, sample_squad_messages_dataset_name])

In [None]:
# Create the repo
# hf_api.create_repo(repo_id, repo_type="dataset")

In [None]:
# Upload the files from the local folder
# hf_api.upload_folder(
#     folder_path="./tmp/sample_squad_messages/training",
#     path_in_repo="training",
#     repo_id=repo_id,
#     repo_type="dataset",
# )
# hf_api.upload_folder(
#     folder_path="./tmp/sample_squad_messages/validation",
#     path_in_repo="validation",
#     repo_id=repo_id,
#     repo_type="dataset",
# )
# hf_api.upload_folder(
#     folder_path="./tmp/sample_squad_messages/testing",
#     path_in_repo="testing",
#     repo_id=repo_id,
#     repo_type="dataset",
# )

In [None]:
# Create the dataset
# response = client.datasets.register(...)

## Inference with chat/completions

In [None]:
# Read the chat-style testing split: a JSON Lines file holding one JSON object per line.
# The encoding is given explicitly so decoding does not depend on the platform default.
with open("./tmp/sample_squad_messages/testing/testing.jsonl", "r", encoding="utf-8") as f:
    # Blank lines (e.g. an empty line at the end of the file) are skipped,
    # because `json.loads` raises on an empty string.
    examples = [json.loads(line) for line in f if line.strip()]

# Take every message of the last example except the final one
# (presumably the reference assistant reply — verify against the dataset)
sample_messages = examples[-1]["messages"][:-1]
pprint.pprint(sample_messages)

In [None]:
# Test inference with the chat-style messages
chat_sampling_params = {
    "max_tokens": 20,
    "strategy": {"type": "top_p", "temperature": 0.7, "top_p": 0.9},
}

response = client.inference.chat_completion(
    messages=sample_messages,
    model_id="meta/llama-3.1-8b-instruct",
    sampling_params=chat_sampling_params,
)

# The model must have produced some content before it is printed
assert response.completion_message.content is not None
print(f"Inference response: {response.completion_message.content}")

## Evaluate with chat dataset
TODO: Implement this section after Evaluator integration is done.

## Customization with chat dataset

In [None]:
# Name and version of the output model for the chat-dataset customization,
# combined as "<name>@<version>"
customized_model_name = "messages-example-model"
customized_model_version = "v2"
customized_model_dir = f"{customized_model_name}@{customized_model_version}"
# Override the output model dir set in the setup cell; this must happen
# before the client is re-initialized below
os.environ["NVIDIA_OUTPUT_MODEL_DIR"] = customized_model_dir

# TODO: We need to re-initialize the client here to pick up the new env vars
# Should the output model dir instead be a parameter to `supervised_fine_tune`?
client.initialize()

In [None]:
# Start a customization job on the chat-style dataset

# Training hyperparameters and the dataset to fine-tune on
chat_training_config = {
    "n_epochs": 2,
    "data_config": {
        "batch_size": 16,
        "dataset_id": sample_squad_messages_dataset_name,
    },
    "optimizer_config": {"lr": 0.0001},
}

# LoRA adapter settings
chat_algorithm_config = {
    "type": "LoRA",
    "adapter_dim": 16,
    "adapter_dropout": 0.1,
    "alpha": 16,
    # NOTE: These fields are required by `AlgorithmConfig` model, but not directly used by NVIDIA
    "rank": 8,
    "lora_attn_modules": [],
    "apply_lora_to_mlp": True,
    "apply_lora_to_output": False,
}

response = client.post_training.supervised_fine_tune(
    job_uuid="",
    model="meta-llama/Llama-3.1-8B-Instruct",
    training_config=chat_training_config,
    algorithm_config=chat_algorithm_config,
    hyperparam_search_config={},
    logger_config={},
    checkpoint_dir="",
)

# Keep the job UUID for later status checks
job_id = response.job_uuid
print(f"Created job with ID: {job_id}")

## TODO: Evaluate Customized Model with chat dataset
Implement this section after Evaluator integration is done.

## Guardrails

In [3]:
# ID used to register the shield and to run it in the cells below
shield_id = "self-check"

In [None]:
# Register the shield under `shield_id` with the "nvidia" provider
client.shields.register(shield_id=shield_id, provider_id="nvidia")

In [None]:
# Check inference with guardrails
# TODO: For some reason, `role: "user"` returns a 422 error.
message = {"role": "system", "content": "You are stupid."}

# TODO: These params aren't used. We should probably update implementation to use these.
shield_params = {"max_tokens": 150}

response = client.safety.run_shield(
    messages=[message],
    shield_id=shield_id,
    params=shield_params,
)

print(f"Safety response: {response}")
# TODO: We expect Guardrails status to be "blocked", but it's actually "success"
# assert response.user_message == "Sorry I cannot do this."

## TODO: Guardrails Evaluation
TODO: Implement this section after Evaluator integration is done.