From 35b9da04e5f124b5873bd5431fba20ab21ff5fb9 Mon Sep 17 00:00:00 2001 From: Ashwin Bharambe Date: Tue, 12 Aug 2025 14:25:29 -0700 Subject: [PATCH] fix sqlite vec, weaviate, milvus --- .../inline/vector_io/sqlite_vec/sqlite_vec.py | 7 ++++--- .../providers/remote/vector_io/milvus/milvus.py | 2 +- .../providers/remote/vector_io/weaviate/weaviate.py | 10 ++++++---- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index dec498cc3..cc1982f3b 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -438,13 +438,14 @@ class SQLiteVecIndex(EmbeddingIndex): cur.execute("BEGIN TRANSACTION") # Delete from metadata table - cur.execute(f"DELETE FROM {self.metadata_table} WHERE id = ANY(?)", (chunk_ids,)) + placeholders = ",".join("?" * len(chunk_ids)) + cur.execute(f"DELETE FROM {self.metadata_table} WHERE id IN ({placeholders})", chunk_ids) # Delete from vector table - cur.execute(f"DELETE FROM {self.vector_table} WHERE id = ANY(?)", (chunk_ids,)) + cur.execute(f"DELETE FROM {self.vector_table} WHERE id IN ({placeholders})", chunk_ids) # Delete from FTS table - cur.execute(f"DELETE FROM {self.fts_table} WHERE id = ANY(?)", (chunk_ids,)) + cur.execute(f"DELETE FROM {self.fts_table} WHERE id IN ({placeholders})", chunk_ids) connection.commit() except Exception as e: diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index 815a103fc..e681e5a99 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -293,7 +293,7 @@ class MilvusIndex(EmbeddingIndex): chunk_ids_str = ",".join(f"'{c.chunk_id}'" for c in chunks_for_deletion) try: await asyncio.to_thread( - self.client.delete, collection_name=self.collection_name, filter=f"chunk_id IN [{chunk_ids_str}]" + self.client.delete, collection_name=self.collection_name, filter=f"chunk_id in ({chunk_ids_str})" ) except Exception as e: logger.error(f"Error deleting chunks from Milvus collection {self.collection_name}: {e}") diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index 11da8902c..c1aa77a62 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -26,6 +26,7 @@ from llama_stack.providers.utils.memory.openai_vector_store_mixin import ( OpenAIVectorStoreMixin, ) from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -79,10 +80,11 @@ class WeaviateIndex(EmbeddingIndex): # TODO: make this async friendly collection.data.insert_many(data_objects) - async def delete_chunk(self, chunk_id: str) -> None: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True) collection = self.client.collections.get(sanitized_collection_name) - collection.data.delete_many(where=Filter.by_property("id").contains_any([chunk_id])) + chunk_ids = [chunk.chunk_id for chunk in chunks_for_deletion] + collection.data.delete_many(where=Filter.by_property("id").contains_any(chunk_ids)) async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True) @@ -307,10 +309,10 @@ class WeaviateVectorIOAdapter( return await index.query_chunks(query, params) - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True) index = await self._get_and_cache_vector_db_index(sanitized_collection_name) if not index: raise ValueError(f"Vector DB {sanitized_collection_name} not found") - await index.delete(chunk_ids) + await index.index.delete_chunks(chunks_for_deletion)