Add persistence for localfs datasets (#557)

# What does this PR do?

Add persistence logic for the localfs datasetio provider.
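In short, each registered dataset is serialized to JSON and written to the provider's key-value store under a fixed prefix, so registrations survive restarts. The sketch below summarizes the scheme; the names are taken from the diff further down, while the `dataset_key` helper is hypothetical and exists only for illustration:

```python
# Summary sketch of the storage scheme (names taken from the diff below; the
# dataset_key helper is hypothetical and exists only for illustration).
DATASETS_PREFIX = "localfs_datasets:"


def dataset_key(identifier: str) -> str:
    # One kvstore entry per registered dataset, e.g. "localfs_datasets:my_eval_set"
    return f"{DATASETS_PREFIX}{identifier}"


# register_dataset   -> kvstore.set(dataset_key(id), dataset.json())
# unregister_dataset -> kvstore.delete(dataset_key(id))
# initialize         -> range-scan every key under DATASETS_PREFIX and rebuild the
#                       in-memory dataset_infos registry on startup
```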

- [ ] Addresses issue (#issue)


## Test Plan

Please describe:
 - tests you ran to verify your changes with result summaries.
 - provide instructions so it can be reproduced.
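One way to exercise the new persistence path is to register a dataset, then create a fresh provider instance and confirm the dataset is reloaded from the kvstore. This is only a sketch: the `make_provider` factory and `example_dataset` argument are placeholders, since constructing the real `LocalFSDatasetIOImpl` and a `Dataset` object depends on module paths and schema details not shown in the diff below.

```python
# Sketch of a manual verification flow for the persistence change. make_provider and
# example_dataset are placeholders; method names follow the diff below.
async def check_persistence(make_provider, example_dataset) -> None:
    # First "run": register a dataset; the provider writes it to the SQLite-backed kvstore.
    provider = make_provider()
    await provider.initialize()
    await provider.register_dataset(example_dataset)

    # Second "run": a fresh provider instance reloads stored datasets during initialize(),
    # so the dataset should reappear without being re-registered.
    provider = make_provider()
    await provider.initialize()
    assert example_dataset.identifier in provider.dataset_infos
```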


## Sources

Please link relevant resources if necessary.
https://github.com/meta-llama/llama-stack/issues/539

## Before submitting

- [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case).
- [x] Ran pre-commit to handle lint / formatting issues.
- [x] Read the [contributor guideline](https://github.com/meta-llama/llama-stack/blob/main/CONTRIBUTING.md), Pull Request section?
- [ ] Updated relevant documentation.
- [ ] Wrote necessary unit or integration tests.
Commit 96735e961d (parent 4938f2fe5d)
Author: Vladislav Bronzov, committed via GitHub on 2025-01-10 02:34:18 +01:00
2 changed files with 37 additions and 2 deletions

Changes to the provider config (`config.py`):

@@ -5,5 +5,14 @@
 # the root directory of this source tree.
 from pydantic import BaseModel
 
+from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR
+from llama_stack.providers.utils.kvstore.config import (
+    KVStoreConfig,
+    SqliteKVStoreConfig,
+)
 
-class LocalFSDatasetIOConfig(BaseModel): ...
+class LocalFSDatasetIOConfig(BaseModel):
+    kvstore: KVStoreConfig = SqliteKVStoreConfig(
+        db_path=(RUNTIME_BASE_DIR / "localfs_datasetio.db").as_posix()
+    )  # Uses SQLite config specific to localfs storage
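The default persists to a SQLite file under RUNTIME_BASE_DIR; since `kvstore` is an ordinary pydantic field, a deployment can point it elsewhere. A small sketch follows; the import path for `LocalFSDatasetIOConfig` and the `db_path` value are assumptions:

```python
# Sketch: constructing the provider config with a custom SQLite path instead of the
# default under RUNTIME_BASE_DIR. The LocalFSDatasetIOConfig import path is an
# assumption; the db_path value is only an example.
from llama_stack.providers.inline.datasetio.localfs.config import LocalFSDatasetIOConfig
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig

config = LocalFSDatasetIOConfig(
    kvstore=SqliteKVStoreConfig(db_path="/tmp/my_localfs_datasetio.db")
)
```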

Changes to the provider implementation:

@@ -18,10 +18,14 @@ from llama_stack.apis.datasets import Dataset
 from llama_stack.providers.datatypes import DatasetsProtocolPrivate
 from llama_stack.providers.utils.datasetio.url_utils import get_dataframe_from_url
+from llama_stack.providers.utils.kvstore import kvstore_impl
 
 from .config import LocalFSDatasetIOConfig
 
+DATASETS_PREFIX = "localfs_datasets:"
+
 
 class BaseDataset(ABC):
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -86,8 +90,22 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
         self.config = config
         # local registry for keeping track of datasets within the provider
         self.dataset_infos = {}
+        self.kvstore = None
 
-    async def initialize(self) -> None: ...
+    async def initialize(self) -> None:
+        self.kvstore = await kvstore_impl(self.config.kvstore)
+        # Load existing datasets from kvstore
+        start_key = DATASETS_PREFIX
+        end_key = f"{DATASETS_PREFIX}\xff"
+        stored_datasets = await self.kvstore.range(start_key, end_key)
+
+        for dataset in stored_datasets:
+            dataset = Dataset.model_validate_json(dataset)
+            dataset_impl = PandasDataframeDataset(dataset)
+            self.dataset_infos[dataset.identifier] = DatasetInfo(
+                dataset_def=dataset,
+                dataset_impl=dataset_impl,
+            )
 
     async def shutdown(self) -> None: ...
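The reload in `initialize()` above is a lexicographic prefix scan: every dataset is stored under a key starting with `DATASETS_PREFIX`, and `"\xff"` serves as an upper bound that sorts after any identifier character. A small stand-in illustration (a plain dict instead of the real kvstore, with made-up dataset identifiers):

```python
# Illustration of the prefix-range trick used in initialize(): "\xff" compares greater
# than any character that appears in dataset identifiers, so [prefix, prefix + "\xff")
# covers exactly the keys written with that prefix. The dict is a stand-in, not the
# real kvstore, and the identifiers are made up.
DATASETS_PREFIX = "localfs_datasets:"

store = {
    "localfs_datasets:alpaca": "{...}",
    "localfs_datasets:mmlu": "{...}",
    "other_provider:foo": "{...}",
}

start_key = DATASETS_PREFIX
end_key = f"{DATASETS_PREFIX}\xff"
matching = [v for k, v in sorted(store.items()) if start_key <= k < end_key]
assert len(matching) == 2  # only the localfs_datasets:* entries are returned
```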
@@ -95,6 +113,12 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
         self,
         dataset: Dataset,
     ) -> None:
+        # Store in kvstore
+        key = f"{DATASETS_PREFIX}{dataset.identifier}"
+        await self.kvstore.set(
+            key=key,
+            value=dataset.json(),
+        )
         dataset_impl = PandasDataframeDataset(dataset)
         self.dataset_infos[dataset.identifier] = DatasetInfo(
             dataset_def=dataset,
@@ -102,6 +126,8 @@ class LocalFSDatasetIOImpl(DatasetIO, DatasetsProtocolPrivate):
         )
 
     async def unregister_dataset(self, dataset_id: str) -> None:
+        key = f"{DATASETS_PREFIX}{dataset_id}"
+        await self.kvstore.delete(key=key)
         del self.dataset_infos[dataset_id]
 
     async def get_rows_paginated(