feat: Add NVIDIA NeMo datastore (#1852)

# What does this PR do?
Implemetation of NeMO Datastore register, unregister API.

Open Issues: 
- provider_id gets set to `localfs` in client.datasets.register() as it
is specified in routing_tables.py: DatasetsRoutingTable
see: #1860

Currently I have passed `"provider_id":"nvidia"` in metadata and have
parsed that in `DatasetsRoutingTable`
(Not the best approach, but just a quick workaround to make it work for
now.)

## Test Plan
- Unit test cases: `pytest
tests/unit/providers/nvidia/test_datastore.py`
```bash
========================================================== test session starts ===========================================================
platform linux -- Python 3.10.0, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/ubuntu/llama-stack
configfile: pyproject.toml
plugins: anyio-4.9.0, asyncio-0.26.0, nbval-0.11.0, metadata-3.1.1, html-4.1.1, cov-6.1.0
asyncio: mode=strict, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function
collected 2 items                                                                                                                        

tests/unit/providers/nvidia/test_datastore.py ..                                                                                   [100%]

============================================================ warnings summary ============================================================

====================================================== 2 passed, 1 warning in 0.84s ======================================================
```

cc: @dglogo, @mattf, @yanxi0830
This commit is contained in:
Rashmi Pawar 2025-04-28 22:11:59 +05:30 committed by GitHub
parent c149cf2e0f
commit e6bbf8d20b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 514 additions and 9 deletions

View file

@ -438,7 +438,10 @@ class DatasetsRoutingTable(CommonRoutingTableImpl, Datasets):
provider_dataset_id = dataset_id
# infer provider from source
if source.type == DatasetType.rows.value:
if metadata:
if metadata.get("provider_id"):
provider_id = metadata.get("provider_id") # pass through from nvidia datasetio
elif source.type == DatasetType.rows.value:
provider_id = "localfs"
elif source.type == DatasetType.uri.value:
# infer provider from uri

View file

@ -36,4 +36,15 @@ def available_providers() -> List[ProviderSpec]:
config_class="llama_stack.providers.remote.datasetio.huggingface.HuggingfaceDatasetIOConfig",
),
),
remote_provider_spec(
api=Api.datasetio,
adapter=AdapterSpec(
adapter_type="nvidia",
pip_packages=[
"datasets",
],
module="llama_stack.providers.remote.datasetio.nvidia",
config_class="llama_stack.providers.remote.datasetio.nvidia.NvidiaDatasetIOConfig",
),
),
]

View file

@ -0,0 +1,74 @@
# NVIDIA DatasetIO Provider for LlamaStack
This provider enables dataset management using NVIDIA's NeMo Customizer service.
## Features
- Register datasets for fine-tuning LLMs
- Unregister datasets
## Getting Started
### Prerequisites
- LlamaStack with NVIDIA configuration
- Access to Hosted NVIDIA NeMo Microservice
- API key for authentication with the NVIDIA service
### Setup
Build the NVIDIA environment:
```bash
llama stack build --template nvidia --image-type conda
```
### Basic Usage using the LlamaStack Python Client
#### Initialize the client
```python
import os
os.environ["NVIDIA_API_KEY"] = "your-api-key"
os.environ["NVIDIA_CUSTOMIZER_URL"] = "http://nemo.test"
os.environ["NVIDIA_USER_ID"] = "llama-stack-user"
os.environ["NVIDIA_DATASET_NAMESPACE"] = "default"
os.environ["NVIDIA_PROJECT_ID"] = "test-project"
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient
client = LlamaStackAsLibraryClient("nvidia")
client.initialize()
```
#### Register a dataset
```python
client.datasets.register(
purpose="post-training/messages",
dataset_id="my-training-dataset",
source={"type": "uri", "uri": "hf://datasets/default/sample-dataset"},
metadata={
"format": "json",
"description": "Dataset for LLM fine-tuning",
"provider": "nvidia",
},
)
```
#### Get a list of all registered datasets
```python
datasets = client.datasets.list()
for dataset in datasets:
print(f"Dataset ID: {dataset.identifier}")
print(f"Description: {dataset.metadata.get('description', '')}")
print(f"Source: {dataset.source.uri}")
print("---")
```
#### Unregister a dataset
```python
client.datasets.unregister(dataset_id="my-training-dataset")
```

View file

@ -0,0 +1,23 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import NvidiaDatasetIOConfig
async def get_adapter_impl(
config: NvidiaDatasetIOConfig,
_deps,
):
from .datasetio import NvidiaDatasetIOAdapter
if not isinstance(config, NvidiaDatasetIOConfig):
raise RuntimeError(f"Unexpected config type: {type(config)}")
impl = NvidiaDatasetIOAdapter(config)
return impl
__all__ = ["get_adapter_impl", "NvidiaDatasetIOAdapter"]

View file

@ -0,0 +1,61 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import os
import warnings
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class NvidiaDatasetIOConfig(BaseModel):
"""Configuration for NVIDIA DatasetIO implementation."""
api_key: Optional[str] = Field(
default_factory=lambda: os.getenv("NVIDIA_API_KEY"),
description="The NVIDIA API key.",
)
dataset_namespace: Optional[str] = Field(
default_factory=lambda: os.getenv("NVIDIA_DATASET_NAMESPACE", "default"),
description="The NVIDIA dataset namespace.",
)
project_id: Optional[str] = Field(
default_factory=lambda: os.getenv("NVIDIA_PROJECT_ID", "test-project"),
description="The NVIDIA project ID.",
)
datasets_url: str = Field(
default_factory=lambda: os.getenv("NVIDIA_DATASETS_URL", "http://nemo.test"),
description="Base URL for the NeMo Dataset API",
)
# warning for default values
def __post_init__(self):
default_values = []
if os.getenv("NVIDIA_PROJECT_ID") is None:
default_values.append("project_id='test-project'")
if os.getenv("NVIDIA_DATASET_NAMESPACE") is None:
default_values.append("dataset_namespace='default'")
if os.getenv("NVIDIA_DATASETS_URL") is None:
default_values.append("datasets_url='http://nemo.test'")
if default_values:
warnings.warn(
f"Using default values: {', '.join(default_values)}. \
Please set the environment variables to avoid this default behavior.",
stacklevel=2,
)
@classmethod
def sample_run_config(cls, **kwargs) -> Dict[str, Any]:
return {
"api_key": "${env.NVIDIA_API_KEY:}",
"dataset_namespace": "${env.NVIDIA_DATASET_NAMESPACE:default}",
"project_id": "${env.NVIDIA_PROJECT_ID:test-project}",
"datasets_url": "${env.NVIDIA_DATASETS_URL:http://nemo.test}",
}

View file

@ -0,0 +1,112 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any, Dict, List, Optional
import aiohttp
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.common.responses import PaginatedResponse
from llama_stack.apis.common.type_system import ParamType
from llama_stack.apis.datasets import Dataset
from .config import NvidiaDatasetIOConfig
class NvidiaDatasetIOAdapter:
"""Nvidia NeMo DatasetIO API."""
def __init__(self, config: NvidiaDatasetIOConfig):
self.config = config
self.headers = {}
async def _make_request(
self,
method: str,
path: str,
headers: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Dict[str, Any]:
"""Helper method to make HTTP requests to the Customizer API."""
url = f"{self.config.datasets_url}{path}"
request_headers = self.headers.copy()
if headers:
request_headers.update(headers)
async with aiohttp.ClientSession(headers=request_headers) as session:
async with session.request(method, url, params=params, json=json, **kwargs) as response:
if response.status != 200:
error_data = await response.json()
raise Exception(f"API request failed: {error_data}")
return await response.json()
async def register_dataset(
self,
dataset_def: Dataset,
) -> Dataset:
"""Register a new dataset.
Args:
dataset_def [Dataset]: The dataset definition.
dataset_id [str]: The ID of the dataset.
source [DataSource]: The source of the dataset.
metadata [Dict[str, Any]]: The metadata of the dataset.
format [str]: The format of the dataset.
description [str]: The description of the dataset.
Returns:
Dataset
"""
## add warnings for unsupported params
request_body = {
"name": dataset_def.identifier,
"namespace": self.config.dataset_namespace,
"files_url": dataset_def.source.uri,
"project": self.config.project_id,
}
if dataset_def.metadata:
request_body["format"] = dataset_def.metadata.get("format")
request_body["description"] = dataset_def.metadata.get("description")
await self._make_request(
"POST",
"/v1/datasets",
json=request_body,
)
return dataset_def
async def update_dataset(
self,
dataset_id: str,
dataset_schema: Dict[str, ParamType],
url: URL,
provider_dataset_id: Optional[str] = None,
provider_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
raise NotImplementedError("Not implemented")
async def unregister_dataset(
self,
dataset_id: str,
) -> None:
await self._make_request(
"DELETE",
f"/v1/datasets/{self.config.dataset_namespace}/{dataset_id}",
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
async def iterrows(
self,
dataset_id: str,
start_index: Optional[int] = None,
limit: Optional[int] = None,
) -> PaginatedResponse:
raise NotImplementedError("Not implemented")
async def append_rows(self, dataset_id: str, rows: List[Dict[str, Any]]) -> None:
raise NotImplementedError("Not implemented")

View file

@ -394,6 +394,7 @@
"aiosqlite",
"blobfile",
"chardet",
"datasets",
"faiss-cpu",
"fastapi",
"fire",

View file

@ -18,6 +18,7 @@ distribution_spec:
- remote::nvidia
datasetio:
- inline::localfs
- remote::nvidia
scoring:
- inline::basic
tool_runtime:

View file

@ -7,6 +7,7 @@
from pathlib import Path
from llama_stack.distribution.datatypes import ModelInput, Provider, ShieldInput, ToolGroupInput
from llama_stack.providers.remote.datasetio.nvidia import NvidiaDatasetIOConfig
from llama_stack.providers.remote.eval.nvidia import NVIDIAEvalConfig
from llama_stack.providers.remote.inference.nvidia import NVIDIAConfig
from llama_stack.providers.remote.inference.nvidia.models import MODEL_ENTRIES
@ -23,7 +24,7 @@ def get_distribution_template() -> DistributionTemplate:
"telemetry": ["inline::meta-reference"],
"eval": ["remote::nvidia"],
"post_training": ["remote::nvidia"],
"datasetio": ["inline::localfs"],
"datasetio": ["inline::localfs", "remote::nvidia"],
"scoring": ["inline::basic"],
"tool_runtime": ["inline::rag-runtime"],
}
@ -38,6 +39,11 @@ def get_distribution_template() -> DistributionTemplate:
provider_type="remote::nvidia",
config=NVIDIASafetyConfig.sample_run_config(),
)
datasetio_provider = Provider(
provider_id="nvidia",
provider_type="remote::nvidia",
config=NvidiaDatasetIOConfig.sample_run_config(),
)
eval_provider = Provider(
provider_id="nvidia",
provider_type="remote::nvidia",
@ -75,6 +81,7 @@ def get_distribution_template() -> DistributionTemplate:
"run.yaml": RunConfigSettings(
provider_overrides={
"inference": [inference_provider],
"datasetio": [datasetio_provider],
"eval": [eval_provider],
},
default_models=default_models,

View file

@ -74,6 +74,13 @@ providers:
type: sqlite
namespace: null
db_path: ${env.SQLITE_STORE_DIR:~/.llama/distributions/nvidia}/localfs_datasetio.db
- provider_id: nvidia
provider_type: remote::nvidia
config:
api_key: ${env.NVIDIA_API_KEY:}
dataset_namespace: ${env.NVIDIA_DATASET_NAMESPACE:default}
project_id: ${env.NVIDIA_PROJECT_ID:test-project}
datasets_url: ${env.NVIDIA_DATASETS_URL:http://nemo.test}
scoring:
- provider_id: basic
provider_type: inline::basic

View file

@ -62,13 +62,13 @@ providers:
project_id: ${env.NVIDIA_PROJECT_ID:test-project}
customizer_url: ${env.NVIDIA_CUSTOMIZER_URL:http://nemo.test}
datasetio:
- provider_id: localfs
provider_type: inline::localfs
- provider_id: nvidia
provider_type: remote::nvidia
config:
kvstore:
type: sqlite
namespace: null
db_path: ${env.SQLITE_STORE_DIR:~/.llama/distributions/nvidia}/localfs_datasetio.db
api_key: ${env.NVIDIA_API_KEY:}
dataset_namespace: ${env.NVIDIA_DATASET_NAMESPACE:default}
project_id: ${env.NVIDIA_PROJECT_ID:test-project}
datasets_url: ${env.NVIDIA_DATASETS_URL:http://nemo.test}
scoring:
- provider_id: basic
provider_type: inline::basic