Merge branch 'main' into feat/add-dana-agent-provider-stub

This commit is contained in:
Zooey Nguyen 2025-11-09 16:07:04 -08:00 committed by GitHub
commit 9641f3ea7b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 203 additions and 33 deletions

View file

@ -223,7 +223,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")
async def register_vector_store(self, vector_store: VectorStore) -> None:
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
@ -239,7 +240,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
return [i.vector_store for i in self.cache.values()]
async def unregister_vector_store(self, vector_store_id: str) -> None:
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")
if vector_store_id not in self.cache:
return
@ -248,6 +250,27 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoco
del self.cache[vector_store_id]
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
inference_api=self.inference_api,
)
self.cache[vector_store_id] = index
return index
async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = self.cache.get(vector_store_id)
if index is None:

View file

@ -412,6 +412,14 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
return [v.vector_store for v in self.cache.values()]
async def register_vector_store(self, vector_store: VectorStore) -> None:
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
# Save to kvstore for persistence
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
# Create and cache the index
index = await SQLiteVecIndex.create(
vector_store.embedding_dimension, self.config.db_path, vector_store.identifier
)
@ -421,13 +429,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresPro
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=SQLiteVecIndex(

View file

@ -131,7 +131,6 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.persistence)
self.vector_store_table = self.kvstore
if isinstance(self.config, RemoteChromaVectorIOConfig):
log.info(f"Connecting to Chroma server at: {self.config.url}")
@ -190,9 +189,16 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
if vector_store_id in self.cache:
return self.cache[vector_store_id]
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack")
vector_store = VectorStore.model_validate_json(vector_store_data)
collection = await maybe_await(self.client.get_collection(vector_store_id))
if not collection:
raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")

View file

@ -328,13 +328,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore),

View file

@ -368,6 +368,22 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
log.exception("Could not connect to PGVector database server")
raise RuntimeError("Could not connect to PGVector database server") from e
# Load existing vector stores from KV store into cache
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for vector_store_data in stored_vector_stores:
vector_store = VectorStore.model_validate_json(vector_store_data)
pgvector_index = PGVectorIndex(
vector_store=vector_store,
dimension=vector_store.embedding_dimension,
conn=self.conn,
kvstore=self.kvstore,
)
await pgvector_index.initialize()
index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api)
self.cache[vector_store.identifier] = index
async def shutdown(self) -> None:
if self.conn is not None:
self.conn.close()
@ -377,7 +393,13 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
async def register_vector_store(self, vector_store: VectorStore) -> None:
# Persist vector DB metadata in the KV store
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
# Save to kvstore for persistence
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
# Upsert model metadata in Postgres
upsert_models(self.conn, [(vector_store.identifier, vector_store)])
@ -396,7 +418,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
del self.cache[vector_store_id]
# Delete vector DB metadata from KV store
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before unregistering vector stores.")
await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
@ -413,13 +436,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProt
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn)
await index.initialize()
self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api)

View file

@ -183,7 +183,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
await super().shutdown()
async def register_vector_store(self, vector_store: VectorStore) -> None:
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before registering vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
@ -200,20 +201,24 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtoc
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
assert self.kvstore is not None
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise ValueError(f"Vector DB not found {vector_store_id}")
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=QdrantIndex(client=self.client, collection_name=vector_store.identifier),

View file

@ -346,13 +346,16 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
# Try to load from kvstore
if self.kvstore is None:
raise RuntimeError("KVStore not initialized. Call initialize() before using vector stores.")
key = f"{VECTOR_DBS_PREFIX}{vector_store_id}"
vector_store_data = await self.kvstore.get(key)
if not vector_store_data:
raise VectorStoreNotFoundError(vector_store_id)
vector_store = VectorStore.model_validate_json(vector_store_data)
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
if not client.collections.exists(sanitized_collection_name):

View file

@ -92,6 +92,99 @@ async def test_persistence_across_adapter_restarts(vector_io_adapter):
await vector_io_adapter.shutdown()
async def test_vector_store_lazy_loading_from_kvstore(vector_io_adapter):
"""
Test that vector stores can be lazy-loaded from KV store when not in cache.
Verifies that clearing the cache doesn't break vector store access - they
can be loaded on-demand from persistent storage.
"""
await vector_io_adapter.initialize()
vector_store_id = f"lazy_load_test_{np.random.randint(1e6)}"
vector_store = VectorStore(
identifier=vector_store_id,
provider_id="test_provider",
embedding_model="test_model",
embedding_dimension=128,
)
await vector_io_adapter.register_vector_store(vector_store)
assert vector_store_id in vector_io_adapter.cache
vector_io_adapter.cache.clear()
assert vector_store_id not in vector_io_adapter.cache
loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
assert loaded_index is not None
assert loaded_index.vector_store.identifier == vector_store_id
assert vector_store_id in vector_io_adapter.cache
cached_index = await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
assert cached_index is loaded_index
await vector_io_adapter.shutdown()
async def test_vector_store_preloading_on_initialization(vector_io_adapter):
"""
Test that vector stores are preloaded from KV store during initialization.
Verifies that after restart, all vector stores are automatically loaded into
cache and immediately accessible without requiring lazy loading.
"""
await vector_io_adapter.initialize()
vector_store_ids = [f"preload_test_{i}_{np.random.randint(1e6)}" for i in range(3)]
for vs_id in vector_store_ids:
vector_store = VectorStore(
identifier=vs_id,
provider_id="test_provider",
embedding_model="test_model",
embedding_dimension=128,
)
await vector_io_adapter.register_vector_store(vector_store)
for vs_id in vector_store_ids:
assert vs_id in vector_io_adapter.cache
await vector_io_adapter.shutdown()
await vector_io_adapter.initialize()
for vs_id in vector_store_ids:
assert vs_id in vector_io_adapter.cache
for vs_id in vector_store_ids:
loaded_index = await vector_io_adapter._get_and_cache_vector_store_index(vs_id)
assert loaded_index is not None
assert loaded_index.vector_store.identifier == vs_id
await vector_io_adapter.shutdown()
async def test_kvstore_none_raises_runtime_error(vector_io_adapter):
"""
Test that accessing vector stores with uninitialized kvstore raises RuntimeError.
Verifies proper RuntimeError is raised instead of assertions when kvstore is None.
"""
await vector_io_adapter.initialize()
vector_store_id = f"kvstore_none_test_{np.random.randint(1e6)}"
vector_store = VectorStore(
identifier=vector_store_id,
provider_id="test_provider",
embedding_model="test_model",
embedding_dimension=128,
)
await vector_io_adapter.register_vector_store(vector_store)
vector_io_adapter.cache.clear()
vector_io_adapter.kvstore = None
with pytest.raises(RuntimeError, match="KVStore not initialized"):
await vector_io_adapter._get_and_cache_vector_store_index(vector_store_id)
async def test_register_and_unregister_vector_store(vector_io_adapter):
unique_id = f"foo_db_{np.random.randint(1e6)}"
dummy = VectorStore(