add kvstore config and persistence for localfs ds

This commit is contained in:
Vladislav 2024-12-02 20:09:51 +01:00
parent 144abd2e71
commit a4a7a90e42
2 changed files with 35 additions and 3 deletions

View file

@ -5,5 +5,14 @@
# the root directory of this source tree. # the root directory of this source tree.
from llama_stack.apis.datasetio import * # noqa: F401, F403 from llama_stack.apis.datasetio import * # noqa: F401, F403
from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
class LocalFSDatasetIOConfig(BaseModel): ...
# Config for the localfs DatasetIO provider; `kvstore` selects the backend
# the provider uses to persist registered dataset definitions.
class LocalFSDatasetIOConfig(BaseModel):
kvstore: KVStoreConfig = SqliteKVStoreConfig(
db_path=(RUNTIME_BASE_DIR / "localfs_datasetio.db").as_posix()
) # Defaults to a SQLite file under RUNTIME_BASE_DIR for the localfs provider

View file

@ -17,10 +17,14 @@ from urllib.parse import urlparse
from llama_stack.providers.datatypes import DatasetsProtocolPrivate from llama_stack.providers.datatypes import DatasetsProtocolPrivate
from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_url from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_url
from llama_stack.providers.utils.kvstore import kvstore_impl
from .config import LocalFSDatasetIOConfig from .config import LocalFSDatasetIOConfig
# Key prefix under which dataset definitions are stored in the kvstore.
# NOTE(review): "locallfs" looks like a typo for "localfs" — confirm before
# changing it, since any already-persisted keys use this exact prefix.
DATASETS_PREFIX = "locallfs_datasets:"
class BaseDataset(ABC): class BaseDataset(ABC):
def __init__(self, *args, **kwargs) -> None: def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -86,7 +90,20 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
# local registry for keeping track of datasets within the provider # local registry for keeping track of datasets within the provider
self.dataset_infos = {} self.dataset_infos = {}
# Open the configured kvstore and rebuild the in-memory dataset registry
# from the dataset definitions persisted in it.
async def initialize(self) -> None: ... async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.kvstore)
# Load existing datasets from kvstore
# The range [prefix, prefix + "\xff"] is meant to cover every key written
# with DATASETS_PREFIX (assumes lexicographic key ordering — TODO confirm).
start_key = DATASETS_PREFIX
end_key = f"{DATASETS_PREFIX}\xff"
stored_datasets = await self.kvstore.range(start_key, end_key)
# Each returned value is parsed as Dataset JSON (register_dataset stores dataset.json()).
for dataset in stored_datasets:
dataset = Dataset.model_validate_json(dataset)
dataset_impl = PandasDataframeDataset(dataset)
# Registry entries are keyed by the bare identifier, without DATASETS_PREFIX.
self.dataset_infos[dataset.identifier] = DatasetInfo(
dataset_def=dataset,
dataset_impl=dataset_impl,
)
async def shutdown(self) -> None: ... async def shutdown(self) -> None: ...
@ -94,8 +111,14 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
self, self,
dataset: Dataset, dataset: Dataset,
) -> None: ) -> None:
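# Persist the dataset definition in the kvstore under its prefixed key.
# NOTE(review): the in-memory registry below is indexed by `key` (prefix
# included), whereas initialize() indexes it by the bare dataset.identifier —
# confirm which is intended; lookups by identifier would miss this entry.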
key = f"{DATASETS_PREFIX}{dataset.identifier}"
dataset_impl = PandasDataframeDataset(dataset) dataset_impl = PandasDataframeDataset(dataset)
self.dataset_infos[dataset.identifier] = DatasetInfo( await self.kvstore.set(
key=key,
value=dataset.json(),
)
self.dataset_infos[key] = DatasetInfo(
dataset_def=dataset, dataset_def=dataset,
dataset_impl=dataset_impl, dataset_impl=dataset_impl,
) )