diff --git a/llama_stack/distribution/store/tests/test_registry.py b/llama_stack/distribution/store/tests/test_registry.py index 7e389cccd..d20151b7c 100644 --- a/llama_stack/distribution/store/tests/test_registry.py +++ b/llama_stack/distribution/store/tests/test_registry.py @@ -11,7 +11,7 @@ import pytest_asyncio from llama_stack.distribution.store import * # noqa F403 from llama_stack.apis.inference import Model from llama_stack.apis.memory_banks import VectorMemoryBank -from llama_stack.providers.utils.kvstore import kvstore_impl, SqliteKVStoreConfig +from llama_stack.providers.utils.kvstore import kvstore_impl, SqliteKVStoreConfig, RedisKVStoreConfig from llama_stack.distribution.datatypes import * # noqa F403 @@ -23,6 +23,11 @@ def config(): return config +@pytest.fixture +def redis_config(): + return RedisKVStoreConfig(host="localhost", port=6379) + + @pytest_asyncio.fixture async def registry(config): registry = DiskDistributionRegistry(await kvstore_impl(config)) @@ -30,6 +35,13 @@ async def registry(config): return registry +@pytest_asyncio.fixture +async def redis_registry(redis_config): + registry = DiskDistributionRegistry(await kvstore_impl(redis_config)) + await registry.initialize() + return registry + + @pytest_asyncio.fixture async def cached_registry(config): registry = CachedDiskDistributionRegistry(await kvstore_impl(config)) @@ -37,6 +49,13 @@ async def cached_registry(config): return registry +@pytest_asyncio.fixture +async def cached_redis_registry(redis_config): + registry = CachedDiskDistributionRegistry(await kvstore_impl(redis_config)) + await registry.initialize() + return registry + + @pytest.fixture def sample_bank(): return VectorMemoryBank( @@ -65,6 +84,13 @@ async def test_registry_initialization(registry): assert len(results) == 0 +@pytest.mark.asyncio +async def test_redis_registry_initialization(redis_registry): + # Test empty registry + results = await redis_registry.get("nonexistent", "nonexistent") + assert len(results) == 0 + + @pytest.mark.asyncio async def test_basic_registration(registry, sample_bank, sample_model): print(f"Registering {sample_bank}") @@ -88,6 +114,29 @@ async def test_basic_registration(registry, sample_bank, sample_model): assert result_model.provider_id == sample_model.provider_id +@pytest.mark.asyncio +async def test_redis_basic_registration(redis_registry, sample_bank, sample_model): + print(f"Registering {sample_bank}") + await redis_registry.register(sample_bank) + print(f"Registering {sample_model}") + await redis_registry.register(sample_model) + print("Getting bank") + results = await redis_registry.get("memory_bank", "test_bank") + assert len(results) == 1 + result_bank = results[0] + assert result_bank.identifier == sample_bank.identifier + assert result_bank.embedding_model == sample_bank.embedding_model + assert result_bank.chunk_size_in_tokens == sample_bank.chunk_size_in_tokens + assert result_bank.overlap_size_in_tokens == sample_bank.overlap_size_in_tokens + assert result_bank.provider_id == sample_bank.provider_id + + results = await redis_registry.get("model", "test_model") + assert len(results) == 1 + result_model = results[0] + assert result_model.identifier == sample_model.identifier + assert result_model.provider_id == sample_model.provider_id + + @pytest.mark.asyncio async def test_cached_registry_initialization(config, sample_bank, sample_model): # First populate the disk registry @@ -110,6 +159,28 @@ async def test_cached_registry_initialization(config, sample_bank, sample_model) assert result_bank.provider_id == sample_bank.provider_id +@pytest.mark.asyncio +async def test_cached_redis_registry_initialization(redis_config, sample_bank, sample_model): + # First populate the redis registry + redis_registry = DiskDistributionRegistry(await kvstore_impl(redis_config)) + await redis_registry.initialize() + await redis_registry.register(sample_bank) + await redis_registry.register(sample_model) + + # Test cached version loads from redis + cached_redis_registry = CachedDiskDistributionRegistry(await kvstore_impl(redis_config)) + await cached_redis_registry.initialize() + + results = await cached_redis_registry.get("memory_bank", "test_bank") + assert len(results) == 1 + result_bank = results[0] + assert result_bank.identifier == sample_bank.identifier + assert result_bank.embedding_model == sample_bank.embedding_model + assert result_bank.chunk_size_in_tokens == sample_bank.chunk_size_in_tokens + assert result_bank.overlap_size_in_tokens == sample_bank.overlap_size_in_tokens + assert result_bank.provider_id == sample_bank.provider_id + + @pytest.mark.asyncio async def test_cached_registry_updates(config): cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config)) @@ -142,6 +213,38 @@ async def test_cached_registry_updates(config): assert result_bank.provider_id == new_bank.provider_id +@pytest.mark.asyncio +async def test_cached_redis_registry_updates(redis_config): + cached_redis_registry = CachedDiskDistributionRegistry(await kvstore_impl(redis_config)) + await cached_redis_registry.initialize() + + new_bank = VectorMemoryBank( + identifier="test_bank_2", + embedding_model="all-MiniLM-L6-v2", + chunk_size_in_tokens=256, + overlap_size_in_tokens=32, + provider_resource_id="test_bank_2", + provider_id="baz", + ) + await cached_redis_registry.register(new_bank) + + # Verify in cache + results = await cached_redis_registry.get("memory_bank", "test_bank_2") + assert len(results) == 1 + result_bank = results[0] + assert result_bank.identifier == new_bank.identifier + assert result_bank.provider_id == new_bank.provider_id + + # Verify persisted to redis + new_registry = DiskDistributionRegistry(await kvstore_impl(redis_config)) + await new_registry.initialize() + results = await new_registry.get("memory_bank", "test_bank_2") + assert len(results) == 1 + result_bank = results[0] + assert result_bank.identifier == new_bank.identifier + assert result_bank.provider_id == new_bank.provider_id + + @pytest.mark.asyncio async def test_duplicate_provider_registration(config): cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config)) @@ -174,6 +277,38 @@ async def test_duplicate_provider_registration(config): ) # Original values preserved +@pytest.mark.asyncio +async def test_redis_duplicate_provider_registration(redis_config): + cached_redis_registry = CachedDiskDistributionRegistry(await kvstore_impl(redis_config)) + await cached_redis_registry.initialize() + + original_bank = VectorMemoryBank( + identifier="test_bank_2", + embedding_model="all-MiniLM-L6-v2", + chunk_size_in_tokens=256, + overlap_size_in_tokens=32, + provider_resource_id="test_bank_2", + provider_id="baz", + ) + await cached_redis_registry.register(original_bank) + + duplicate_bank = VectorMemoryBank( + identifier="test_bank_2", + embedding_model="different-model", + chunk_size_in_tokens=128, + overlap_size_in_tokens=16, + provider_resource_id="test_bank_2", + provider_id="baz", # Same provider_id + ) + await cached_redis_registry.register(duplicate_bank) + + results = await cached_redis_registry.get("memory_bank", "test_bank_2") + assert len(results) == 1 # Still only one result + assert ( + results[0].embedding_model == original_bank.embedding_model + ) # Original values preserved + + @pytest.mark.asyncio async def test_get_all_objects(config): cached_registry = CachedDiskDistributionRegistry(await kvstore_impl(config)) @@ -213,3 +348,44 @@ async def test_get_all_objects(config): assert ( stored_bank.overlap_size_in_tokens == original_bank.overlap_size_in_tokens ) + + +@pytest.mark.asyncio +async def test_redis_get_all_objects(redis_config): + cached_redis_registry = CachedDiskDistributionRegistry(await kvstore_impl(redis_config)) + await cached_redis_registry.initialize() + + # Create multiple test banks + test_banks = [ + VectorMemoryBank( + identifier=f"test_bank_{i}", + embedding_model="all-MiniLM-L6-v2", + chunk_size_in_tokens=256, + overlap_size_in_tokens=32, + provider_resource_id=f"test_bank_{i}", + provider_id=f"provider_{i}", + ) + for i in range(3) + ] + + # Register all banks + for bank in test_banks: + await cached_redis_registry.register(bank) + + # Test get_all retrieval + all_results = await cached_redis_registry.get_all() + assert len(all_results) == 3 + + # Verify each bank was stored correctly + for original_bank in test_banks: + matching_banks = [ + b for b in all_results if b.identifier == original_bank.identifier + ] + assert len(matching_banks) == 1 + stored_bank = matching_banks[0] + assert stored_bank.embedding_model == original_bank.embedding_model + assert stored_bank.provider_id == original_bank.provider_id + assert stored_bank.chunk_size_in_tokens == original_bank.chunk_size_in_tokens + assert ( + stored_bank.overlap_size_in_tokens == original_bank.overlap_size_in_tokens + ) diff --git a/llama_stack/providers/tests/memory/test_memory.py b/llama_stack/providers/tests/memory/test_memory.py index 03597d073..c99565642 100644 --- a/llama_stack/providers/tests/memory/test_memory.py +++ b/llama_stack/providers/tests/memory/test_memory.py @@ -5,12 +5,14 @@ # the root directory of this source tree. import uuid +from datetime import datetime, timedelta import pytest from llama_stack.apis.memory import * # noqa: F403 from llama_stack.distribution.datatypes import * # noqa: F403 from llama_stack.apis.memory_banks.memory_banks import VectorMemoryBankParams +from llama_stack.providers.utils.kvstore import RedisKVStoreConfig, kvstore_impl # How to run this test: # @@ -45,6 +47,18 @@ def sample_documents(): ] +@pytest.fixture +def redis_config(): + return RedisKVStoreConfig(host="localhost", port=6379) + + +@pytest_asyncio.fixture +async def redis_kvstore(redis_config): + kvstore = await kvstore_impl(redis_config) + await kvstore.initialize() + return kvstore + + async def register_memory_bank( banks_impl: MemoryBanks, inference_model: str ) -> MemoryBank: @@ -177,6 +191,84 @@ class TestMemory: print("The scores are:", response5.scores) assert all(score >= 0.01 for score in response5.scores) + @pytest.mark.asyncio + async def test_redis_kvstore_initialization(self, redis_kvstore): + # Test empty Redis KVStore + result = await redis_kvstore.get("nonexistent") + assert result is None + + @pytest.mark.asyncio + async def test_redis_kvstore_crud_operations(self, redis_kvstore): + # Test set and get operations + await redis_kvstore.set("key1", "value1") + result = await redis_kvstore.get("key1") + assert result == "value1" + + # Test delete operation + await redis_kvstore.delete("key1") + result = await redis_kvstore.get("key1") + assert result is None + + @pytest.mark.asyncio + async def test_redis_kvstore_namespaced_keys(self, redis_config): + redis_config.namespace = "test_namespace" + kvstore = await kvstore_impl(redis_config) + await kvstore.initialize() + + # Test namespaced set and get operations + await kvstore.set("key1", "value1") + result = await kvstore.get("key1") + assert result == "value1" + + # Test delete operation with namespaced key + await kvstore.delete("key1") + result = await kvstore.get("key1") + assert result is None + + @pytest.mark.asyncio + async def test_redis_kvstore_expiration_handling(self, redis_kvstore): + # Test set operation with expiration + expiration = datetime.now() + timedelta(seconds=1) + await redis_kvstore.set("key1", "value1", expiration=expiration) + + # Verify key is retrievable before expiration + result = await redis_kvstore.get("key1") + assert result == "value1" + + # Wait for expiration and verify key is no longer retrievable + await asyncio.sleep(2) + result = await redis_kvstore.get("key1") + assert result is None + + @pytest.mark.asyncio + async def test_redis_kvstore_error_handling(self, redis_config): + # Test connection to invalid Redis server + invalid_config = RedisKVStoreConfig(host="invalid_host", port=6379) + with pytest.raises(RuntimeError): + await kvstore_impl(invalid_config) + + @pytest.mark.asyncio + async def test_redis_kvstore_concurrency(self, redis_kvstore): + # Test concurrent access to Redis KVStore + async def set_value(key, value): + await redis_kvstore.set(key, value) + + async def get_value(key): + return await redis_kvstore.get(key) + + tasks = [] + for i in range(10): + tasks.append(set_value(f"key{i}", f"value{i}")) + await asyncio.gather(*tasks) + + tasks = [] + for i in range(10): + tasks.append(get_value(f"key{i}")) + results = await asyncio.gather(*tasks) + + for i in range(10): + assert results[i] == f"value{i}" + def assert_valid_response(response: QueryDocumentsResponse): assert isinstance(response, QueryDocumentsResponse)