forked from phoenix-oss/llama-stack-mirror
persist registered objects with distribution (#354)
* persist registered objects with distribution * linter fixes * comment * use annotate and field discriminator * workign tests * donot use global state * precommit failures fixed * add back Any * fix imports * remove unnecessary changes in ollama * precommit failures fixed * make kvstore configurable for dist and rename registry * add comment about registry list return * fix linter errors * use registry to hydrate * remove debug print * linter fixes * remove kvstore.db * rename distribution_registry_store --------- Co-authored-by: Dinesh Yeduguru <dineshyv@fb.com>
This commit is contained in:
parent
c9bf1d7d0b
commit
663883cc29
12 changed files with 401 additions and 46 deletions
7
llama_stack/distribution/store/__init__.py
Normal file
7
llama_stack/distribution/store/__init__.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
from .registry import * # noqa: F401 F403
|
135
llama_stack/distribution/store/registry.py
Normal file
135
llama_stack/distribution/store/registry.py
Normal file
|
@ -0,0 +1,135 @@
|
|||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import json
|
||||
from typing import Dict, List, Protocol
|
||||
|
||||
import pydantic
|
||||
|
||||
from llama_stack.distribution.datatypes import RoutableObjectWithProvider
|
||||
|
||||
from llama_stack.providers.utils.kvstore import KVStore
|
||||
|
||||
|
||||
class DistributionRegistry(Protocol):
|
||||
async def get_all(self) -> List[RoutableObjectWithProvider]: ...
|
||||
|
||||
async def initialize(self) -> None: ...
|
||||
|
||||
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
|
||||
|
||||
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]: ...
|
||||
|
||||
# The current data structure allows multiple objects with the same identifier but different providers.
|
||||
# This is not ideal - we should have a single object that can be served by multiple providers,
|
||||
# suggesting a data structure like (obj: Obj, providers: List[str]) rather than List[RoutableObjectWithProvider].
|
||||
# The current approach could lead to inconsistencies if the same logical object has different data across providers.
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
|
||||
|
||||
|
||||
KEY_FORMAT = "distributions:registry:{}"
|
||||
|
||||
|
||||
class DiskDistributionRegistry(DistributionRegistry):
|
||||
def __init__(self, kvstore: KVStore):
|
||||
self.kvstore = kvstore
|
||||
|
||||
async def initialize(self) -> None:
|
||||
pass
|
||||
|
||||
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]:
|
||||
# Disk registry does not have a cache
|
||||
return []
|
||||
|
||||
async def get_all(self) -> List[RoutableObjectWithProvider]:
|
||||
start_key = KEY_FORMAT.format("")
|
||||
end_key = KEY_FORMAT.format("\xff")
|
||||
keys = await self.kvstore.range(start_key, end_key)
|
||||
return [await self.get(key.split(":")[-1]) for key in keys]
|
||||
|
||||
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]:
|
||||
json_str = await self.kvstore.get(KEY_FORMAT.format(identifier))
|
||||
if not json_str:
|
||||
return []
|
||||
|
||||
objects_data = json.loads(json_str)
|
||||
return [
|
||||
pydantic.parse_obj_as(
|
||||
RoutableObjectWithProvider,
|
||||
json.loads(obj_str),
|
||||
)
|
||||
for obj_str in objects_data
|
||||
]
|
||||
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool:
|
||||
existing_objects = await self.get(obj.identifier)
|
||||
# dont register if the object's providerid already exists
|
||||
for eobj in existing_objects:
|
||||
if eobj.provider_id == obj.provider_id:
|
||||
return False
|
||||
|
||||
existing_objects.append(obj)
|
||||
|
||||
objects_json = [
|
||||
obj.model_dump_json() for obj in existing_objects
|
||||
] # Fixed variable name
|
||||
await self.kvstore.set(
|
||||
KEY_FORMAT.format(obj.identifier), json.dumps(objects_json)
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
||||
def __init__(self, kvstore: KVStore):
|
||||
super().__init__(kvstore)
|
||||
self.cache: Dict[str, List[RoutableObjectWithProvider]] = {}
|
||||
|
||||
async def initialize(self) -> None:
|
||||
start_key = KEY_FORMAT.format("")
|
||||
end_key = KEY_FORMAT.format("\xff")
|
||||
|
||||
keys = await self.kvstore.range(start_key, end_key)
|
||||
|
||||
for key in keys:
|
||||
identifier = key.split(":")[-1]
|
||||
objects = await super().get(identifier)
|
||||
if objects:
|
||||
self.cache[identifier] = objects
|
||||
|
||||
def get_cached(self, identifier: str) -> List[RoutableObjectWithProvider]:
|
||||
return self.cache.get(identifier, [])
|
||||
|
||||
async def get_all(self) -> List[RoutableObjectWithProvider]:
|
||||
return [item for sublist in self.cache.values() for item in sublist]
|
||||
|
||||
async def get(self, identifier: str) -> List[RoutableObjectWithProvider]:
|
||||
if identifier in self.cache:
|
||||
return self.cache[identifier]
|
||||
|
||||
objects = await super().get(identifier)
|
||||
if objects:
|
||||
self.cache[identifier] = objects
|
||||
|
||||
return objects
|
||||
|
||||
async def register(self, obj: RoutableObjectWithProvider) -> bool:
|
||||
# First update disk
|
||||
success = await super().register(obj)
|
||||
|
||||
if success:
|
||||
# Then update cache
|
||||
if obj.identifier not in self.cache:
|
||||
self.cache[obj.identifier] = []
|
||||
|
||||
# Check if provider already exists in cache
|
||||
for cached_obj in self.cache[obj.identifier]:
|
||||
if cached_obj.provider_id == obj.provider_id:
|
||||
return success
|
||||
|
||||
# If not, update cache
|
||||
self.cache[obj.identifier].append(obj)
|
||||
|
||||
return success
|
171
llama_stack/distribution/store/tests/test_registry.py
Normal file
171
llama_stack/distribution/store/tests/test_registry.py
Normal file
|
@ -0,0 +1,171 @@
|
|||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from llama_stack.distribution.store import * # noqa F403
|
||||
from llama_stack.apis.inference import ModelDefWithProvider
|
||||
from llama_stack.apis.memory_banks import VectorMemoryBankDef
|
||||
from llama_stack.providers.utils.kvstore import kvstore_impl, SqliteKVStoreConfig
|
||||
from llama_stack.distribution.datatypes import * # noqa F403
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config():
|
||||
config = SqliteKVStoreConfig(db_path="/tmp/test_registry.db")
|
||||
if os.path.exists(config.db_path):
|
||||
os.remove(config.db_path)
|
||||
return config
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def registry(config):
|
||||
registry = DiskDistributionRegistry(await kvstore_impl(config))
|
||||
await registry.initialize()
|
||||
return registry
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def cached_registry(config):
|
||||
registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
|
||||
await registry.initialize()
|
||||
return registry
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_bank():
|
||||
return VectorMemoryBankDef(
|
||||
identifier="test_bank",
|
||||
embedding_model="all-MiniLM-L6-v2",
|
||||
chunk_size_in_tokens=512,
|
||||
overlap_size_in_tokens=64,
|
||||
provider_id="test-provider",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_model():
|
||||
return ModelDefWithProvider(
|
||||
identifier="test_model",
|
||||
llama_model="Llama3.2-3B-Instruct",
|
||||
provider_id="test-provider",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_registry_initialization(registry):
|
||||
# Test empty registry
|
||||
results = await registry.get("nonexistent")
|
||||
assert len(results) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_basic_registration(registry, sample_bank, sample_model):
|
||||
print(f"Registering {sample_bank}")
|
||||
await registry.register(sample_bank)
|
||||
print(f"Registering {sample_model}")
|
||||
await registry.register(sample_model)
|
||||
print("Getting bank")
|
||||
results = await registry.get("test_bank")
|
||||
assert len(results) == 1
|
||||
result_bank = results[0]
|
||||
assert result_bank.identifier == sample_bank.identifier
|
||||
assert result_bank.embedding_model == sample_bank.embedding_model
|
||||
assert result_bank.chunk_size_in_tokens == sample_bank.chunk_size_in_tokens
|
||||
assert result_bank.overlap_size_in_tokens == sample_bank.overlap_size_in_tokens
|
||||
assert result_bank.provider_id == sample_bank.provider_id
|
||||
|
||||
results = await registry.get("test_model")
|
||||
assert len(results) == 1
|
||||
result_model = results[0]
|
||||
assert result_model.identifier == sample_model.identifier
|
||||
assert result_model.llama_model == sample_model.llama_model
|
||||
assert result_model.provider_id == sample_model.provider_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cached_registry_initialization(config, sample_bank, sample_model):
|
||||
# First populate the disk registry
|
||||
disk_registry = DiskDistributionRegistry(await kvstore_impl(config))
|
||||
await disk_registry.initialize()
|
||||
await disk_registry.register(sample_bank)
|
||||
await disk_registry.register(sample_model)
|
||||
|
||||
# Test cached version loads from disk
|
||||
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
|
||||
await cached_registry.initialize()
|
||||
|
||||
results = await cached_registry.get("test_bank")
|
||||
assert len(results) == 1
|
||||
result_bank = results[0]
|
||||
assert result_bank.identifier == sample_bank.identifier
|
||||
assert result_bank.embedding_model == sample_bank.embedding_model
|
||||
assert result_bank.chunk_size_in_tokens == sample_bank.chunk_size_in_tokens
|
||||
assert result_bank.overlap_size_in_tokens == sample_bank.overlap_size_in_tokens
|
||||
assert result_bank.provider_id == sample_bank.provider_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cached_registry_updates(config):
|
||||
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
|
||||
await cached_registry.initialize()
|
||||
|
||||
new_bank = VectorMemoryBankDef(
|
||||
identifier="test_bank_2",
|
||||
embedding_model="all-MiniLM-L6-v2",
|
||||
chunk_size_in_tokens=256,
|
||||
overlap_size_in_tokens=32,
|
||||
provider_id="baz",
|
||||
)
|
||||
await cached_registry.register(new_bank)
|
||||
|
||||
# Verify in cache
|
||||
results = await cached_registry.get("test_bank_2")
|
||||
assert len(results) == 1
|
||||
result_bank = results[0]
|
||||
assert result_bank.identifier == new_bank.identifier
|
||||
assert result_bank.provider_id == new_bank.provider_id
|
||||
|
||||
# Verify persisted to disk
|
||||
new_registry = DiskDistributionRegistry(await kvstore_impl(config))
|
||||
await new_registry.initialize()
|
||||
results = await new_registry.get("test_bank_2")
|
||||
assert len(results) == 1
|
||||
result_bank = results[0]
|
||||
assert result_bank.identifier == new_bank.identifier
|
||||
assert result_bank.provider_id == new_bank.provider_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_duplicate_provider_registration(config):
|
||||
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
|
||||
await cached_registry.initialize()
|
||||
|
||||
original_bank = VectorMemoryBankDef(
|
||||
identifier="test_bank_2",
|
||||
embedding_model="all-MiniLM-L6-v2",
|
||||
chunk_size_in_tokens=256,
|
||||
overlap_size_in_tokens=32,
|
||||
provider_id="baz",
|
||||
)
|
||||
await cached_registry.register(original_bank)
|
||||
|
||||
duplicate_bank = VectorMemoryBankDef(
|
||||
identifier="test_bank_2",
|
||||
embedding_model="different-model",
|
||||
chunk_size_in_tokens=128,
|
||||
overlap_size_in_tokens=16,
|
||||
provider_id="baz", # Same provider_id
|
||||
)
|
||||
await cached_registry.register(duplicate_bank)
|
||||
|
||||
results = await cached_registry.get("test_bank_2")
|
||||
assert len(results) == 1 # Still only one result
|
||||
assert (
|
||||
results[0].embedding_model == original_bank.embedding_model
|
||||
) # Original values preserved
|
Loading…
Add table
Add a link
Reference in a new issue