fix: Vector store persistence across server restarts

This commit is contained in:
jperezde 2025-10-31 17:08:48 +01:00
parent b47afac7c2
commit ea463229bb
6 changed files with 72 additions and 31 deletions

View file

@ -421,13 +421,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=SQLiteVecIndex(

View file

@ -131,7 +131,6 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.persistence)
self.vector_store_table = self.kvstore
if isinstance(self.config, RemoteChromaVectorIOConfig):
log.info(f"Connecting to Chroma server at: {self.config.url}")
@ -190,9 +189,16 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
if vector_store_id in self.cache:
return self.cache[vector_store_id]
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack")
vector_store = VectorStore.model_validate_json(vector_store_data)
collection = await maybe_await(self.client.get_collection(vector_store_id))
if not collection:
raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")

View file

@ -328,13 +328,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore),

View file

@ -368,6 +368,22 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
log.exception("Could not connect to PGVector database server")
raise RuntimeError("Could not connect to PGVector database server") from e
# Load existing vector stores from KV store into cache
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for vector_store_data in stored_vector_stores:
vector_store = VectorStore.model_validate_json(vector_store_data)
pgvector_index = PGVectorIndex(
vector_store=vector_store,
dimension=vector_store.embedding_dimension,
conn=self.conn,
kvstore=self.kvstore,
)
await pgvector_index.initialize()
index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api)
self.cache[vector_store.identifier] = index
async def shutdown(self) -> None:
if self.conn is not None:
self.conn.close()
@ -377,7 +393,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
async def register_vector_store(self, vector_store: VectorStore) -> None:
# Persist vector DB metadata in the KV store
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
# Upsert model metadata in Postgres
upsert_models(self.conn, [(vector_store.identifier, vector_store)])
@ -396,7 +413,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
del self.cache[vector_store_id]
# Delete vector DB metadata from KV store
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")
await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
@ -413,13 +431,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn)
await index.initialize()
self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api)

View file

@ -183,7 +183,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
await super().shutdown()
async def register_vector_store(self, vector_store: VectorStore) -> None:
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
@ -200,20 +201,24 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise ValueError(f"Vector DB not found {vector_store_id}")
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=QdrantIndex(client=self.client, collection_name=vector_store.identifier),

View file

@ -346,13 +346,16 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
if not client.collections.exists(sanitized_collection_name):