diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py index 5e33d4ca3..e0742bb15 100644 --- a/llama_stack/providers/inline/vector_io/faiss/faiss.py +++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py @@ -223,7 +223,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}") async def register_vector_store(self, vector_store: VectorStore) -> None: - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.") key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" await self.kvstore.set(key=key, value=vector_store.model_dump_json()) @@ -239,7 +240,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco return [i.vector_store for i in self.cache.values()] async def unregister_vector_store(self, vector_store_id: str) -> None: - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.") if vector_store_id not in self.cache: return @@ -248,6 +250,27 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco del self.cache[vector_store_id] await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}") + async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None: + if vector_store_id in self.cache: + return self.cache[vector_store_id] + + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: + raise VectorStoreNotFoundError(vector_store_id) + + vector_store = VectorStore.model_validate_json(vector_store_data) + index = VectorStoreWithIndex( + vector_store=vector_store, + index=await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier), + inference_api=self.inference_api, + ) + self.cache[vector_store_id] = index + return index + async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: index = self.cache.get(vector_db_id) if index is None: diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 37294f173..ac645925d 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -412,6 +412,14 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro return [v.vector_store for v in self.cache.values()] async def register_vector_store(self, vector_store: VectorStore) -> None: + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.") + + # Save to kvstore for persistence + key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" + await self.kvstore.set(key=key, value=vector_store.model_dump_json()) + + # Create and cache the index index = await SQLiteVecIndex.create( vector_store.embedding_dimension, self.config.db_path, vector_store.identifier ) @@ -421,13 +429,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = VectorStoreWithIndex( vector_store=vector_store, index=SQLiteVecIndex( diff --git a/llama_stack/providers/remote/vector_io/chroma/chroma.py b/llama_stack/providers/remote/vector_io/chroma/chroma.py index 2663ad43e..2a4df92f5 100644 --- a/llama_stack/providers/remote/vector_io/chroma/chroma.py +++ b/llama_stack/providers/remote/vector_io/chroma/chroma.py @@ -131,7 +131,6 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc async def initialize(self) -> None: self.kvstore = await kvstore_impl(self.config.persistence) - self.vector_store_table = self.kvstore if isinstance(self.config, RemoteChromaVectorIOConfig): log.info(f"Connecting to Chroma server at: {self.config.url}") @@ -190,9 +189,16 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc if vector_store_id in self.cache: return self.cache[vector_store_id] - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack") + + vector_store = VectorStore.model_validate_json(vector_store_data) collection = await maybe_await(self.client.get_collection(vector_store_id)) if not collection: raise ValueError(f"Vector DB {vector_store_id} not found in Chroma") diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index cccf13816..568aea559 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -328,13 +328,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = VectorStoreWithIndex( vector_store=vector_store, index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore), diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index f28bd3cd9..ad14b829b 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -368,6 +368,22 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt log.exception("Could not connect to PGVector database server") raise RuntimeError("Could not connect to PGVector database server") from e + # Load existing vector stores from KV store into cache + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key) + for vector_store_data in stored_vector_stores: + vector_store = VectorStore.model_validate_json(vector_store_data) + pgvector_index = PGVectorIndex( + vector_store=vector_store, + dimension=vector_store.embedding_dimension, + conn=self.conn, + kvstore=self.kvstore, + ) + await pgvector_index.initialize() + index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api) + self.cache[vector_store.identifier] = index + async def shutdown(self) -> None: if self.conn is not None: self.conn.close() @@ -377,7 +393,13 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt async def register_vector_store(self, vector_store: VectorStore) -> None: # Persist vector DB metadata in the KV store - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.") + + # Save to kvstore for persistence + key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" + await self.kvstore.set(key=key, value=vector_store.model_dump_json()) + # Upsert model metadata in Postgres upsert_models(self.conn, [(vector_store.identifier, vector_store)]) @@ -396,7 +418,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt del self.cache[vector_store_id] # Delete vector DB metadata from KV store - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.") await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}") async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: @@ -413,13 +436,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn) await index.initialize() self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api) diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 93d0894a6..8f6f1fcaf 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -183,7 +183,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc await super().shutdown() async def register_vector_store(self, vector_store: VectorStore) -> None: - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.") key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" await self.kvstore.set(key=key, value=vector_store.model_dump_json()) @@ -200,20 +201,24 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc await self.cache[vector_store_id].index.delete() del self.cache[vector_store_id] - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}") async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None: if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise ValueError(f"Vector DB not found {vector_store_id}") + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = VectorStoreWithIndex( vector_store=vector_store, index=QdrantIndex(client=self.client, collection_name=vector_store.identifier), diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index 66922aa3f..74e64b743 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -346,13 +346,16 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) client = self._get_client() sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True) if not client.collections.exists(sanitized_collection_name): diff --git a/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py b/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py index 9d9c767f6..d6c96f1fa 100644 --- a/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py +++ b/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py @@ -92,6 +92,99 @@ async def test_persistence_across_adapter_restarts(vector_io_adapter): await vector_io_adapter.shutdown() +async def test_vector_store_lazy_loading_from_kvstore(vector_io_adapter): + """ + Test that vector stores can be lazy-loaded from KV store when not in cache. + + Verifies that clearing the cache doesn't break vector store access - they + can be loaded on-demand from persistent storage. + """ + await vector_io_adapter.initialize() + + vector_store_id = f"lazy_load_test_{np.random.randint(1e6)}" + vector_store = VectorStore( + identifier=vector_store_id, + provider_id="test_provider", + embedding_model="test_model", + embedding_dimension=128, + ) + await vector_io_adapter.register_vector_store(vector_store) + assert vector_store_id in vector_io_adapter.cache + + vector_io_adapter.cache.clear() + assert vector_store_id not in vector_io_adapter.cache + + loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id) + assert loaded_index is not None + assert loaded_index.vector_store.identifier == vector_store_id + assert vector_store_id in vector_io_adapter.cache + + cached_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id) + assert cached_index is loaded_index + + await vector_io_adapter.shutdown() + + +async def test_vector_store_preloading_on_initialization(vector_io_adapter): + """ + Test that vector stores are preloaded from KV store during initialization. + + Verifies that after restart, all vector stores are automatically loaded into + cache and immediately accessible without requiring lazy loading. + """ + await vector_io_adapter.initialize() + + vector_store_ids = [f"preload_test_{i}_{np.random.randint(1e6)}" for i in range(3)] + for vs_id in vector_store_ids: + vector_store = VectorStore( + identifier=vs_id, + provider_id="test_provider", + embedding_model="test_model", + embedding_dimension=128, + ) + await vector_io_adapter.register_vector_store(vector_store) + + for vs_id in vector_store_ids: + assert vs_id in vector_io_adapter.cache + + await vector_io_adapter.shutdown() + await vector_io_adapter.initialize() + + for vs_id in vector_store_ids: + assert vs_id in vector_io_adapter.cache + + for vs_id in vector_store_ids: + loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vs_id) + assert loaded_index is not None + assert loaded_index.vector_store.identifier == vs_id + + await vector_io_adapter.shutdown() + + +async def test_kvstore_none_raises_runtime_error(vector_io_adapter): + """ + Test that accessing vector stores with uninitialized kvstore raises RuntimeError. + + Verifies proper RuntimeError is raised instead of assertions when kvstore is None. + """ + await vector_io_adapter.initialize() + + vector_store_id = f"kvstore_none_test_{np.random.randint(1e6)}" + vector_store = VectorStore( + identifier=vector_store_id, + provider_id="test_provider", + embedding_model="test_model", + embedding_dimension=128, + ) + await vector_io_adapter.register_vector_store(vector_store) + + vector_io_adapter.cache.clear() + vector_io_adapter.kvstore = None + + with pytest.raises(RuntimeError, match="KVStore not initialized"): + await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id) + + async def test_register_and_unregister_vector_store(vector_io_adapter): unique_id = f"foo_db_{np.random.randint(1e6)}" dummy = VectorStore(