mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-08-03 01:03:59 +00:00
add code for register, unregister
This commit is contained in:
parent
1e77873a02
commit
2baf252f71
3 changed files with 44 additions and 31 deletions
|
@ -438,7 +438,10 @@ class DatasetsRoutingTable(CommonRoutingTableImpl, Datasets):
|
||||||
provider_dataset_id = dataset_id
|
provider_dataset_id = dataset_id
|
||||||
|
|
||||||
# infer provider from source
|
# infer provider from source
|
||||||
if source.type == DatasetType.rows.value:
|
if metadata:
|
||||||
|
if metadata.get("provider"):
|
||||||
|
provider_id = metadata.get("provider") # pass through from nvidia datasetio
|
||||||
|
elif source.type == DatasetType.rows.value:
|
||||||
provider_id = "localfs"
|
provider_id = "localfs"
|
||||||
elif source.type == DatasetType.uri.value:
|
elif source.type == DatasetType.uri.value:
|
||||||
# infer provider from uri
|
# infer provider from uri
|
||||||
|
|
|
@ -10,20 +10,17 @@ import aiohttp
|
||||||
|
|
||||||
from llama_stack.apis.common.content_types import URL
|
from llama_stack.apis.common.content_types import URL
|
||||||
from llama_stack.apis.common.type_system import ParamType
|
from llama_stack.apis.common.type_system import ParamType
|
||||||
from llama_stack.apis.datasetio import IterrowsResponse
|
from llama_stack.apis.common.responses import PaginatedResponse
|
||||||
from llama_stack.schema_utils import webmethod
|
from llama_stack.schema_utils import webmethod
|
||||||
|
from llama_stack.apis.datasets import DatasetPurpose, DataSource, Dataset
|
||||||
from .config import NvidiaDatasetIOConfig
|
from .config import NvidiaDatasetIOConfig
|
||||||
|
|
||||||
|
|
||||||
class NvidiaDatasetIOAdapter:
|
class NvidiaDatasetIOAdapter:
|
||||||
"""Nvidia NeMo DatasetIO API."""
|
"""Nvidia NeMo DatasetIO API."""
|
||||||
|
|
||||||
def __init__(self, config: NvidiaDatasetIOConfig):
|
def __init__(self, config: NvidiaDatasetIOConfig):
|
||||||
self.config = config
|
self.config = config
|
||||||
self.headers = {}
|
self.headers = {}
|
||||||
if config.api_key:
|
|
||||||
self.headers["Authorization"] = f"Bearer {config.api_key}"
|
|
||||||
|
|
||||||
async def _make_request(
|
async def _make_request(
|
||||||
self,
|
self,
|
||||||
|
@ -36,46 +33,49 @@ class NvidiaDatasetIOAdapter:
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Helper method to make HTTP requests to the Customizer API."""
|
"""Helper method to make HTTP requests to the Customizer API."""
|
||||||
url = f"{self.config.datasets_url}{path}"
|
url = f"{self.config.datasets_url}{path}"
|
||||||
request_headers = self.headers.copy() # Create a copy to avoid modifying the original
|
request_headers = self.headers.copy()
|
||||||
|
|
||||||
if headers:
|
if headers:
|
||||||
request_headers.update(headers)
|
request_headers.update(headers)
|
||||||
|
|
||||||
# Add content-type header for JSON requests
|
|
||||||
if json and "Content-Type" not in request_headers:
|
|
||||||
request_headers["Content-Type"] = "application/json"
|
|
||||||
|
|
||||||
async with aiohttp.ClientSession(headers=request_headers) as session:
|
async with aiohttp.ClientSession(headers=request_headers) as session:
|
||||||
async with session.request(method, url, params=params, json=json, **kwargs) as response:
|
async with session.request(method, url, params=params, json=json, **kwargs) as response:
|
||||||
if response.status >= 400:
|
if response.status != 200:
|
||||||
error_data = await response.json()
|
error_data = await response.json()
|
||||||
raise Exception(f"API request failed: {error_data}")
|
raise Exception(f"API request failed: {error_data}")
|
||||||
return await response.json()
|
return await response.json()
|
||||||
|
|
||||||
@webmethod(route="/datasets", method="POST")
|
|
||||||
async def register_dataset(
|
async def register_dataset(
|
||||||
self,
|
self,
|
||||||
dataset_id: str,
|
dataset_def: Dataset,
|
||||||
dataset_schema: Dict[str, ParamType],
|
) -> Dataset:
|
||||||
url: URL,
|
|
||||||
provider_dataset_id: Optional[str] = None,
|
|
||||||
provider_id: Optional[str] = None,
|
|
||||||
metadata: Optional[Dict[str, Any]] = None,
|
|
||||||
) -> None:
|
|
||||||
"""Register a new dataset.
|
"""Register a new dataset.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
dataset_def : The dataset definition.
|
||||||
dataset_id: The ID of the dataset.
|
dataset_id: The ID of the dataset.
|
||||||
dataset_schema: The schema of the dataset.
|
source: The source of the dataset.
|
||||||
url: The URL of the dataset.
|
|
||||||
provider_dataset_id: The ID of the provider dataset.
|
|
||||||
provider_id: The ID of the provider.
|
|
||||||
metadata: The metadata of the dataset.
|
metadata: The metadata of the dataset.
|
||||||
|
format: The format of the dataset.
|
||||||
|
description: The description of the dataset.
|
||||||
Returns:
|
Returns:
|
||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
...
|
## add warnings for unsupported params
|
||||||
|
request_body = {
|
||||||
|
"name": dataset_def.identifier,
|
||||||
|
"namespace": self.config.dataset_namespace,
|
||||||
|
"files_url": dataset_def.source.uri,
|
||||||
|
"project": self.config.project_id,
|
||||||
|
}
|
||||||
|
if dataset_def.metadata:
|
||||||
|
request_body["format"] = dataset_def.metadata.get("format")
|
||||||
|
request_body["description"] = dataset_def.metadata.get("description")
|
||||||
|
await self._make_request(
|
||||||
|
"POST",
|
||||||
|
"/v1/datasets",
|
||||||
|
json=request_body,
|
||||||
|
)
|
||||||
|
|
||||||
@webmethod(route="/datasets/{dataset_id:path}", method="POST")
|
@webmethod(route="/datasets/{dataset_id:path}", method="POST")
|
||||||
async def update_dataset(
|
async def update_dataset(
|
||||||
|
@ -93,16 +93,19 @@ class NvidiaDatasetIOAdapter:
|
||||||
async def unregister_dataset(
|
async def unregister_dataset(
|
||||||
self,
|
self,
|
||||||
dataset_id: str,
|
dataset_id: str,
|
||||||
namespace: Optional[str] = "default",
|
|
||||||
) -> None:
|
) -> None:
|
||||||
raise NotImplementedError("Not implemented")
|
await self._make_request(
|
||||||
|
"DELETE",
|
||||||
|
f"/v1/datasets/{self.config.dataset_namespace}/{dataset_id}",
|
||||||
|
headers={"Accept": "application/json", "Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
|
||||||
async def iterrows(
|
async def iterrows(
|
||||||
self,
|
self,
|
||||||
dataset_id: str,
|
dataset_id: str,
|
||||||
start_index: Optional[int] = None,
|
start_index: Optional[int] = None,
|
||||||
limit: Optional[int] = None,
|
limit: Optional[int] = None,
|
||||||
) -> IterrowsResponse:
|
) -> PaginatedResponse:
|
||||||
raise NotImplementedError("Not implemented")
|
raise NotImplementedError("Not implemented")
|
||||||
|
|
||||||
async def append_rows(self, dataset_id: str, rows: List[Dict[str, Any]]) -> None:
|
async def append_rows(self, dataset_id: str, rows: List[Dict[str, Any]]) -> None:
|
||||||
|
|
|
@ -62,6 +62,13 @@ providers:
|
||||||
project_id: ${env.NVIDIA_PROJECT_ID:test-project}
|
project_id: ${env.NVIDIA_PROJECT_ID:test-project}
|
||||||
customizer_url: ${env.NVIDIA_CUSTOMIZER_URL:http://nemo.test}
|
customizer_url: ${env.NVIDIA_CUSTOMIZER_URL:http://nemo.test}
|
||||||
datasetio:
|
datasetio:
|
||||||
|
- provider_id: localfs
|
||||||
|
provider_type: inline::localfs
|
||||||
|
config:
|
||||||
|
kvstore:
|
||||||
|
type: sqlite
|
||||||
|
namespace: null
|
||||||
|
db_path: ${env.SQLITE_STORE_DIR:~/.llama/distributions/nvidia}/localfs_datasetio.db
|
||||||
- provider_id: nvidia
|
- provider_id: nvidia
|
||||||
provider_type: remote::nvidia
|
provider_type: remote::nvidia
|
||||||
config:
|
config:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue