From ea463229bb8e11954528c68f106c7c5619cb664b Mon Sep 17 00:00:00 2001 From: jperezde Date: Fri, 31 Oct 2025 17:08:48 +0100 Subject: [PATCH] fix: Vector store persistence across server restarts --- .../inline/vector_io/sqlite_vec/sqlite_vec.py | 13 ++++--- .../remote/vector_io/chroma/chroma.py | 12 +++++-- .../remote/vector_io/milvus/milvus.py | 13 ++++--- .../remote/vector_io/pgvector/pgvector.py | 35 +++++++++++++++---- .../remote/vector_io/qdrant/qdrant.py | 17 +++++---- .../remote/vector_io/weaviate/weaviate.py | 13 ++++--- 6 files changed, 72 insertions(+), 31 deletions(-) diff --git a/src/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/src/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index accf5cead..3ae8793b3 100644 --- a/src/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/src/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -421,13 +421,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = VectorStoreWithIndex( vector_store=vector_store, index=SQLiteVecIndex( diff --git a/src/llama_stack/providers/remote/vector_io/chroma/chroma.py b/src/llama_stack/providers/remote/vector_io/chroma/chroma.py index a4fd15f77..97e2244b8 100644 --- a/src/llama_stack/providers/remote/vector_io/chroma/chroma.py +++ b/src/llama_stack/providers/remote/vector_io/chroma/chroma.py @@ -131,7 +131,6 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc async def initialize(self) -> None: self.kvstore = await kvstore_impl(self.config.persistence) - self.vector_store_table = self.kvstore if isinstance(self.config, RemoteChromaVectorIOConfig): log.info(f"Connecting to Chroma server at: {self.config.url}") @@ -190,9 +189,16 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc if vector_store_id in self.cache: return self.cache[vector_store_id] - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack") + + vector_store = VectorStore.model_validate_json(vector_store_data) collection = await maybe_await(self.client.get_collection(vector_store_id)) if not collection: raise ValueError(f"Vector DB {vector_store_id} not found in Chroma") diff --git a/src/llama_stack/providers/remote/vector_io/milvus/milvus.py b/src/llama_stack/providers/remote/vector_io/milvus/milvus.py index ace9ab1c4..73339b5be 100644 --- a/src/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/src/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -328,13 +328,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = VectorStoreWithIndex( vector_store=vector_store, index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore), diff --git a/src/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/src/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index 29cfd673f..b0786db02 100644 --- a/src/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/src/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -368,6 +368,22 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt log.exception("Could not connect to PGVector database server") raise RuntimeError("Could not connect to PGVector database server") from e + # Load existing vector stores from KV store into cache + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key) + for vector_store_data in stored_vector_stores: + vector_store = VectorStore.model_validate_json(vector_store_data) + pgvector_index = PGVectorIndex( + vector_store=vector_store, + dimension=vector_store.embedding_dimension, + conn=self.conn, + kvstore=self.kvstore, + ) + await pgvector_index.initialize() + index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api) + self.cache[vector_store.identifier] = index + async def shutdown(self) -> None: if self.conn is not None: self.conn.close() @@ -377,7 +393,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt async def register_vector_store(self, vector_store: VectorStore) -> None: # Persist vector DB metadata in the KV store - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.") # Upsert model metadata in Postgres upsert_models(self.conn, [(vector_store.identifier, vector_store)]) @@ -396,7 +413,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt del self.cache[vector_store_id] # Delete vector DB metadata from KV store - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.") await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}") async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: @@ -413,13 +431,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn) await index.initialize() self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api) diff --git a/src/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/src/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 266e9bf58..7d17c5591 100644 --- a/src/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/src/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -183,7 +183,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc await super().shutdown() async def register_vector_store(self, vector_store: VectorStore) -> None: - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.") key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" await self.kvstore.set(key=key, value=vector_store.model_dump_json()) @@ -200,20 +201,24 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc await self.cache[vector_store_id].index.delete() del self.cache[vector_store_id] - assert self.kvstore is not None + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}") async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None: if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise ValueError(f"Vector DB not found {vector_store_id}") + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) index = VectorStoreWithIndex( vector_store=vector_store, index=QdrantIndex(client=self.client, collection_name=vector_store.identifier), diff --git a/src/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/src/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index 7813f6e5c..d200662da 100644 --- a/src/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/src/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -346,13 +346,16 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv if vector_store_id in self.cache: return self.cache[vector_store_id] - if self.vector_store_table is None: - raise VectorStoreNotFoundError(vector_store_id) - - vector_store = await self.vector_store_table.get_vector_store(vector_store_id) - if not vector_store: + # Try to load from kvstore + if self.kvstore is None: + raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.") + + key = f"{VECTOR_DBS_PREFIX}{vector_store_id}" + vector_store_data = await self.kvstore.get(key) + if not vector_store_data: raise VectorStoreNotFoundError(vector_store_id) + vector_store = VectorStore.model_validate_json(vector_store_data) client = self._get_client() sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True) if not client.collections.exists(sanitized_collection_name):