diff --git a/llama_stack/providers/inline/datasetio/localfs/config.py b/llama_stack/providers/inline/datasetio/localfs/config.py index 1b89df63b..f4f495b95 100644 --- a/llama_stack/providers/inline/datasetio/localfs/config.py +++ b/llama_stack/providers/inline/datasetio/localfs/config.py @@ -5,5 +5,14 @@ # the root directory of this source tree. from pydantic import BaseModel +from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) -class LocalFSDatasetIOConfig(BaseModel): ... + +class LocalFSDatasetIOConfig(BaseModel): + kvstore: KVStoreConfig = SqliteKVStoreConfig( + db_path=(RUNTIME_BASE_DIR / "localfs_datasetio.db").as_posix() + ) # Uses SQLite config specific to localfs storage diff --git a/llama_stack/providers/inline/datasetio/localfs/datasetio.py b/llama_stack/providers/inline/datasetio/localfs/datasetio.py index 442053fb3..d1903e861 100644 --- a/llama_stack/providers/inline/datasetio/localfs/datasetio.py +++ b/llama_stack/providers/inline/datasetio/localfs/datasetio.py @@ -18,10 +18,14 @@ from llama_stack.apis.datasets import Dataset from llama_stack.providers.datatypes import DatasetsProtocolPrivate from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_url +from llama_stack.providers.utils.kvstore import kvstore_impl from .config import LocalFSDatasetIOConfig +DATASETS_PREFIX = "localfs_datasets:" + + class BaseDataset(ABC): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -86,8 +90,22 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate): self.config = config # local registry for keeping track of datasets within the provider self.dataset_infos = {} + self.kvstore = None - async def initialize(self) -> None: ... + async def initialize(self) -> None: + self.kvstore = await kvstore_impl(self.config.kvstore) + # Load existing datasets from kvstore + start_key = DATASETS_PREFIX + end_key = f"{DATASETS_PREFIX}\xff" + stored_datasets = await self.kvstore.range(start_key, end_key) + + for dataset in stored_datasets: + dataset = Dataset.model_validate_json(dataset) + dataset_impl = PandasDataframeDataset(dataset) + self.dataset_infos[dataset.identifier] = DatasetInfo( + dataset_def=dataset, + dataset_impl=dataset_impl, + ) async def shutdown(self) -> None: ... @@ -95,6 +113,12 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate): self, dataset: Dataset, ) -> None: + # Store in kvstore + key = f"{DATASETS_PREFIX}{dataset.identifier}" + await self.kvstore.set( + key=key, + value=dataset.json(), + ) dataset_impl = PandasDataframeDataset(dataset) self.dataset_infos[dataset.identifier] = DatasetInfo( dataset_def=dataset, @@ -102,6 +126,8 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate): ) async def unregister_dataset(self, dataset_id: str) -> None: + key = f"{DATASETS_PREFIX}{dataset_id}" + await self.kvstore.delete(key=key) del self.dataset_infos[dataset_id] async def get_rows_paginated(