Move vector file attach code to OpenAIVectorStoreMixin

This moves the vector store file attach code to the
OpenAIVectorStoreMixin. It also centralizes the MIME type and PDF
parsing logic into the existing functions in vector_store.py, with a
small refactor there so both call sites share the same code path.

Signed-off-by: Ben Browning <bbrownin@redhat.com>
Ben Browning <bbrownin@redhat.com>  2025-06-11 20:06:30 -04:00
parent 055885bd5a
commit 788d34d8b4
4 changed files with 99 additions and 90 deletions
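
The method being moved backs the OpenAI-compatible "attach file to vector store" endpoint. As a minimal sketch of the flow it serves, assuming a local Llama Stack server exposing its OpenAI-compatible API at /v1/openai/v1 (the port, file name, and store name are illustrative; older openai clients nest vector stores under client.beta):

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8321/v1/openai/v1", api_key="none")

# Upload a file, create a vector store, then attach the file to the store.
uploaded = client.files.create(file=open("notes.txt", "rb"), purpose="assistants")
store = client.vector_stores.create(name="demo")
attached = client.vector_stores.files.create(
    vector_store_id=store.id,
    file_id=uploaded.id,
)

# The attach call is served by openai_attach_file_to_vector_store below; it
# reports status "completed" on success and populates last_error on failure.
assert attached.status == "completed", attached.last_error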

llama_stack/providers/inline/vector_io/faiss/faiss.py

@@ -9,8 +9,6 @@ import base64
 import io
 import json
 import logging
-import mimetypes
-import time
 from typing import Any

 import faiss
@@ -26,13 +24,6 @@ from llama_stack.apis.vector_io import (
     QueryChunksResponse,
     VectorIO,
 )
-from llama_stack.apis.vector_io.vector_io import (
-    VectorStoreChunkingStrategy,
-    VectorStoreChunkingStrategyAuto,
-    VectorStoreChunkingStrategyStatic,
-    VectorStoreFileLastError,
-    VectorStoreFileObject,
-)
 from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
@@ -40,8 +31,6 @@ from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIV
 from llama_stack.providers.utils.memory.vector_store import (
     EmbeddingIndex,
     VectorDBWithIndex,
-    make_overlapped_chunks,
-    parse_pdf,
 )

 from .config import FaissVectorIOConfig
@@ -263,74 +252,3 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
         assert self.kvstore is not None
         key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
         await self.kvstore.delete(key)
-
-    async def openai_attach_file_to_vector_store(
-        self,
-        vector_store_id: str,
-        file_id: str,
-        attributes: dict[str, Any] | None = None,
-        chunking_strategy: VectorStoreChunkingStrategy | None = None,
-    ) -> VectorStoreFileObject:
-        attributes = attributes or {}
-        chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
-
-        vector_store_file_object = VectorStoreFileObject(
-            id=file_id,
-            attributes=attributes,
-            chunking_strategy=chunking_strategy,
-            created_at=int(time.time()),
-            status="in_progress",
-            vector_store_id=vector_store_id,
-        )
-
-        if isinstance(chunking_strategy, VectorStoreChunkingStrategyStatic):
-            max_chunk_size_tokens = chunking_strategy.static.max_chunk_size_tokens
-            chunk_overlap_tokens = chunking_strategy.static.chunk_overlap_tokens
-        else:
-            # Default values from OpenAI API docs
-            max_chunk_size_tokens = 800
-            chunk_overlap_tokens = 400
-
-        try:
-            file_response = await self.files_api.openai_retrieve_file(file_id)
-            mime_type, _ = mimetypes.guess_type(file_response.filename)
-            content_response = await self.files_api.openai_retrieve_file_content(file_id)
-
-            # TODO: We can't use content_from_doc directly from vector_store
-            # but should figure out how to centralize this logic near there
-            if mime_type == "application/pdf":
-                content = parse_pdf(content_response.body)
-            else:
-                content = content_response.body.decode("utf-8")
-
-            chunks = make_overlapped_chunks(
-                file_id,
-                content,
-                max_chunk_size_tokens,
-                chunk_overlap_tokens,
-                attributes,
-            )
-
-            if not chunks:
-                vector_store_file_object.status = "failed"
-                vector_store_file_object.last_error = VectorStoreFileLastError(
-                    code="server_error",
-                    message="No chunks were generated from the file",
-                )
-                return vector_store_file_object
-
-            await self.insert_chunks(
-                vector_db_id=vector_store_id,
-                chunks=chunks,
-            )
-        except Exception as e:
-            vector_store_file_object.status = "failed"
-            vector_store_file_object.last_error = VectorStoreFileLastError(
-                code="server_error",
-                message=str(e),
-            )
-            return vector_store_file_object
-
-        vector_store_file_object.status = "completed"
-
-        return vector_store_file_object

llama_stack/providers/utils/memory/openai_vector_store_mixin.py

@@ -5,11 +5,13 @@
 # the root directory of this source tree.

 import logging
+import mimetypes
 import time
 import uuid
 from abc import ABC, abstractmethod
 from typing import Any

+from llama_stack.apis.files import Files
 from llama_stack.apis.vector_dbs import VectorDB
 from llama_stack.apis.vector_io import (
     QueryChunksResponse,
@@ -20,6 +22,15 @@ from llama_stack.apis.vector_io import (
     VectorStoreSearchResponse,
     VectorStoreSearchResponsePage,
 )
+from llama_stack.apis.vector_io.vector_io import (
+    Chunk,
+    VectorStoreChunkingStrategy,
+    VectorStoreChunkingStrategyAuto,
+    VectorStoreChunkingStrategyStatic,
+    VectorStoreFileLastError,
+    VectorStoreFileObject,
+)
+from llama_stack.providers.utils.memory.vector_store import content_from_data_and_mime_type, make_overlapped_chunks

 logger = logging.getLogger(__name__)
@@ -36,6 +47,7 @@ class OpenAIVectorStoreMixin(ABC):
     # These should be provided by the implementing class
     openai_vector_stores: dict[str, dict[str, Any]]
+    files_api: Files

     @abstractmethod
     async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
@@ -67,6 +79,16 @@
         """Unregister a vector database (provider-specific implementation)."""
         pass

+    @abstractmethod
+    async def insert_chunks(
+        self,
+        vector_db_id: str,
+        chunks: list[Chunk],
+        ttl_seconds: int | None = None,
+    ) -> None:
+        """Insert chunks into a vector database (provider-specific implementation)."""
+        pass
+
     @abstractmethod
     async def query_chunks(
         self, vector_db_id: str, query: Any, params: dict[str, Any] | None = None
@@ -383,3 +405,70 @@
             if metadata[key] != value:
                 return False
         return True
+
+    async def openai_attach_file_to_vector_store(
+        self,
+        vector_store_id: str,
+        file_id: str,
+        attributes: dict[str, Any] | None = None,
+        chunking_strategy: VectorStoreChunkingStrategy | None = None,
+    ) -> VectorStoreFileObject:
+        attributes = attributes or {}
+        chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
+
+        vector_store_file_object = VectorStoreFileObject(
+            id=file_id,
+            attributes=attributes,
+            chunking_strategy=chunking_strategy,
+            created_at=int(time.time()),
+            status="in_progress",
+            vector_store_id=vector_store_id,
+        )
+
+        if isinstance(chunking_strategy, VectorStoreChunkingStrategyStatic):
+            max_chunk_size_tokens = chunking_strategy.static.max_chunk_size_tokens
+            chunk_overlap_tokens = chunking_strategy.static.chunk_overlap_tokens
+        else:
+            # Default values from OpenAI API spec
+            max_chunk_size_tokens = 800
+            chunk_overlap_tokens = 400
+
+        try:
+            file_response = await self.files_api.openai_retrieve_file(file_id)
+            mime_type, _ = mimetypes.guess_type(file_response.filename)
+            content_response = await self.files_api.openai_retrieve_file_content(file_id)
+            content = content_from_data_and_mime_type(content_response.body, mime_type)
+
+            chunks = make_overlapped_chunks(
+                file_id,
+                content,
+                max_chunk_size_tokens,
+                chunk_overlap_tokens,
+                attributes,
+            )
+
+            if not chunks:
+                vector_store_file_object.status = "failed"
+                vector_store_file_object.last_error = VectorStoreFileLastError(
+                    code="server_error",
+                    message="No chunks were generated from the file",
+                )
+                return vector_store_file_object
+
+            await self.insert_chunks(
+                vector_db_id=vector_store_id,
+                chunks=chunks,
+            )
+        except Exception as e:
+            logger.error(f"Error attaching file to vector store: {e}")
+            vector_store_file_object.status = "failed"
+            vector_store_file_object.last_error = VectorStoreFileLastError(
+                code="server_error",
+                message=str(e),
+            )
+            return vector_store_file_object
+
+        vector_store_file_object.status = "completed"
+
+        return vector_store_file_object
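
After this move, a provider gets file attach for free once it supplies what the mixin declares: the openai_vector_stores dict, a files_api handle, and an insert_chunks implementation. A hedged sketch of a hypothetical provider, not taken from this commit (the remaining abstract methods are elided, so this class is not instantiable as-is):

from typing import Any

from llama_stack.apis.files import Files
from llama_stack.apis.vector_io.vector_io import Chunk
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin


class InMemoryVectorIOAdapter(OpenAIVectorStoreMixin):  # hypothetical example, not in the tree
    def __init__(self, files_api: Files) -> None:
        self.openai_vector_stores: dict[str, dict[str, Any]] = {}
        self.files_api = files_api
        self._chunks: dict[str, list[Chunk]] = {}

    async def insert_chunks(
        self,
        vector_db_id: str,
        chunks: list[Chunk],
        ttl_seconds: int | None = None,
    ) -> None:
        # A real provider would embed and index here; storing the chunks is
        # enough for openai_attach_file_to_vector_store to reach "completed".
        self._chunks.setdefault(vector_db_id, []).extend(chunks)

    # _save_openai_vector_store, query_chunks, and the other abstract methods
    # would be implemented here for a working provider.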

llama_stack/providers/utils/memory/vector_store.py

@@ -72,16 +72,18 @@ def content_from_data(data_url: str) -> str:
     data = unquote(data)
     encoding = parts["encoding"] or "utf-8"
     data = data.encode(encoding)
+    return content_from_data_and_mime_type(data, parts["mimetype"], parts.get("encoding", None))

-    encoding = parts["encoding"]
-    if not encoding:
-        import chardet
+
+def content_from_data_and_mime_type(data: bytes | str, mime_type: str | None, encoding: str | None = None) -> str:
+    if isinstance(data, bytes):
+        if not encoding:
+            import chardet

-        detected = chardet.detect(data)
-        encoding = detected["encoding"]
+            detected = chardet.detect(data)
+            encoding = detected["encoding"]

-    mime_type = parts["mimetype"]
-    mime_category = mime_type.split("/")[0]
+    mime_category = mime_type.split("/")[0] if mime_type else None
     if mime_category == "text":
         # For text-based files (including CSV, MD)
         return data.decode(encoding)
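
Both the mixin above and the existing data-URL path now funnel into the extracted helper. A small usage sketch based on the behavior visible in this hunk (the PDF branch that the commit message mentions sits in the unchanged remainder of the function, below what is shown):

from llama_stack.providers.utils.memory.vector_store import content_from_data_and_mime_type

# Bytes with a text/* MIME type and no encoding hint: chardet detects the encoding.
csv_text = content_from_data_and_mime_type(b"name,age\nalice,30\n", "text/csv")
assert csv_text.startswith("name,age")

# An explicit encoding bypasses detection entirely.
plain = content_from_data_and_mime_type("café".encode("latin-1"), "text/plain", encoding="latin-1")
assert plain == "café"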


@@ -327,7 +327,7 @@ def test_response_non_streaming_file_search(
         vector_store_id=vector_store.id,
         file_id=file_response.id,
     )
-    assert file_attach_response.status == "completed"
+    assert file_attach_response.status == "completed", f"Expected file to be attached, got {file_attach_response}"
     assert not file_attach_response.last_error

     # Update our tools with the right vector store id