# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import os

import pytest
import pytest_asyncio

from llama_stack.apis.inference import Model
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.distribution.store.registry import (
    CachedDiskDistributionRegistry,
    DiskDistributionRegistry,
)
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig


@pytest.fixture
def config(tmp_path):
    """Return a SQLite kvstore config pointing at a per-test database file.

    The path lives inside pytest's ``tmp_path`` directory, which is unique to
    each test invocation. That guarantees the database does not exist yet and
    keeps concurrent test runs from deleting or reusing each other's file,
    which a fixed location such as ``/tmp/test_registry.db`` cannot.
    """
    # db_path is handed over as a plain string, matching the original usage.
    return SqliteKVStoreConfig(db_path=os.fspath(tmp_path / "test_registry.db"))


@pytest_asyncio.fixture(scope="function")
async def registry(config):
    """Provide an initialized ``DiskDistributionRegistry`` on the test store."""
    registry = DiskDistributionRegistry(await kvstore_impl(config))
    await registry.initialize()
    return registry


@pytest_asyncio.fixture(scope="function")
async def cached_registry(config):
    """Provide an initialized ``CachedDiskDistributionRegistry`` on the test store."""
    registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
    await registry.initialize()
    return registry


@pytest.fixture
def sample_vector_db():
    """Return a ``VectorDB`` resource used as registration input."""
    return VectorDB(
        identifier="test_vector_db",
        embedding_model="all-MiniLM-L6-v2",
        embedding_dimension=384,
        provider_resource_id="test_vector_db",
        provider_id="test-provider",
    )


@pytest.fixture
def sample_model():
    """Return a ``Model`` resource used as registration input."""
    return Model(
        identifier="test_model",
        provider_resource_id="test_model",
        provider_id="test-provider",
    )


@pytest.mark.asyncio
async def test_registry_initialization(registry):
    """Looking up an unknown object in a fresh registry yields ``None``."""
    # Test empty registry
    result = await registry.get("nonexistent", "nonexistent")
    assert result is None


@pytest.mark.asyncio
async def test_basic_registration(registry, sample_vector_db, sample_model):
    """Registered objects can be fetched back by type and identifier."""
    print(f"Registering {sample_vector_db}")
    await registry.register(sample_vector_db)
    print(f"Registering {sample_model}")
    await registry.register(sample_model)

    print("Getting vector_db")
    result_vector_db = await registry.get("vector_db", "test_vector_db")
    assert result_vector_db is not None
    assert result_vector_db.identifier == sample_vector_db.identifier
    assert result_vector_db.embedding_model == sample_vector_db.embedding_model
    assert result_vector_db.provider_id == sample_vector_db.provider_id

    result_model = await registry.get("model", "test_model")
    assert result_model is not None
    assert result_model.identifier == sample_model.identifier
    assert result_model.provider_id == sample_model.provider_id


@pytest.mark.asyncio
async def test_cached_registry_initialization(config, sample_vector_db, sample_model):
    """A cached registry opened on an already-populated store sees its objects."""
    # First populate the disk registry
    disk_registry = DiskDistributionRegistry(await kvstore_impl(config))
    await disk_registry.initialize()
    await disk_registry.register(sample_vector_db)
    await disk_registry.register(sample_model)

    # Test cached version loads from disk
    cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
    await cached_registry.initialize()

    result_vector_db = await cached_registry.get("vector_db", "test_vector_db")
    assert result_vector_db is not None
    assert result_vector_db.identifier == sample_vector_db.identifier
    assert result_vector_db.embedding_model == sample_vector_db.embedding_model
    assert result_vector_db.embedding_dimension == sample_vector_db.embedding_dimension
    assert result_vector_db.provider_id == sample_vector_db.provider_id


@pytest.mark.asyncio
async def test_cached_registry_updates(config):
    """Objects registered through the cached registry are visible via a new disk registry."""
    cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
    await cached_registry.initialize()

    new_vector_db = VectorDB(
        identifier="test_vector_db_2",
        embedding_model="all-MiniLM-L6-v2",
        embedding_dimension=384,
        provider_resource_id="test_vector_db_2",
        provider_id="baz",
    )
    await cached_registry.register(new_vector_db)

    # Verify in cache
    result_vector_db = await cached_registry.get("vector_db", "test_vector_db_2")
    assert result_vector_db is not None
    assert result_vector_db.identifier == new_vector_db.identifier
    assert result_vector_db.provider_id == new_vector_db.provider_id

    # Verify persisted to disk
    new_registry = DiskDistributionRegistry(await kvstore_impl(config))
    await new_registry.initialize()
    result_vector_db = await new_registry.get("vector_db", "test_vector_db_2")
    assert result_vector_db is not None
    assert result_vector_db.identifier == new_vector_db.identifier
    assert result_vector_db.provider_id == new_vector_db.provider_id


@pytest.mark.asyncio
async def test_duplicate_provider_registration(config):
    """Re-registering the same identifier and provider keeps the first object's values."""
    cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
    await cached_registry.initialize()

    original_vector_db = VectorDB(
        identifier="test_vector_db_2",
        embedding_model="all-MiniLM-L6-v2",
        embedding_dimension=384,
        provider_resource_id="test_vector_db_2",
        provider_id="baz",
    )
    await cached_registry.register(original_vector_db)

    duplicate_vector_db = VectorDB(
        identifier="test_vector_db_2",
        embedding_model="different-model",
        embedding_dimension=384,
        provider_resource_id="test_vector_db_2",
        provider_id="baz",  # Same provider_id
    )
    await cached_registry.register(duplicate_vector_db)

    result = await cached_registry.get("vector_db", "test_vector_db_2")
    assert result is not None
    assert result.embedding_model == original_vector_db.embedding_model  # Original values preserved


@pytest.mark.asyncio
async def test_get_all_objects(config):
    """``get_all`` returns every registered object with its fields intact."""
    cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config))
    await cached_registry.initialize()

    # Create multiple test vector DBs
    test_vector_dbs = [
        VectorDB(
            identifier=f"test_vector_db_{i}",
            embedding_model="all-MiniLM-L6-v2",
            embedding_dimension=384,
            provider_resource_id=f"test_vector_db_{i}",
            provider_id=f"provider_{i}",
        )
        for i in range(3)
    ]

    # Register all vector_dbs
    for vector_db in test_vector_dbs:
        await cached_registry.register(vector_db)

    # Test get_all retrieval
    all_results = await cached_registry.get_all()
    assert len(all_results) == 3

    # Verify each vector_db was stored correctly
    for original_vector_db in test_vector_dbs:
        matching_vector_dbs = [v for v in all_results if v.identifier == original_vector_db.identifier]
        assert len(matching_vector_dbs) == 1
        stored_vector_db = matching_vector_dbs[0]
        assert stored_vector_db.embedding_model == original_vector_db.embedding_model
        assert stored_vector_db.provider_id == original_vector_db.provider_id
        assert stored_vector_db.embedding_dimension == original_vector_db.embedding_dimension