Implement retrieving vector store file contents

This requires some more bookkeeping data, some additional storage (of
the chunks we created for this file), and is implemented for faiss and sqlite-vec.

Signed-off-by: Ben Browning <bbrownin@redhat.com>
This commit is contained in:
Ben Browning 2025-06-18 12:25:27 -04:00
parent a2f0f608db
commit 65869d22a4
11 changed files with 372 additions and 5 deletions

View file

@ -46,6 +46,7 @@ VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
FAISS_INDEX_PREFIX = f"faiss_index:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:{VERSION}::"
class FaissIndex(EmbeddingIndex):
@ -285,11 +286,15 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.delete(key)
async def _save_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
"""Save vector store file metadata to kvstore."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}"
await self.kvstore.set(key=key, value=json.dumps(file_info))
content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}"
await self.kvstore.set(key=content_key, value=json.dumps(file_contents))
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
"""Load vector store file metadata from kvstore."""
@ -298,6 +303,13 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
stored_data = await self.kvstore.get(key)
return json.loads(stored_data) if stored_data else {}
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
"""Load vector store file contents from kvstore."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}"
stored_data = await self.kvstore.get(key)
return json.loads(stored_data) if stored_data else []
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
"""Update vector store file metadata in kvstore."""
assert self.kvstore is not None

View file

@ -466,7 +466,16 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
CREATE TABLE IF NOT EXISTS openai_vector_store_files (
store_id TEXT,
file_id TEXT,
metadata TEXT
metadata TEXT,
PRIMARY KEY (store_id, file_id)
);
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS openai_vector_store_files_contents (
store_id TEXT,
file_id TEXT,
contents TEXT,
PRIMARY KEY (store_id, file_id)
);
""")
connection.commit()
@ -623,7 +632,9 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
await asyncio.to_thread(_delete)
async def _save_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
"""Save vector store file metadata to SQLite database."""
def _store():
@ -634,6 +645,10 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
"INSERT OR REPLACE INTO openai_vector_store_files (store_id, file_id, metadata) VALUES (?, ?, ?)",
(store_id, file_id, json.dumps(file_info)),
)
cur.execute(
"INSERT OR REPLACE INTO openai_vector_store_files_contents (store_id, file_id, contents) VALUES (?, ?, ?)",
(store_id, file_id, json.dumps(file_contents)),
)
connection.commit()
except Exception as e:
logger.error(f"Error saving openai vector store file {store_id} {file_id}: {e}")
@ -671,6 +686,29 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
stored_data = await asyncio.to_thread(_load)
return json.loads(stored_data) if stored_data else {}
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
"""Load vector store file contents from SQLite database."""
def _load():
connection = _create_sqlite_connection(self.config.db_path)
cur = connection.cursor()
try:
cur.execute(
"SELECT contents FROM openai_vector_store_files_contents WHERE store_id = ? AND file_id = ?",
(store_id, file_id),
)
row = cur.fetchone()
if row is None:
return None
(contents,) = row
return contents
finally:
cur.close()
connection.close()
stored_contents = await asyncio.to_thread(_load)
return json.loads(stored_contents) if stored_contents else []
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
"""Update vector store file metadata in SQLite database."""

View file

@ -26,6 +26,7 @@ from llama_stack.apis.vector_io import (
)
from llama_stack.apis.vector_io.vector_io import (
VectorStoreChunkingStrategy,
VectorStoreFileContentsResponse,
VectorStoreFileObject,
VectorStoreListFilesResponse,
)
@ -281,6 +282,13 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
async def openai_update_vector_store_file(
self,
vector_store_id: str,

View file

@ -28,6 +28,7 @@ from llama_stack.apis.vector_io import (
)
from llama_stack.apis.vector_io.vector_io import (
VectorStoreChunkingStrategy,
VectorStoreFileContentsResponse,
VectorStoreFileObject,
VectorStoreListFilesResponse,
)
@ -279,6 +280,13 @@ class MilvusVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Milvus")
async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Milvus")
async def openai_update_vector_store_file(
self,
vector_store_id: str,

View file

@ -26,6 +26,7 @@ from llama_stack.apis.vector_io import (
)
from llama_stack.apis.vector_io.vector_io import (
VectorStoreChunkingStrategy,
VectorStoreFileContentsResponse,
VectorStoreFileObject,
VectorStoreListFilesResponse,
)
@ -281,6 +282,13 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def openai_update_vector_store_file(
self,
vector_store_id: str,

View file

@ -12,7 +12,9 @@ import uuid
from abc import ABC, abstractmethod
from typing import Any
from llama_stack.apis.common.content_types import InterleavedContentItem, TextContentItem
from llama_stack.apis.files import Files
from llama_stack.apis.files.files import OpenAIFileObject
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import (
QueryChunksResponse,
@ -29,6 +31,7 @@ from llama_stack.apis.vector_io.vector_io import (
VectorStoreChunkingStrategy,
VectorStoreChunkingStrategyAuto,
VectorStoreChunkingStrategyStatic,
VectorStoreFileContentsResponse,
VectorStoreFileCounts,
VectorStoreFileDeleteResponse,
VectorStoreFileLastError,
@ -75,7 +78,9 @@ class OpenAIVectorStoreMixin(ABC):
pass
@abstractmethod
async def _save_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
"""Save vector store file metadata to persistent storage."""
pass
@ -84,6 +89,11 @@ class OpenAIVectorStoreMixin(ABC):
"""Load vector store file metadata from persistent storage."""
pass
@abstractmethod
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
"""Load vector store file contents from persistent storage."""
pass
@abstractmethod
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
"""Update vector store file metadata in persistent storage."""
@ -491,6 +501,8 @@ class OpenAIVectorStoreMixin(ABC):
attributes = attributes or {}
chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
created_at = int(time.time())
chunks: list[Chunk] = []
file_response: OpenAIFileObject | None = None
vector_store_file_object = VectorStoreFileObject(
id=file_id,
@ -554,9 +566,11 @@ class OpenAIVectorStoreMixin(ABC):
# Create OpenAI vector store file metadata
file_info = vector_store_file_object.model_dump(exclude={"last_error"})
file_info["filename"] = file_response.filename if file_response else ""
# Save vector store file to persistent storage (provider-specific)
await self._save_openai_vector_store_file(vector_store_id, file_id, file_info)
dict_chunks = [c.model_dump() for c in chunks]
await self._save_openai_vector_store_file(vector_store_id, file_id, file_info, dict_chunks)
# Update file_ids and file_counts in vector store metadata
store_info = self.openai_vector_stores[vector_store_id].copy()
@ -608,6 +622,34 @@ class OpenAIVectorStoreMixin(ABC):
file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
return VectorStoreFileObject(**file_info)
async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
"""Retrieves the contents of a vector store file."""
if vector_store_id not in self.openai_vector_stores:
raise ValueError(f"Vector store {vector_store_id} not found")
file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id)
chunks = [Chunk.model_validate(c) for c in dict_chunks]
contents: list[InterleavedContentItem] = []
for chunk in chunks:
content = chunk.content
if isinstance(content, str):
contents.append(TextContentItem(text=content))
elif isinstance(content, InterleavedContentItem):
contents.append(content)
else:
contents.extend(contents)
return VectorStoreFileContentsResponse(
file_id=file_id,
filename=file_info.get("filename", ""),
attributes=file_info.get("attributes", {}),
content=contents,
)
async def openai_update_vector_store_file(
self,
vector_store_id: str,