From ae973c9595e49ca81df8d3eb68f8974348a70b16 Mon Sep 17 00:00:00 2001 From: raspawar Date: Wed, 26 Mar 2025 15:04:20 +0530 Subject: [PATCH] add datasetio code --- llama_stack/providers/registry/datasetio.py | 11 ++ .../remote/datasetio/nvidia/README.md | 0 .../remote/datasetio/nvidia/__init__.py | 23 ++++ .../remote/datasetio/nvidia/config.py | 71 +++++++++++ .../remote/datasetio/nvidia/datasetio.py | 114 ++++++++++++++++++ pyproject.toml | 1 + 6 files changed, 220 insertions(+) create mode 100644 llama_stack/providers/remote/datasetio/nvidia/README.md create mode 100644 llama_stack/providers/remote/datasetio/nvidia/__init__.py create mode 100644 llama_stack/providers/remote/datasetio/nvidia/config.py create mode 100644 llama_stack/providers/remote/datasetio/nvidia/datasetio.py diff --git a/llama_stack/providers/registry/datasetio.py b/llama_stack/providers/registry/datasetio.py index f83dcbc60..7db136136 100644 --- a/llama_stack/providers/registry/datasetio.py +++ b/llama_stack/providers/registry/datasetio.py @@ -36,4 +36,15 @@ def available_providers() -> List[ProviderSpec]: config_class="llama_stack.providers.remote.datasetio.huggingface.HuggingfaceDatasetIOConfig", ), ), + remote_provider_spec( + api=Api.datasetio, + adapter=AdapterSpec( + adapter_type="nvidia", + pip_packages=[ + "datasets", + ], + module="llama_stack.providers.remote.datasetio.nvidia", + config_class="llama_stack.providers.remote.datasetio.nvidia.NvidiaDatasetIOConfig", + ), + ), ] diff --git a/llama_stack/providers/remote/datasetio/nvidia/README.md b/llama_stack/providers/remote/datasetio/nvidia/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/llama_stack/providers/remote/datasetio/nvidia/__init__.py b/llama_stack/providers/remote/datasetio/nvidia/__init__.py new file mode 100644 index 000000000..418daec8d --- /dev/null +++ b/llama_stack/providers/remote/datasetio/nvidia/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. 
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from .config import NvidiaDatasetIOConfig
+
+
+async def get_adapter_impl(
+    config: NvidiaDatasetIOConfig,
+    _deps,
+):
+    """Build the NVIDIA DatasetIO adapter for a provider config.
+
+    Args:
+        config: Provider configuration; must be an ``NvidiaDatasetIOConfig``.
+        _deps: Accepted as part of the signature; not used by this adapter.
+
+    Returns:
+        A new ``NvidiaDatasetIOAdapter`` wrapping ``config``.
+
+    Raises:
+        RuntimeError: If ``config`` is not an ``NvidiaDatasetIOConfig``.
+    """
+    # Imported inside the function, so datasetio.py is only loaded when an adapter is built.
+    from .datasetio import NvidiaDatasetIOAdapter
+
+    if not isinstance(config, NvidiaDatasetIOConfig):
+        raise RuntimeError(f"Unexpected config type: {type(config)}")
+
+    return NvidiaDatasetIOAdapter(config)
+
+
+# Only names bound at module level may be listed here: NvidiaDatasetIOAdapter is imported
+# inside get_adapter_impl, so exporting it made a star-import of this package fail.
+__all__ = ["get_adapter_impl", "NvidiaDatasetIOConfig"]
diff --git a/llama_stack/providers/remote/datasetio/nvidia/config.py b/llama_stack/providers/remote/datasetio/nvidia/config.py
new file mode 100644
index 000000000..46aa68e5f
--- /dev/null
+++ b/llama_stack/providers/remote/datasetio/nvidia/config.py
@@ -0,0 +1,71 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+import json
+import os
+import warnings
+from typing import Any, Dict, Optional
+
+from pydantic import BaseModel, Field
+
+
+def _access_policies_from_env() -> dict:
+    """Decode NVIDIA_ACCESS_POLICIES as a JSON object; return {} when it is unset or empty.
+
+    Raises:
+        ValueError: If the variable is not valid JSON or does not decode to an object.
+    """
+    raw = os.getenv("NVIDIA_ACCESS_POLICIES")
+    if not raw:
+        return {}
+    policies = json.loads(raw)  # json.JSONDecodeError is a ValueError subclass
+    if not isinstance(policies, dict):
+        raise ValueError("NVIDIA_ACCESS_POLICIES must be a JSON object")
+    return policies
+
+
+class NvidiaDatasetIOConfig(BaseModel):
+    """Configuration for NVIDIA DatasetIO implementation.
+
+    A field that is not passed explicitly is read from its ``NVIDIA_*`` environment
+    variable; apart from ``api_key``, each one then falls back to a built-in default.
+    """
+
+    api_key: Optional[str] = Field(
+        default_factory=lambda: os.getenv("NVIDIA_API_KEY"),
+        description="The NVIDIA API key.",
+    )
+
+    dataset_namespace: Optional[str] = Field(
+        default_factory=lambda: os.getenv("NVIDIA_DATASET_NAMESPACE", "default"),
+        description="The NVIDIA dataset namespace.",
+    )
+
+    # os.getenv only yields strings, so the variable is decoded from JSON to match the dict type.
+    access_policies: Optional[dict] = Field(
+        default_factory=_access_policies_from_env,
+        description="The NVIDIA access policies.",
+    )
+
+    project_id: Optional[str] = Field(
+        default_factory=lambda: os.getenv("NVIDIA_PROJECT_ID", "test-project"),
+        description="The NVIDIA project ID.",
+    )
+
+    datasets_url: str = Field(
+        default_factory=lambda: os.getenv("NVIDIA_DATASETS_URL", "http://nemo.test"),
+        description="Base URL for the NeMo Dataset API",
+    )
+
+    def model_post_init(self, __context: Any) -> None:
+        """Warn about every field that fell back to its built-in default.
+
+        This is the hook pydantic runs after validation; ``__post_init__`` is a dataclass
+        hook and is never called on a ``BaseModel``, so the warning used to be dead code.
+        """
+        # field name -> (environment variable, built-in default as shown in the warning)
+        fallbacks = {
+            "project_id": ("NVIDIA_PROJECT_ID", "test-project"),
+            "dataset_namespace": ("NVIDIA_DATASET_NAMESPACE", "default"),
+            "access_policies": ("NVIDIA_ACCESS_POLICIES", "{}"),
+            "datasets_url": ("NVIDIA_DATASETS_URL", "http://nemo.test"),
+        }
+        # A field counts as defaulted only if it was neither passed explicitly nor set via env.
+        default_values = [
+            f"{field}='{default}'"
+            for field, (env_var, default) in fallbacks.items()
+            if field not in self.model_fields_set and os.getenv(env_var) is None
+        ]
+
+        if default_values:
+            # Adjacent literals instead of a backslash continuation, which embedded the
+            # source indentation in the middle of the message.
+            warnings.warn(
+                f"Using default values: {', '.join(default_values)}. "
+                "Please set the environment variables to avoid this default behavior.",
+                stacklevel=2,
+            )
+
+    @classmethod
+    def sample_run_config(cls, **kwargs) -> Dict[str, Any]:
+        """Return a sample run config with one ``${env...}`` placeholder per model field."""
+        return {
+            "api_key": "${env.NVIDIA_API_KEY:}",
+            "dataset_namespace": "${env.NVIDIA_DATASET_NAMESPACE:default}",
+            "access_policies": "${env.NVIDIA_ACCESS_POLICIES:}",
+            "project_id": "${env.NVIDIA_PROJECT_ID:test-project}",
+            "datasets_url": "${env.NVIDIA_DATASETS_URL:http://nemo.test}",
+        }
diff --git a/llama_stack/providers/remote/datasetio/nvidia/datasetio.py b/llama_stack/providers/remote/datasetio/nvidia/datasetio.py
new file mode 100644
index 000000000..9a5c8e46b
--- /dev/null
+++ b/llama_stack/providers/remote/datasetio/nvidia/datasetio.py
@@ -0,0 +1,114 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from typing import Any, Dict, Literal, Optional
+
+import aiohttp
+
+from llama_stack.apis.common.content_types import URL
+from llama_stack.apis.common.type_system import ParamType
+from llama_stack.apis.datasets.datasets import Dataset, ListDatasetsResponse
+from llama_stack.apis.resource import ResourceType
+from llama_stack.schema_utils import webmethod
+
+from .config import NvidiaDatasetIOConfig
+
+
+class NvidiaDatasetIOAdapter:
+    """Nvidia NeMo DatasetIO API.
+
+    Requests go to ``config.datasets_url``. Only the HTTP helper is functional so far:
+    ``register_dataset`` is a no-op and the other dataset operations raise
+    ``NotImplementedError``.
+    """
+
+    type: Literal[ResourceType.dataset.value] = ResourceType.dataset.value
+
+    def __init__(self, config: NvidiaDatasetIOConfig):
+        """Store ``config`` and prepare the headers sent with every request."""
+        self.config = config
+        self.headers: Dict[str, str] = {}
+        # Without an API key, requests are sent without an Authorization header.
+        if config.api_key:
+            self.headers["Authorization"] = f"Bearer {config.api_key}"
+
+    async def _make_request(
+        self,
+        method: str,
+        path: str,
+        headers: Optional[Dict[str, Any]] = None,
+        params: Optional[Dict[str, Any]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        **kwargs,
+    ) -> Dict[str, Any]:
+        """Send an HTTP request to the dataset service and return the decoded JSON body.
+
+        Args:
+            method: HTTP method, e.g. "GET".
+            path: Path appended to ``config.datasets_url``.
+            headers: Extra headers for this call; they override the adapter-wide ones.
+            params: Query-string parameters.
+            json: JSON request body.
+            **kwargs: Passed through to ``aiohttp.ClientSession.request``.
+
+        Returns:
+            The response body decoded as JSON.
+
+        Raises:
+            Exception: If the response status is 400 or above.
+        """
+        # Join with exactly one slash, whatever the base URL ends with or the path starts with.
+        url = f"{self.config.datasets_url.rstrip('/')}/{path.lstrip('/')}"
+        # Merge into a fresh dict so per-call headers never leak into self.headers.
+        request_headers = {**self.headers, **(headers or {})}
+
+        # `is not None` so that an empty JSON body ({}) is still labelled as JSON.
+        if json is not None and "Content-Type" not in request_headers:
+            request_headers["Content-Type"] = "application/json"
+
+        async with aiohttp.ClientSession(headers=request_headers) as session:
+            async with session.request(method, url, params=params, json=json, **kwargs) as response:
+                if response.status >= 400:
+                    try:
+                        error_data = await response.json()
+                    except (aiohttp.ContentTypeError, ValueError):
+                        # Error bodies are not always JSON; report the raw text rather than
+                        # masking the real failure with a decoding error.
+                        error_data = await response.text()
+                    raise Exception(f"API request failed: {error_data}")
+                return await response.json()
+
+    @webmethod(route="/datasets", method="POST")
+    async def register_dataset(
+        self,
+        dataset_id: str,
+        dataset_schema: Dict[str, ParamType],
+        url: URL,
+        provider_dataset_id: Optional[str] = None,
+        provider_id: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Register a new dataset.
+
+        NOTE: this is currently a stub; it accepts the arguments and does nothing.
+
+        Args:
+            dataset_id: The ID of the dataset.
+            dataset_schema: The schema of the dataset.
+            url: The URL of the dataset.
+            provider_dataset_id: The ID of the provider dataset.
+            provider_id: The ID of the provider.
+            metadata: The metadata of the dataset.
+
+        Returns:
+            None
+        """
+        return None
+
+    # Uses the "path" converter like the other per-dataset routes of this class; the
+    # previous ":namespace" suffix did not match any of them.
+    @webmethod(route="/datasets/{dataset_id:path}", method="GET")
+    async def get_dataset(
+        self,
+        dataset_id: str,
+    ) -> Optional[Dataset]:
+        """Look up a dataset by ID. Not implemented yet; always raises NotImplementedError."""
+        raise NotImplementedError("Not implemented")
+
+    @webmethod(route="/datasets", method="GET")
+    async def list_datasets(
+        self,
+    ) -> ListDatasetsResponse:
+        """List datasets. Not implemented yet; always raises NotImplementedError."""
+        raise NotImplementedError("Not implemented")
+
+    @webmethod(route="/datasets/{dataset_id:path}", method="POST")
+    async def update_dataset(
+        self,
+        dataset_id: str,
+        dataset_schema: Dict[str, ParamType],
+        url: URL,
+        provider_dataset_id: Optional[str] = None,
+        provider_id: Optional[str] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Update a dataset. Not implemented yet; always raises NotImplementedError."""
+        raise NotImplementedError("Not implemented")
+
+    @webmethod(route="/datasets/{dataset_id:path}", method="DELETE")
+    async def unregister_dataset(
+        self,
+        dataset_id: str,
+        namespace: Optional[str] = "default",
+    ) -> None:
+        """Unregister a dataset. Not implemented yet; always raises NotImplementedError."""
+        raise NotImplementedError("Not implemented")
diff --git a/pyproject.toml b/pyproject.toml
index 3424cf384..0f44ca053 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -260,6 +260,7 @@ exclude = [
     "^llama_stack/providers/inline/scoring/llm_as_judge/",
     "^llama_stack/providers/remote/agents/sample/",
     "^llama_stack/providers/remote/datasetio/huggingface/",
+    "^llama_stack/providers/remote/datasetio/nvidia/",
     "^llama_stack/providers/remote/inference/anthropic/",
     "^llama_stack/providers/remote/inference/bedrock/",
     "^llama_stack/providers/remote/inference/cerebras/",