mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-08-15 14:08:00 +00:00
fix sqlite vec, weaviate, milvus
This commit is contained in:
parent
88cfab2768
commit
35b9da04e5
3 changed files with 11 additions and 8 deletions
|
@ -438,13 +438,14 @@ class SQLiteVecIndex(EmbeddingIndex):
|
||||||
cur.execute("BEGIN TRANSACTION")
|
cur.execute("BEGIN TRANSACTION")
|
||||||
|
|
||||||
# Delete from metadata table
|
# Delete from metadata table
|
||||||
cur.execute(f"DELETE FROM {self.metadata_table} WHERE id = ANY(?)", (chunk_ids,))
|
placeholders = ",".join("?" * len(chunk_ids))
|
||||||
|
cur.execute(f"DELETE FROM {self.metadata_table} WHERE id IN ({placeholders})", chunk_ids)
|
||||||
|
|
||||||
# Delete from vector table
|
# Delete from vector table
|
||||||
cur.execute(f"DELETE FROM {self.vector_table} WHERE id = ANY(?)", (chunk_ids,))
|
cur.execute(f"DELETE FROM {self.vector_table} WHERE id IN ({placeholders})", chunk_ids)
|
||||||
|
|
||||||
# Delete from FTS table
|
# Delete from FTS table
|
||||||
cur.execute(f"DELETE FROM {self.fts_table} WHERE id = ANY(?)", (chunk_ids,))
|
cur.execute(f"DELETE FROM {self.fts_table} WHERE id IN ({placeholders})", chunk_ids)
|
||||||
|
|
||||||
connection.commit()
|
connection.commit()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -293,7 +293,7 @@ class MilvusIndex(EmbeddingIndex):
|
||||||
chunk_ids_str = ",".join(f"'{c.chunk_id}'" for c in chunks_for_deletion)
|
chunk_ids_str = ",".join(f"'{c.chunk_id}'" for c in chunks_for_deletion)
|
||||||
try:
|
try:
|
||||||
await asyncio.to_thread(
|
await asyncio.to_thread(
|
||||||
self.client.delete, collection_name=self.collection_name, filter=f"chunk_id IN [{chunk_ids_str}]"
|
self.client.delete, collection_name=self.collection_name, filter=f"chunk_id in ({chunk_ids_str})"
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error deleting chunks from Milvus collection {self.collection_name}: {e}")
|
logger.error(f"Error deleting chunks from Milvus collection {self.collection_name}: {e}")
|
||||||
|
|
|
@ -26,6 +26,7 @@ from llama_stack.providers.utils.memory.openai_vector_store_mixin import (
|
||||||
OpenAIVectorStoreMixin,
|
OpenAIVectorStoreMixin,
|
||||||
)
|
)
|
||||||
from llama_stack.providers.utils.memory.vector_store import (
|
from llama_stack.providers.utils.memory.vector_store import (
|
||||||
|
ChunkForDeletion,
|
||||||
EmbeddingIndex,
|
EmbeddingIndex,
|
||||||
VectorDBWithIndex,
|
VectorDBWithIndex,
|
||||||
)
|
)
|
||||||
|
@ -79,10 +80,11 @@ class WeaviateIndex(EmbeddingIndex):
|
||||||
# TODO: make this async friendly
|
# TODO: make this async friendly
|
||||||
collection.data.insert_many(data_objects)
|
collection.data.insert_many(data_objects)
|
||||||
|
|
||||||
async def delete_chunk(self, chunk_id: str) -> None:
|
async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None:
|
||||||
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
|
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
|
||||||
collection = self.client.collections.get(sanitized_collection_name)
|
collection = self.client.collections.get(sanitized_collection_name)
|
||||||
collection.data.delete_many(where=Filter.by_property("id").contains_any([chunk_id]))
|
chunk_ids = [chunk.chunk_id for chunk in chunks_for_deletion]
|
||||||
|
collection.data.delete_many(where=Filter.by_property("id").contains_any(chunk_ids))
|
||||||
|
|
||||||
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
|
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
|
||||||
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
|
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
|
||||||
|
@ -307,10 +309,10 @@ class WeaviateVectorIOAdapter(
|
||||||
|
|
||||||
return await index.query_chunks(query, params)
|
return await index.query_chunks(query, params)
|
||||||
|
|
||||||
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
|
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
|
||||||
sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True)
|
sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True)
|
||||||
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
|
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
|
||||||
if not index:
|
if not index:
|
||||||
raise ValueError(f"Vector DB {sanitized_collection_name} not found")
|
raise ValueError(f"Vector DB {sanitized_collection_name} not found")
|
||||||
|
|
||||||
await index.delete(chunk_ids)
|
await index.index.delete_chunks(chunks_for_deletion)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue