feat: Add get, update, and delete support for the Vector Stores API

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
Francisco Javier Arceo 2025-08-01 17:03:43 -04:00
parent 0527c0fb15
commit 4c0eb47fc7
8 changed files with 1818 additions and 1 deletion

@@ -15,14 +15,17 @@ from typing import Any
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files, OpenAIFileObject
from llama_stack.apis.inference import InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import (
    Chunk,
    QueryChunksResponse,
    SearchRankingOptions,
    VectorStoreChunkDeleteResponse,
    VectorStoreChunkingStrategy,
    VectorStoreChunkingStrategyAuto,
    VectorStoreChunkingStrategyStatic,
    VectorStoreChunkObject,
    VectorStoreContent,
    VectorStoreDeleteResponse,
    VectorStoreFileContentsResponse,
@@ -31,6 +34,7 @@ from llama_stack.apis.vector_io import (
    VectorStoreFileLastError,
    VectorStoreFileObject,
    VectorStoreFileStatus,
    VectorStoreListChunksResponse,
    VectorStoreListFilesResponse,
    VectorStoreListResponse,
    VectorStoreObject,
@@ -109,7 +113,14 @@ class OpenAIVectorStoreMixin(ABC):
        assert self.kvstore
        meta_key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}"
        await self.kvstore.set(key=meta_key, value=json.dumps(file_info))

        # Delete old file data so stale chunk entries do not survive the update
        contents_prefix = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}:"
        end_key = f"{contents_prefix}\xff"
        old_keys = await self.kvstore.keys_in_range(contents_prefix, end_key)
        for old_key in old_keys:
            await self.kvstore.delete(old_key)

        for idx, chunk in enumerate(file_contents):
            await self.kvstore.set(key=f"{contents_prefix}{idx}", value=json.dumps(chunk))
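
Aside on the `\xff` end key: kvstore keys compare lexicographically, so every key starting with `contents_prefix` sorts within `[contents_prefix, contents_prefix + "\xff")`. A minimal sketch of the same range-delete pattern against a toy in-memory store; `DictKVStore` and `replace_contents` are hypothetical stand-ins, not part of this commit:

class DictKVStore:
    """Toy in-memory stand-in for the kvstore used above (hypothetical)."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def set(self, key: str, value: str) -> None:
        self._data[key] = value

    async def keys_in_range(self, start: str, end: str) -> list[str]:
        # Lexicographic range scan, mirroring the kvstore call above
        return sorted(k for k in self._data if start <= k < end)

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)


async def replace_contents(kv: DictKVStore, prefix: str, new_chunks: list[str]) -> None:
    # Drop every existing key under the prefix, then write fresh indices
    for old_key in await kv.keys_in_range(prefix, prefix + "\xff"):
        await kv.delete(old_key)
    for idx, chunk in enumerate(new_chunks):
        await kv.set(f"{prefix}{idx}", chunk)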
@@ -791,3 +802,233 @@ class OpenAIVectorStoreMixin(ABC):
            id=file_id,
            deleted=True,
        )

    async def openai_retrieve_vector_store_chunk(
        self,
        vector_store_id: str,
        file_id: str,
        chunk_id: str,
    ) -> VectorStoreChunkObject:
        """Retrieve a specific chunk from a vector store file."""
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        store_info = self.openai_vector_stores[vector_store_id]
        if file_id not in store_info["file_ids"]:
            raise ValueError(f"File {file_id} not found in vector store {vector_store_id}")

        dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
        chunks = [Chunk.model_validate(c) for c in dict_chunks]

        target_chunk = None
        for chunk in chunks:
            if chunk.chunk_id == chunk_id:
                target_chunk = chunk
                break
        if target_chunk is None:
            raise ValueError(f"Chunk {chunk_id} not found in file {file_id}")

        file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
        return VectorStoreChunkObject(
            id=chunk_id,
            created_at=file_info.get("created_at", int(time.time())),
            vector_store_id=vector_store_id,
            file_id=file_id,
            content=target_chunk.content,
            metadata=target_chunk.metadata,
            embedding=target_chunk.embedding,
        )
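
A hedged usage sketch of the new retrieval path; `provider` stands for any VectorIO provider that mixes in OpenAIVectorStoreMixin, and all IDs are placeholders:

async def show_chunk(provider) -> None:
    # provider: any object mixing in OpenAIVectorStoreMixin; IDs are placeholders
    chunk = await provider.openai_retrieve_vector_store_chunk(
        vector_store_id="vs_123",
        file_id="file_456",
        chunk_id="chunk_789",
    )
    print(chunk.content, chunk.metadata)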

    async def openai_update_vector_store_chunk(
        self,
        vector_store_id: str,
        file_id: str,
        chunk_id: str,
        content: InterleavedContent | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> VectorStoreChunkObject:
        """Update a specific chunk in a vector store file."""
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        store_info = self.openai_vector_stores[vector_store_id]
        if file_id not in store_info["file_ids"]:
            raise ValueError(f"File {file_id} not found in vector store {vector_store_id}")

        dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
        chunks = [Chunk.model_validate(c) for c in dict_chunks]

        target_chunk_index = None
        for i, chunk in enumerate(chunks):
            if chunk.chunk_id == chunk_id:
                target_chunk_index = i
                break
        if target_chunk_index is None:
            raise ValueError(f"Chunk {chunk_id} not found in file {file_id}")

        # Apply both updates to the in-memory chunk before touching the vector index,
        # so a re-inserted chunk carries the new metadata as well
        target_chunk = chunks[target_chunk_index]
        if content is not None:
            target_chunk.content = content
        if metadata is not None:
            target_chunk.metadata.update(metadata)

        # Re-embed only when the content changed: delete the old chunk and insert the updated one
        if content is not None:
            await self.delete_chunks(vector_store_id, [chunk_id])
            await self.insert_chunks(vector_store_id, [target_chunk])

        chunks[target_chunk_index] = target_chunk
        dict_chunks = [c.model_dump() for c in chunks]
        file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
        await self._save_openai_vector_store_file(vector_store_id, file_id, file_info, dict_chunks)

        return VectorStoreChunkObject(
            id=chunk_id,
            created_at=file_info.get("created_at", int(time.time())),
            vector_store_id=vector_store_id,
            file_id=file_id,
            content=target_chunk.content,
            metadata=target_chunk.metadata,
            embedding=target_chunk.embedding,
        )
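
A sketch of a metadata-only update, which (per the branch above) leaves the vector index untouched and only rewrites the KV copy; `provider` and the IDs are again placeholders:

async def tag_chunk(provider) -> None:
    # Metadata-only change: no re-embedding, just an updated KV record
    updated = await provider.openai_update_vector_store_chunk(
        vector_store_id="vs_123",
        file_id="file_456",
        chunk_id="chunk_789",
        metadata={"reviewed": True},
    )
    assert updated.metadata["reviewed"] is True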

    async def openai_delete_vector_store_chunk(
        self,
        vector_store_id: str,
        file_id: str,
        chunk_id: str,
    ) -> VectorStoreChunkDeleteResponse:
        """Delete a specific chunk from a vector store file."""
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        store_info = self.openai_vector_stores[vector_store_id]
        if file_id not in store_info["file_ids"]:
            raise ValueError(f"File {file_id} not found in vector store {vector_store_id}")

        dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
        chunks = [Chunk.model_validate(c) for c in dict_chunks]

        target_chunk_index = None
        for i, chunk in enumerate(chunks):
            if chunk.chunk_id == chunk_id:
                target_chunk_index = i
                break
        if target_chunk_index is None:
            raise ValueError(f"Chunk {chunk_id} not found in file {file_id}")

        # Remove from the vector index, then from the persisted file contents
        await self.delete_chunks(vector_store_id, [chunk_id])
        dict_chunks.pop(target_chunk_index)
        file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
        await self._save_openai_vector_store_file(vector_store_id, file_id, file_info, dict_chunks)

        return VectorStoreChunkDeleteResponse(
            id=chunk_id,
            deleted=True,
        )
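
And a matching deletion sketch, which removes the chunk from both the vector index and the persisted file contents (placeholder names as before):

async def drop_chunk(provider) -> None:
    resp = await provider.openai_delete_vector_store_chunk(
        vector_store_id="vs_123",
        file_id="file_456",
        chunk_id="chunk_789",
    )
    assert resp.deleted and resp.id == "chunk_789"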

    async def openai_list_vector_store_chunks(
        self,
        vector_store_id: str,
        file_id: str,
        limit: int | None = 20,
        order: str | None = "desc",
        after: str | None = None,
        before: str | None = None,
    ) -> VectorStoreListChunksResponse:
        """List chunks in a vector store file."""
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        store_info = self.openai_vector_stores[vector_store_id]
        if file_id not in store_info["file_ids"]:
            raise ValueError(f"File {file_id} not found in vector store {vector_store_id}")

        dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
        file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)

        chunk_ids = []
        for dict_chunk in dict_chunks:
            chunk = Chunk.model_validate(dict_chunk)
            if chunk.chunk_id:
                chunk_ids.append(chunk.chunk_id)

        # TODO: Add an abstract query_all_chunks() method to properly filter by file_id and
        # vector_db_id; this is a temporary hack
        chunks = []
        if chunk_ids:
            try:
                file_filter = {"type": "eq", "key": "file_id", "value": file_id}
                query_result = await self.query_chunks(
                    vector_db_id=vector_store_id,
                    query="*",  # wildcard query to get all chunks
                    params={
                        "max_chunks": len(chunk_ids) * 2,
                        "score_threshold": 0.0,
                        "filters": file_filter,
                    },
                )
                chunk_id_set = set(chunk_ids)
                chunks = [chunk for chunk in query_result.chunks if chunk.chunk_id in chunk_id_set]
            except Exception as e:
                logger.warning(f"Failed to query vector database for chunks: {e}")
                # Fall back to the KV store copy if the vector DB query fails
                chunks = [Chunk.model_validate(c) for c in dict_chunks]

        chunk_objects = []
        for chunk in chunks:
            chunk_obj = VectorStoreChunkObject(
                id=chunk.chunk_id,
                created_at=file_info.get("created_at", int(time.time())),
                vector_store_id=vector_store_id,
                file_id=file_id,
                content=chunk.content,
                metadata=chunk.metadata,
                embedding=chunk.embedding,
            )
            chunk_objects.append(chunk_obj)

        if order == "desc":
            chunk_objects.sort(key=lambda x: x.created_at, reverse=True)
        else:
            chunk_objects.sort(key=lambda x: x.created_at)

        start_idx = 0
        end_idx = len(chunk_objects)
        if after:
            # Start the page just past the 'after' cursor
            for i, chunk_obj in enumerate(chunk_objects):
                if chunk_obj.id == after:
                    start_idx = i + 1
                    break
        if before:
            # End the page at the 'before' cursor
            for i, chunk_obj in enumerate(chunk_objects):
                if chunk_obj.id == before:
                    end_idx = i
                    break
        if limit:
            if end_idx - start_idx > limit:
                end_idx = start_idx + limit

        paginated_chunks = chunk_objects[start_idx:end_idx]
        first_id = paginated_chunks[0].id if paginated_chunks else None
        last_id = paginated_chunks[-1].id if paginated_chunks else None
        has_more = end_idx < len(chunk_objects)

        return VectorStoreListChunksResponse(
            data=paginated_chunks,
            first_id=first_id,
            last_id=last_id,
            has_more=has_more,
        )
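
Finally, a cursor-pagination sketch built on the `last_id`/`has_more` fields the list call returns; `provider` and the IDs are placeholders:

async def print_all_chunks(provider) -> None:
    # Walk the pages in ascending order, resuming from the previous page's last_id
    after = None
    while True:
        page = await provider.openai_list_vector_store_chunks(
            vector_store_id="vs_123",
            file_id="file_456",
            limit=2,
            order="asc",
            after=after,
        )
        for chunk in page.data:
            print(chunk.id)
        if not page.has_more:
            break
        after = page.last_id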