From 96735e961df3a2d001961b8633d4ee15b3ca806a Mon Sep 17 00:00:00 2001 From: Vladislav Bronzov <58587565+VladOS95-cyber@users.noreply.github.com> Date: Fri, 10 Jan 2025 02:34:18 +0100 Subject: [PATCH] Add persistence for localfs datasets (#557) # What does this PR do? Add persistency logic for localfs datasetio provider - [ ] Addresses issue (#issue) ## Test Plan Please describe: - tests you ran to verify your changes with result summaries. - provide instructions so it can be reproduced. ## Sources Please link relevant resources if necessary. https://github.com/meta-llama/llama-stack/issues/539 ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [x] Ran pre-commit to handle lint / formatting issues. - [x] Read the [contributor guideline](https://github.com/meta-llama/llama-stack/blob/main/CONTRIBUTING.md), Pull Request section? - [ ] Updated relevant documentation. - [ ] Wrote necessary unit or integration tests. --- .../inline/datasetio/localfs/config.py | 11 +++++++- .../inline/datasetio/localfs/datasetio.py | 28 ++++++++++++++++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/llama_stack/providers/inline/datasetio/localfs/config.py b/llama_stack/providers/inline/datasetio/localfs/config.py index 1b89df63b..f4f495b95 100644 --- a/llama_stack/providers/inline/datasetio/localfs/config.py +++ b/llama_stack/providers/inline/datasetio/localfs/config.py @@ -5,5 +5,14 @@ # the root directory of this source tree. from pydantic import BaseModel +from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) -class LocalFSDatasetIOConfig(BaseModel): ... + +class LocalFSDatasetIOConfig(BaseModel): + kvstore: KVStoreConfig = SqliteKVStoreConfig( + db_path=(RUNTIME_BASE_DIR / "localfs_datasetio.db").as_posix() + ) # Uses SQLite config specific to localfs storage diff --git a/llama_stack/providers/inline/datasetio/localfs/datasetio.py b/llama_stack/providers/inline/datasetio/localfs/datasetio.py index 442053fb3..d1903e861 100644 --- a/llama_stack/providers/inline/datasetio/localfs/datasetio.py +++ b/llama_stack/providers/inline/datasetio/localfs/datasetio.py @@ -18,10 +18,14 @@ from llama_stack.apis.datasets import Dataset from llama_stack.providers.datatypes import DatasetsProtocolPrivate from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_url +from llama_stack.providers.utils.kvstore import kvstore_impl from .config import LocalFSDatasetIOConfig +DATASETS_PREFIX = "localfs_datasets:" + + class BaseDataset(ABC): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -86,8 +90,22 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate): self.config = config # local registry for keeping track of datasets within the provider self.dataset_infos = {} + self.kvstore = None - async def initialize(self) -> None: ... + async def initialize(self) -> None: + self.kvstore = await kvstore_impl(self.config.kvstore) + # Load existing datasets from kvstore + start_key = DATASETS_PREFIX + end_key = f"{DATASETS_PREFIX}\xff" + stored_datasets = await self.kvstore.range(start_key, end_key) + + for dataset in stored_datasets: + dataset = Dataset.model_validate_json(dataset) + dataset_impl = PandasDataframeDataset(dataset) + self.dataset_infos[dataset.identifier] = DatasetInfo( + dataset_def=dataset, + dataset_impl=dataset_impl, + ) async def shutdown(self) -> None: ... @@ -95,6 +113,12 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate): self, dataset: Dataset, ) -> None: + # Store in kvstore + key = f"{DATASETS_PREFIX}{dataset.identifier}" + await self.kvstore.set( + key=key, + value=dataset.json(), + ) dataset_impl = PandasDataframeDataset(dataset) self.dataset_infos[dataset.identifier] = DatasetInfo( dataset_def=dataset, @@ -102,6 +126,8 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate): ) async def unregister_dataset(self, dataset_id: str) -> None: + key = f"{DATASETS_PREFIX}{dataset_id}" + await self.kvstore.delete(key=key) del self.dataset_infos[dataset_id] async def get_rows_paginated(