migrate dataset to resource (#420)

* migrate dataset to resource

* remove auto discovery

* remove listing of providers's datasets

* fix after rebase

---------

Co-authored-by: Dinesh Yeduguru <dineshyv@fb.com>
This commit is contained in:
Dinesh Yeduguru 2024-11-11 17:14:41 -08:00 committed by GitHub
parent 38cce97597
commit b95cb5308f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 71 additions and 62 deletions

View file

@ -21,7 +21,7 @@ class PaginatedRowsResult(BaseModel):
class DatasetStore(Protocol): class DatasetStore(Protocol):
def get_dataset(self, identifier: str) -> DatasetDefWithProvider: ... def get_dataset(self, dataset_id: str) -> Dataset: ...
@runtime_checkable @runtime_checkable

View file

@ -10,19 +10,16 @@ from llama_models.llama3.api.datatypes import URL
from llama_models.schema_utils import json_schema_type, webmethod from llama_models.schema_utils import json_schema_type, webmethod
from pydantic import BaseModel, Field from pydantic import Field
from llama_stack.apis.common.type_system import ParamType from llama_stack.apis.common.type_system import ParamType
from llama_stack.apis.resource import Resource
@json_schema_type @json_schema_type
class DatasetDef(BaseModel): class Dataset(Resource):
identifier: str = Field( type: Literal["dataset"] = "dataset"
description="A unique name for the dataset", schema: Dict[str, ParamType]
)
dataset_schema: Dict[str, ParamType] = Field(
description="The schema definition for this dataset",
)
url: URL url: URL
metadata: Dict[str, Any] = Field( metadata: Dict[str, Any] = Field(
default_factory=dict, default_factory=dict,
@ -30,26 +27,23 @@ class DatasetDef(BaseModel):
) )
@json_schema_type
class DatasetDefWithProvider(DatasetDef):
type: Literal["dataset"] = "dataset"
provider_id: str = Field(
description="ID of the provider which serves this dataset",
)
class Datasets(Protocol): class Datasets(Protocol):
@webmethod(route="/datasets/register", method="POST") @webmethod(route="/datasets/register", method="POST")
async def register_dataset( async def register_dataset(
self, self,
dataset_def: DatasetDefWithProvider, dataset_id: str,
schema: Dict[str, ParamType],
url: URL,
provider_dataset_id: Optional[str] = None,
provider_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None: ... ) -> None: ...
@webmethod(route="/datasets/get", method="GET") @webmethod(route="/datasets/get", method="GET")
async def get_dataset( async def get_dataset(
self, self,
dataset_identifier: str, dataset_id: str,
) -> Optional[DatasetDefWithProvider]: ... ) -> Optional[Dataset]: ...
@webmethod(route="/datasets/list", method="GET") @webmethod(route="/datasets/list", method="GET")
async def list_datasets(self) -> List[DatasetDefWithProvider]: ... async def list_datasets(self) -> List[Dataset]: ...

View file

@ -34,7 +34,7 @@ RoutableObject = Union[
Model, Model,
Shield, Shield,
MemoryBank, MemoryBank,
DatasetDef, Dataset,
ScoringFnDef, ScoringFnDef,
] ]
@ -44,7 +44,7 @@ RoutableObjectWithProvider = Annotated[
Model, Model,
Shield, Shield,
MemoryBank, MemoryBank,
DatasetDefWithProvider, Dataset,
ScoringFnDefWithProvider, ScoringFnDefWithProvider,
], ],
Field(discriminator="type"), Field(discriminator="type"),

View file

@ -17,6 +17,9 @@ from llama_stack.apis.datasets import * # noqa: F403
from llama_stack.apis.eval_tasks import * # noqa: F403 from llama_stack.apis.eval_tasks import * # noqa: F403
from llama_models.llama3.api.datatypes import URL
from llama_stack.apis.common.type_system import ParamType
from llama_stack.distribution.store import DistributionRegistry from llama_stack.distribution.store import DistributionRegistry
from llama_stack.distribution.datatypes import * # noqa: F403 from llama_stack.distribution.datatypes import * # noqa: F403
@ -94,8 +97,6 @@ class CommonRoutingTableImpl(RoutingTable):
elif api == Api.datasetio: elif api == Api.datasetio:
p.dataset_store = self p.dataset_store = self
datasets = await p.list_datasets()
await add_objects(datasets, pid, DatasetDefWithProvider)
elif api == Api.scoring: elif api == Api.scoring:
p.scoring_function_store = self p.scoring_function_store = self
@ -302,16 +303,42 @@ class MemoryBanksRoutingTable(CommonRoutingTableImpl, MemoryBanks):
class DatasetsRoutingTable(CommonRoutingTableImpl, Datasets): class DatasetsRoutingTable(CommonRoutingTableImpl, Datasets):
async def list_datasets(self) -> List[DatasetDefWithProvider]: async def list_datasets(self) -> List[Dataset]:
return await self.get_all_with_type("dataset") return await self.get_all_with_type("dataset")
async def get_dataset( async def get_dataset(self, dataset_id: str) -> Optional[Dataset]:
self, dataset_identifier: str return await self.get_object_by_identifier(dataset_id)
) -> Optional[DatasetDefWithProvider]:
return await self.get_object_by_identifier(dataset_identifier)
async def register_dataset(self, dataset_def: DatasetDefWithProvider) -> None: async def register_dataset(
await self.register_object(dataset_def) self,
dataset_id: str,
schema: Dict[str, ParamType],
url: URL,
provider_dataset_id: Optional[str] = None,
provider_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
if provider_dataset_id is None:
provider_dataset_id = dataset_id
if provider_id is None:
# If provider_id not specified, use the only provider if it supports this dataset
if len(self.impls_by_provider_id) == 1:
provider_id = list(self.impls_by_provider_id.keys())[0]
else:
raise ValueError(
"No provider specified and multiple providers available. Please specify a provider_id."
)
if metadata is None:
metadata = {}
dataset = Dataset(
identifier=dataset_id,
provider_resource_id=provider_dataset_id,
provider_id=provider_id,
schema=schema,
url=url,
metadata=metadata,
)
await self.register_object(dataset)
class ScoringFunctionsRoutingTable(CommonRoutingTableImpl, ScoringFunctions): class ScoringFunctionsRoutingTable(CommonRoutingTableImpl, ScoringFunctions):

View file

@ -3,7 +3,7 @@
# #
# This source code is licensed under the terms described in the LICENSE file in # This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree. # the root directory of this source tree.
from typing import List, Optional from typing import Optional
from llama_stack.apis.datasetio import * # noqa: F403 from llama_stack.apis.datasetio import * # noqa: F403
@ -15,7 +15,7 @@ from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_u
from .config import HuggingfaceDatasetIOConfig from .config import HuggingfaceDatasetIOConfig
def load_hf_dataset(dataset_def: DatasetDef): def load_hf_dataset(dataset_def: Dataset):
if dataset_def.metadata.get("path", None): if dataset_def.metadata.get("path", None):
return hf_datasets.load_dataset(**dataset_def.metadata) return hf_datasets.load_dataset(**dataset_def.metadata)
@ -41,13 +41,10 @@ class HuggingfaceDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
async def register_dataset( async def register_dataset(
self, self,
dataset_def: DatasetDef, dataset_def: Dataset,
) -> None: ) -> None:
self.dataset_infos[dataset_def.identifier] = dataset_def self.dataset_infos[dataset_def.identifier] = dataset_def
async def list_datasets(self) -> List[DatasetDef]:
return list(self.dataset_infos.values())
async def get_rows_paginated( async def get_rows_paginated(
self, self,
dataset_id: str, dataset_id: str,

View file

@ -11,7 +11,7 @@ from urllib.parse import urlparse
from llama_models.schema_utils import json_schema_type from llama_models.schema_utils import json_schema_type
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from llama_stack.apis.datasets import DatasetDef from llama_stack.apis.datasets import Dataset
from llama_stack.apis.eval_tasks import EvalTaskDef from llama_stack.apis.eval_tasks import EvalTaskDef
from llama_stack.apis.memory_banks.memory_banks import MemoryBank from llama_stack.apis.memory_banks.memory_banks import MemoryBank
from llama_stack.apis.models import Model from llama_stack.apis.models import Model
@ -57,9 +57,7 @@ class MemoryBanksProtocolPrivate(Protocol):
class DatasetsProtocolPrivate(Protocol): class DatasetsProtocolPrivate(Protocol):
async def list_datasets(self) -> List[DatasetDef]: ... async def register_dataset(self, dataset: Dataset) -> None: ...
async def register_dataset(self, dataset_def: DatasetDef) -> None: ...
class ScoringFunctionsProtocolPrivate(Protocol): class ScoringFunctionsProtocolPrivate(Protocol):

View file

@ -3,7 +3,7 @@
# #
# This source code is licensed under the terms described in the LICENSE file in # This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree. # the root directory of this source tree.
from typing import List, Optional from typing import Optional
import pandas import pandas
from llama_models.llama3.api.datatypes import * # noqa: F403 from llama_models.llama3.api.datatypes import * # noqa: F403
@ -37,12 +37,12 @@ class BaseDataset(ABC):
@dataclass @dataclass
class DatasetInfo: class DatasetInfo:
dataset_def: DatasetDef dataset_def: Dataset
dataset_impl: BaseDataset dataset_impl: BaseDataset
class PandasDataframeDataset(BaseDataset): class PandasDataframeDataset(BaseDataset):
def __init__(self, dataset_def: DatasetDef, *args, **kwargs) -> None: def __init__(self, dataset_def: Dataset, *args, **kwargs) -> None:
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.dataset_def = dataset_def self.dataset_def = dataset_def
self.df = None self.df = None
@ -60,9 +60,9 @@ class PandasDataframeDataset(BaseDataset):
def _validate_dataset_schema(self, df) -> pandas.DataFrame: def _validate_dataset_schema(self, df) -> pandas.DataFrame:
# note that we will drop any columns in dataset that are not in the schema # note that we will drop any columns in dataset that are not in the schema
df = df[self.dataset_def.dataset_schema.keys()] df = df[self.dataset_def.schema.keys()]
# check all columns in dataset schema are present # check all columns in dataset schema are present
assert len(df.columns) == len(self.dataset_def.dataset_schema) assert len(df.columns) == len(self.dataset_def.schema)
# TODO: type checking against column types in dataset schema # TODO: type checking against column types in dataset schema
return df return df
@ -89,17 +89,14 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
async def register_dataset( async def register_dataset(
self, self,
dataset_def: DatasetDef, dataset: Dataset,
) -> None: ) -> None:
dataset_impl = PandasDataframeDataset(dataset_def) dataset_impl = PandasDataframeDataset(dataset)
self.dataset_infos[dataset_def.identifier] = DatasetInfo( self.dataset_infos[dataset.identifier] = DatasetInfo(
dataset_def=dataset_def, dataset_def=dataset,
dataset_impl=dataset_impl, dataset_impl=dataset_impl,
) )
async def list_datasets(self) -> List[DatasetDef]:
return [i.dataset_def for i in self.dataset_infos.values()]
async def get_rows_paginated( async def get_rows_paginated(
self, self,
dataset_id: str, dataset_id: str,

View file

@ -55,15 +55,11 @@ async def register_dataset(
"generated_answer": StringType(), "generated_answer": StringType(),
} }
dataset = DatasetDefWithProvider( await datasets_impl.register_dataset(
identifier=dataset_id, dataset_id=dataset_id,
provider_id="", schema=dataset_schema,
url=URL( url=URL(uri=test_url),
uri=test_url,
),
dataset_schema=dataset_schema,
) )
await datasets_impl.register_dataset(dataset)
class TestDatasetIO: class TestDatasetIO: