mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-07-16 18:08:09 +00:00
fix(faiss): Delete file contents from kvstore (#2686)
Remove both the metadata and content from the kvstore when a file is being removed from the vector store. Closes: #2685 Also add faiss provider to openai_vector_stores test suite --------- Signed-off-by: Derek Higgins <derekh@redhat.com> Co-authored-by: raghotham <rsm@meta.com>
This commit is contained in:
parent
77d2c8e95d
commit
a7ed86181c
3 changed files with 63 additions and 8 deletions
|
@ -267,6 +267,7 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
|
||||||
assert self.kvstore is not None
|
assert self.kvstore is not None
|
||||||
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
||||||
await self.kvstore.set(key=key, value=json.dumps(store_info))
|
await self.kvstore.set(key=key, value=json.dumps(store_info))
|
||||||
|
self.openai_vector_stores[store_id] = store_info
|
||||||
|
|
||||||
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
|
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
|
||||||
"""Load all vector store metadata from kvstore."""
|
"""Load all vector store metadata from kvstore."""
|
||||||
|
@ -286,17 +287,20 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
|
||||||
assert self.kvstore is not None
|
assert self.kvstore is not None
|
||||||
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
||||||
await self.kvstore.set(key=key, value=json.dumps(store_info))
|
await self.kvstore.set(key=key, value=json.dumps(store_info))
|
||||||
|
self.openai_vector_stores[store_id] = store_info
|
||||||
|
|
||||||
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
|
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
|
||||||
"""Delete vector store metadata from kvstore."""
|
"""Delete vector store metadata from kvstore."""
|
||||||
assert self.kvstore is not None
|
assert self.kvstore is not None
|
||||||
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
||||||
await self.kvstore.delete(key)
|
await self.kvstore.delete(key)
|
||||||
|
if store_id in self.openai_vector_stores:
|
||||||
|
del self.openai_vector_stores[store_id]
|
||||||
|
|
||||||
async def _save_openai_vector_store_file(
|
async def _save_openai_vector_store_file(
|
||||||
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
|
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Save vector store file metadata to kvstore."""
|
"""Save vector store file data to kvstore."""
|
||||||
assert self.kvstore is not None
|
assert self.kvstore is not None
|
||||||
key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}"
|
key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}"
|
||||||
await self.kvstore.set(key=key, value=json.dumps(file_info))
|
await self.kvstore.set(key=key, value=json.dumps(file_info))
|
||||||
|
@ -324,7 +328,16 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
|
||||||
await self.kvstore.set(key=key, value=json.dumps(file_info))
|
await self.kvstore.set(key=key, value=json.dumps(file_info))
|
||||||
|
|
||||||
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
|
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
|
||||||
"""Delete vector store file metadata from kvstore."""
|
"""Delete vector store data from kvstore."""
|
||||||
assert self.kvstore is not None
|
assert self.kvstore is not None
|
||||||
key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}"
|
|
||||||
await self.kvstore.delete(key)
|
keys_to_delete = [
|
||||||
|
f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}",
|
||||||
|
f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}",
|
||||||
|
]
|
||||||
|
for key in keys_to_delete:
|
||||||
|
try:
|
||||||
|
await self.kvstore.delete(key)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to delete key {key}: {e}")
|
||||||
|
continue
|
||||||
|
|
|
@ -12,6 +12,8 @@ from pymilvus import MilvusClient, connections
|
||||||
|
|
||||||
from llama_stack.apis.vector_dbs import VectorDB
|
from llama_stack.apis.vector_dbs import VectorDB
|
||||||
from llama_stack.apis.vector_io import Chunk, ChunkMetadata
|
from llama_stack.apis.vector_io import Chunk, ChunkMetadata
|
||||||
|
from llama_stack.providers.inline.vector_io.faiss.config import FaissVectorIOConfig
|
||||||
|
from llama_stack.providers.inline.vector_io.faiss.faiss import FaissIndex, FaissVectorIOAdapter
|
||||||
from llama_stack.providers.inline.vector_io.milvus.config import MilvusVectorIOConfig, SqliteKVStoreConfig
|
from llama_stack.providers.inline.vector_io.milvus.config import MilvusVectorIOConfig, SqliteKVStoreConfig
|
||||||
from llama_stack.providers.inline.vector_io.sqlite_vec import SQLiteVectorIOConfig
|
from llama_stack.providers.inline.vector_io.sqlite_vec import SQLiteVectorIOConfig
|
||||||
from llama_stack.providers.inline.vector_io.sqlite_vec.sqlite_vec import SQLiteVecIndex, SQLiteVecVectorIOAdapter
|
from llama_stack.providers.inline.vector_io.sqlite_vec.sqlite_vec import SQLiteVecIndex, SQLiteVecVectorIOAdapter
|
||||||
|
@ -90,7 +92,7 @@ def sample_embeddings_with_metadata(sample_chunks_with_metadata):
|
||||||
return np.array([np.random.rand(EMBEDDING_DIMENSION).astype(np.float32) for _ in sample_chunks_with_metadata])
|
return np.array([np.random.rand(EMBEDDING_DIMENSION).astype(np.float32) for _ in sample_chunks_with_metadata])
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(params=["milvus", "sqlite_vec"])
|
@pytest.fixture(params=["milvus", "sqlite_vec", "faiss"])
|
||||||
def vector_provider(request):
|
def vector_provider(request):
|
||||||
return request.param
|
return request.param
|
||||||
|
|
||||||
|
@ -116,7 +118,7 @@ async def unique_kvstore_config(tmp_path_factory):
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def sqlite_vec_db_path(tmp_path_factory):
|
def sqlite_vec_db_path(tmp_path_factory):
|
||||||
db_path = str(tmp_path_factory.getbasetemp() / "test.db")
|
db_path = str(tmp_path_factory.getbasetemp() / "test_sqlite_vec.db")
|
||||||
return db_path
|
return db_path
|
||||||
|
|
||||||
|
|
||||||
|
@ -198,11 +200,49 @@ async def milvus_vec_adapter(milvus_vec_db_path, mock_inference_api):
|
||||||
await adapter.shutdown()
|
await adapter.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def faiss_vec_db_path(tmp_path_factory):
|
||||||
|
db_path = str(tmp_path_factory.getbasetemp() / "test_faiss.db")
|
||||||
|
return db_path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def faiss_vec_index(embedding_dimension):
|
||||||
|
index = FaissIndex(embedding_dimension)
|
||||||
|
yield index
|
||||||
|
await index.delete()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def faiss_vec_adapter(unique_kvstore_config, mock_inference_api, embedding_dimension):
|
||||||
|
config = FaissVectorIOConfig(
|
||||||
|
kvstore=unique_kvstore_config,
|
||||||
|
)
|
||||||
|
adapter = FaissVectorIOAdapter(
|
||||||
|
config=config,
|
||||||
|
inference_api=mock_inference_api,
|
||||||
|
files_api=None,
|
||||||
|
)
|
||||||
|
await adapter.initialize()
|
||||||
|
await adapter.register_vector_db(
|
||||||
|
VectorDB(
|
||||||
|
identifier=f"faiss_test_collection_{np.random.randint(1e6)}",
|
||||||
|
provider_id="test_provider",
|
||||||
|
embedding_model="test_model",
|
||||||
|
embedding_dimension=embedding_dimension,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
yield adapter
|
||||||
|
await adapter.shutdown()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def vector_io_adapter(vector_provider, request):
|
def vector_io_adapter(vector_provider, request):
|
||||||
"""Returns the appropriate vector IO adapter based on the provider parameter."""
|
"""Returns the appropriate vector IO adapter based on the provider parameter."""
|
||||||
if vector_provider == "milvus":
|
if vector_provider == "milvus":
|
||||||
return request.getfixturevalue("milvus_vec_adapter")
|
return request.getfixturevalue("milvus_vec_adapter")
|
||||||
|
elif vector_provider == "faiss":
|
||||||
|
return request.getfixturevalue("faiss_vec_adapter")
|
||||||
else:
|
else:
|
||||||
return request.getfixturevalue("sqlite_vec_adapter")
|
return request.getfixturevalue("sqlite_vec_adapter")
|
||||||
|
|
||||||
|
|
|
@ -94,7 +94,7 @@ async def test_query_unregistered_raises(vector_io_adapter):
|
||||||
|
|
||||||
async def test_insert_chunks_calls_underlying_index(vector_io_adapter):
|
async def test_insert_chunks_calls_underlying_index(vector_io_adapter):
|
||||||
fake_index = AsyncMock()
|
fake_index = AsyncMock()
|
||||||
vector_io_adapter._get_and_cache_vector_db_index = AsyncMock(return_value=fake_index)
|
vector_io_adapter.cache["db1"] = fake_index
|
||||||
|
|
||||||
chunks = ["chunk1", "chunk2"]
|
chunks = ["chunk1", "chunk2"]
|
||||||
await vector_io_adapter.insert_chunks("db1", chunks)
|
await vector_io_adapter.insert_chunks("db1", chunks)
|
||||||
|
@ -112,7 +112,7 @@ async def test_insert_chunks_missing_db_raises(vector_io_adapter):
|
||||||
async def test_query_chunks_calls_underlying_index_and_returns(vector_io_adapter):
|
async def test_query_chunks_calls_underlying_index_and_returns(vector_io_adapter):
|
||||||
expected = QueryChunksResponse(chunks=[Chunk(content="c1")], scores=[0.1])
|
expected = QueryChunksResponse(chunks=[Chunk(content="c1")], scores=[0.1])
|
||||||
fake_index = AsyncMock(query_chunks=AsyncMock(return_value=expected))
|
fake_index = AsyncMock(query_chunks=AsyncMock(return_value=expected))
|
||||||
vector_io_adapter._get_and_cache_vector_db_index = AsyncMock(return_value=fake_index)
|
vector_io_adapter.cache["db1"] = fake_index
|
||||||
|
|
||||||
response = await vector_io_adapter.query_chunks("db1", "my_query", {"param": 1})
|
response = await vector_io_adapter.query_chunks("db1", "my_query", {"param": 1})
|
||||||
|
|
||||||
|
@ -286,5 +286,7 @@ async def test_delete_openai_vector_store_file_from_storage(vector_io_adapter, t
|
||||||
await vector_io_adapter._save_openai_vector_store_file(store_id, file_id, file_info, file_contents)
|
await vector_io_adapter._save_openai_vector_store_file(store_id, file_id, file_info, file_contents)
|
||||||
await vector_io_adapter._delete_openai_vector_store_file_from_storage(store_id, file_id)
|
await vector_io_adapter._delete_openai_vector_store_file_from_storage(store_id, file_id)
|
||||||
|
|
||||||
|
loaded_file_info = await vector_io_adapter._load_openai_vector_store_file(store_id, file_id)
|
||||||
|
assert loaded_file_info == {}
|
||||||
loaded_contents = await vector_io_adapter._load_openai_vector_store_file_contents(store_id, file_id)
|
loaded_contents = await vector_io_adapter._load_openai_vector_store_file_contents(store_id, file_id)
|
||||||
assert loaded_contents == []
|
assert loaded_contents == []
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue