mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-03 19:57:35 +00:00
updated the error message and ran pre-commit
This commit is contained in:
parent
2d4775c67a
commit
d5d2061c8c
2 changed files with 21 additions and 56 deletions
|
@ -31,9 +31,7 @@ class DistributionRegistry(Protocol):
|
||||||
|
|
||||||
def get_cached(self, identifier: str) -> RoutableObjectWithProvider | None: ...
|
def get_cached(self, identifier: str) -> RoutableObjectWithProvider | None: ...
|
||||||
|
|
||||||
async def update(
|
async def update(self, obj: RoutableObjectWithProvider) -> RoutableObjectWithProvider: ...
|
||||||
self, obj: RoutableObjectWithProvider
|
|
||||||
) -> RoutableObjectWithProvider: ...
|
|
||||||
|
|
||||||
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
|
async def register(self, obj: RoutableObjectWithProvider) -> bool: ...
|
||||||
|
|
||||||
|
@ -59,9 +57,7 @@ def _parse_registry_values(values: list[str]) -> list[RoutableObjectWithProvider
|
||||||
obj = pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(value)
|
obj = pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(value)
|
||||||
all_objects.append(obj)
|
all_objects.append(obj)
|
||||||
except pydantic.ValidationError as e:
|
except pydantic.ValidationError as e:
|
||||||
logger.error(
|
logger.error(f"Error parsing registry value, raw value: {value}. Error: {e}")
|
||||||
f"Error parsing registry value, raw value: {value}. Error: {e}"
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return all_objects
|
return all_objects
|
||||||
|
@ -74,9 +70,7 @@ class DiskDistributionRegistry(DistributionRegistry):
|
||||||
async def initialize(self) -> None:
|
async def initialize(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_cached(
|
def get_cached(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
|
||||||
self, type: str, identifier: str
|
|
||||||
) -> RoutableObjectWithProvider | None:
|
|
||||||
# Disk registry does not have a cache
|
# Disk registry does not have a cache
|
||||||
raise NotImplementedError("Disk registry does not have a cache")
|
raise NotImplementedError("Disk registry does not have a cache")
|
||||||
|
|
||||||
|
@ -85,23 +79,15 @@ class DiskDistributionRegistry(DistributionRegistry):
|
||||||
values = await self.kvstore.values_in_range(start_key, end_key)
|
values = await self.kvstore.values_in_range(start_key, end_key)
|
||||||
return _parse_registry_values(values)
|
return _parse_registry_values(values)
|
||||||
|
|
||||||
async def get(
|
async def get(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
|
||||||
self, type: str, identifier: str
|
json_str = await self.kvstore.get(KEY_FORMAT.format(type=type, identifier=identifier))
|
||||||
) -> RoutableObjectWithProvider | None:
|
|
||||||
json_str = await self.kvstore.get(
|
|
||||||
KEY_FORMAT.format(type=type, identifier=identifier)
|
|
||||||
)
|
|
||||||
if not json_str:
|
if not json_str:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(
|
return pydantic.TypeAdapter(RoutableObjectWithProvider).validate_json(json_str)
|
||||||
json_str
|
|
||||||
)
|
|
||||||
except pydantic.ValidationError as e:
|
except pydantic.ValidationError as e:
|
||||||
logger.error(
|
logger.error(f"Error parsing registry value for {type}:{identifier}, raw value: {json_str}. Error: {e}")
|
||||||
f"Error parsing registry value for {type}:{identifier}, raw value: {json_str}. Error: {e}"
|
|
||||||
)
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def update(self, obj: RoutableObjectWithProvider) -> None:
|
async def update(self, obj: RoutableObjectWithProvider) -> None:
|
||||||
|
@ -116,8 +102,8 @@ class DiskDistributionRegistry(DistributionRegistry):
|
||||||
# dont register if the object's providerid already exists
|
# dont register if the object's providerid already exists
|
||||||
if existing_obj and existing_obj.provider_id == obj.provider_id:
|
if existing_obj and existing_obj.provider_id == obj.provider_id:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"{obj.type.title()} '{obj.identifier}' is already registered with provider '{obj.provider_id}'. "
|
f"Provider '{obj.provider_id}' is already registered."
|
||||||
f"Unregister the existing object first before registering a new one."
|
f"Unregister the existing provider first before registering it again."
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.kvstore.set(
|
await self.kvstore.set(
|
||||||
|
@ -167,9 +153,7 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
||||||
async def initialize(self) -> None:
|
async def initialize(self) -> None:
|
||||||
await self._ensure_initialized()
|
await self._ensure_initialized()
|
||||||
|
|
||||||
def get_cached(
|
def get_cached(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
|
||||||
self, type: str, identifier: str
|
|
||||||
) -> RoutableObjectWithProvider | None:
|
|
||||||
return self.cache.get((type, identifier), None)
|
return self.cache.get((type, identifier), None)
|
||||||
|
|
||||||
async def get_all(self) -> list[RoutableObjectWithProvider]:
|
async def get_all(self) -> list[RoutableObjectWithProvider]:
|
||||||
|
@ -177,9 +161,7 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
|
||||||
async with self._locked_cache() as cache:
|
async with self._locked_cache() as cache:
|
||||||
return list(cache.values())
|
return list(cache.values())
|
||||||
|
|
||||||
async def get(
|
async def get(self, type: str, identifier: str) -> RoutableObjectWithProvider | None:
|
||||||
self, type: str, identifier: str
|
|
||||||
) -> RoutableObjectWithProvider | None:
|
|
||||||
await self._ensure_initialized()
|
await self._ensure_initialized()
|
||||||
cache_key = (type, identifier)
|
cache_key = (type, identifier)
|
||||||
|
|
||||||
|
@ -221,9 +203,7 @@ async def create_dist_registry(
|
||||||
dist_kvstore = await kvstore_impl(metadata_store)
|
dist_kvstore = await kvstore_impl(metadata_store)
|
||||||
else:
|
else:
|
||||||
dist_kvstore = await kvstore_impl(
|
dist_kvstore = await kvstore_impl(
|
||||||
SqliteKVStoreConfig(
|
SqliteKVStoreConfig(db_path=(DISTRIBS_BASE_DIR / image_name / "kvstore.db").as_posix())
|
||||||
db_path=(DISTRIBS_BASE_DIR / image_name / "kvstore.db").as_posix()
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
dist_registry = CachedDiskDistributionRegistry(dist_kvstore)
|
dist_registry = CachedDiskDistributionRegistry(dist_kvstore)
|
||||||
await dist_registry.initialize()
|
await dist_registry.initialize()
|
||||||
|
|
|
@ -10,9 +10,9 @@ import pytest
|
||||||
from llama_stack.apis.inference import Model
|
from llama_stack.apis.inference import Model
|
||||||
from llama_stack.apis.vector_dbs import VectorDB
|
from llama_stack.apis.vector_dbs import VectorDB
|
||||||
from llama_stack.core.store.registry import (
|
from llama_stack.core.store.registry import (
|
||||||
|
KEY_FORMAT,
|
||||||
CachedDiskDistributionRegistry,
|
CachedDiskDistributionRegistry,
|
||||||
DiskDistributionRegistry,
|
DiskDistributionRegistry,
|
||||||
KEY_FORMAT,
|
|
||||||
)
|
)
|
||||||
from llama_stack.providers.utils.kvstore import kvstore_impl
|
from llama_stack.providers.utils.kvstore import kvstore_impl
|
||||||
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
|
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
|
||||||
|
@ -62,9 +62,7 @@ async def test_basic_registration(disk_dist_registry, sample_vector_db, sample_m
|
||||||
assert result_model.provider_id == sample_model.provider_id
|
assert result_model.provider_id == sample_model.provider_id
|
||||||
|
|
||||||
|
|
||||||
async def test_cached_registry_initialization(
|
async def test_cached_registry_initialization(sqlite_kvstore, sample_vector_db, sample_model):
|
||||||
sqlite_kvstore, sample_vector_db, sample_model
|
|
||||||
):
|
|
||||||
# First populate the disk registry
|
# First populate the disk registry
|
||||||
disk_registry = DiskDistributionRegistry(sqlite_kvstore)
|
disk_registry = DiskDistributionRegistry(sqlite_kvstore)
|
||||||
await disk_registry.initialize()
|
await disk_registry.initialize()
|
||||||
|
@ -73,9 +71,7 @@ async def test_cached_registry_initialization(
|
||||||
|
|
||||||
# Test cached version loads from disk
|
# Test cached version loads from disk
|
||||||
db_path = sqlite_kvstore.db_path
|
db_path = sqlite_kvstore.db_path
|
||||||
cached_registry = CachedDiskDistributionRegistry(
|
cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(SqliteKVStoreConfig(db_path=db_path)))
|
||||||
await kvstore_impl(SqliteKVStoreConfig(db_path=db_path))
|
|
||||||
)
|
|
||||||
await cached_registry.initialize()
|
await cached_registry.initialize()
|
||||||
|
|
||||||
result_vector_db = await cached_registry.get("vector_db", "test_vector_db")
|
result_vector_db = await cached_registry.get("vector_db", "test_vector_db")
|
||||||
|
@ -97,18 +93,14 @@ async def test_cached_registry_updates(cached_disk_dist_registry):
|
||||||
await cached_disk_dist_registry.register(new_vector_db)
|
await cached_disk_dist_registry.register(new_vector_db)
|
||||||
|
|
||||||
# Verify in cache
|
# Verify in cache
|
||||||
result_vector_db = await cached_disk_dist_registry.get(
|
result_vector_db = await cached_disk_dist_registry.get("vector_db", "test_vector_db_2")
|
||||||
"vector_db", "test_vector_db_2"
|
|
||||||
)
|
|
||||||
assert result_vector_db is not None
|
assert result_vector_db is not None
|
||||||
assert result_vector_db.identifier == new_vector_db.identifier
|
assert result_vector_db.identifier == new_vector_db.identifier
|
||||||
assert result_vector_db.provider_id == new_vector_db.provider_id
|
assert result_vector_db.provider_id == new_vector_db.provider_id
|
||||||
|
|
||||||
# Verify persisted to disk
|
# Verify persisted to disk
|
||||||
db_path = cached_disk_dist_registry.kvstore.db_path
|
db_path = cached_disk_dist_registry.kvstore.db_path
|
||||||
new_registry = DiskDistributionRegistry(
|
new_registry = DiskDistributionRegistry(await kvstore_impl(SqliteKVStoreConfig(db_path=db_path)))
|
||||||
await kvstore_impl(SqliteKVStoreConfig(db_path=db_path))
|
|
||||||
)
|
|
||||||
await new_registry.initialize()
|
await new_registry.initialize()
|
||||||
result_vector_db = await new_registry.get("vector_db", "test_vector_db_2")
|
result_vector_db = await new_registry.get("vector_db", "test_vector_db_2")
|
||||||
assert result_vector_db is not None
|
assert result_vector_db is not None
|
||||||
|
@ -137,16 +129,14 @@ async def test_duplicate_provider_registration(cached_disk_dist_registry):
|
||||||
# Now we expect a ValueError to be raised for duplicate registration
|
# Now we expect a ValueError to be raised for duplicate registration
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
ValueError,
|
ValueError,
|
||||||
match=r"Vector_Db.*already registered.*provider.*baz.*Unregister the existing",
|
match=r"Provider 'baz' is already registered.*Unregister the existing provider first before registering it again.",
|
||||||
):
|
):
|
||||||
await cached_disk_dist_registry.register(duplicate_vector_db)
|
await cached_disk_dist_registry.register(duplicate_vector_db)
|
||||||
|
|
||||||
# Verify the original registration is still intact
|
# Verify the original registration is still intact
|
||||||
result = await cached_disk_dist_registry.get("vector_db", "test_vector_db_2")
|
result = await cached_disk_dist_registry.get("vector_db", "test_vector_db_2")
|
||||||
assert result is not None
|
assert result is not None
|
||||||
assert (
|
assert result.embedding_model == original_vector_db.embedding_model # Original values preserved
|
||||||
result.embedding_model == original_vector_db.embedding_model
|
|
||||||
) # Original values preserved
|
|
||||||
|
|
||||||
|
|
||||||
async def test_get_all_objects(cached_disk_dist_registry):
|
async def test_get_all_objects(cached_disk_dist_registry):
|
||||||
|
@ -173,17 +163,12 @@ async def test_get_all_objects(cached_disk_dist_registry):
|
||||||
|
|
||||||
# Verify each vector_db was stored correctly
|
# Verify each vector_db was stored correctly
|
||||||
for original_vector_db in test_vector_dbs:
|
for original_vector_db in test_vector_dbs:
|
||||||
matching_vector_dbs = [
|
matching_vector_dbs = [v for v in all_results if v.identifier == original_vector_db.identifier]
|
||||||
v for v in all_results if v.identifier == original_vector_db.identifier
|
|
||||||
]
|
|
||||||
assert len(matching_vector_dbs) == 1
|
assert len(matching_vector_dbs) == 1
|
||||||
stored_vector_db = matching_vector_dbs[0]
|
stored_vector_db = matching_vector_dbs[0]
|
||||||
assert stored_vector_db.embedding_model == original_vector_db.embedding_model
|
assert stored_vector_db.embedding_model == original_vector_db.embedding_model
|
||||||
assert stored_vector_db.provider_id == original_vector_db.provider_id
|
assert stored_vector_db.provider_id == original_vector_db.provider_id
|
||||||
assert (
|
assert stored_vector_db.embedding_dimension == original_vector_db.embedding_dimension
|
||||||
stored_vector_db.embedding_dimension
|
|
||||||
== original_vector_db.embedding_dimension
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_parse_registry_values_error_handling(sqlite_kvstore):
|
async def test_parse_registry_values_error_handling(sqlite_kvstore):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue