From 1a888a6bfe08f6f65bbbd6ee10981b6f1c81d7a2 Mon Sep 17 00:00:00 2001 From: Hardik Shah Date: Mon, 9 Jun 2025 14:38:26 -0700 Subject: [PATCH] Update sqlite-vec provider to support OpenAI vector store apis --- .../inline/vector_io/sqlite_vec/sqlite_vec.py | 302 +++++++++++++++++- .../vector_io/test_openai_vector_stores.py | 6 +- 2 files changed, 296 insertions(+), 12 deletions(-) diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 282eb5a2e..0a0f0e653 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -6,9 +6,11 @@ import asyncio import hashlib +import json import logging import sqlite3 import struct +import time import uuid from typing import Any, Literal @@ -37,6 +39,11 @@ VECTOR_SEARCH = "vector" KEYWORD_SEARCH = "keyword" SEARCH_MODES = {VECTOR_SEARCH, KEYWORD_SEARCH} +# Constants for OpenAI vector stores (similar to faiss) +VERSION = "v3" +OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::" +CHUNK_MULTIPLIER = 5 + def serialize_vector(vector: list[float]) -> bytes: """Serialize a list of floats into a compact binary representation.""" @@ -307,6 +314,7 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): self.config = config self.inference_api = inference_api self.cache: dict[str, VectorDBWithIndex] = {} + self.openai_vector_stores: dict[str, dict[str, Any]] = {} async def initialize(self) -> None: def _setup_connection(): @@ -321,17 +329,29 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): metadata TEXT ); """) + # Create a table to persist OpenAI vector stores. + cur.execute(""" + CREATE TABLE IF NOT EXISTS openai_vector_stores ( + id TEXT PRIMARY KEY, + metadata TEXT + ); + """) connection.commit() # Load any existing vector DB registrations. cur.execute("SELECT metadata FROM vector_dbs") - rows = cur.fetchall() - return rows + vector_db_rows = cur.fetchall() + # Load any existing OpenAI vector stores. + cur.execute("SELECT metadata FROM openai_vector_stores") + openai_store_rows = cur.fetchall() + return vector_db_rows, openai_store_rows finally: cur.close() connection.close() - rows = await asyncio.to_thread(_setup_connection) - for row in rows: + vector_db_rows, openai_store_rows = await asyncio.to_thread(_setup_connection) + + # Load existing vector DBs + for row in vector_db_rows: vector_db_data = row[0] vector_db = VectorDB.model_validate_json(vector_db_data) index = await SQLiteVecIndex.create( @@ -339,6 +359,12 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): ) self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api) + # Load existing OpenAI vector stores + for row in openai_store_rows: + store_data = row[0] + store_info = json.loads(store_data) + self.openai_vector_stores[store_info["id"]] = store_info + async def shutdown(self) -> None: # nothing to do since we don't maintain a persistent connection pass @@ -409,7 +435,88 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): provider_id: str | None = None, provider_vector_db_id: str | None = None, ) -> VectorStoreObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec") + """Creates a vector store.""" + # store and vector_db have the same id + store_id = name or str(uuid.uuid4()) + created_at = int(time.time()) + + if provider_id is None: + raise ValueError("Provider ID is required") + + if embedding_model is None: + raise ValueError("Embedding model is required") + + # Use provided embedding dimension or default to 384 + if embedding_dimension is None: + raise ValueError("Embedding dimension is required") + + provider_vector_db_id = provider_vector_db_id or store_id + vector_db = VectorDB( + identifier=store_id, + embedding_dimension=embedding_dimension, + embedding_model=embedding_model, + provider_id=provider_id, + provider_resource_id=provider_vector_db_id, + ) + + # Register the vector DB + await self.register_vector_db(vector_db) + + # Create OpenAI vector store metadata + store_info = { + "id": store_id, + "object": "vector_store", + "created_at": created_at, + "name": store_id, + "usage_bytes": 0, + "file_counts": {}, + "status": "completed", + "expires_after": expires_after, + "expires_at": None, + "last_active_at": created_at, + "file_ids": file_ids or [], + "chunking_strategy": chunking_strategy, + } + + # Add provider information to metadata if provided + metadata = metadata or {} + if provider_id: + metadata["provider_id"] = provider_id + if provider_vector_db_id: + metadata["provider_vector_db_id"] = provider_vector_db_id + store_info["metadata"] = metadata + + # Store in SQLite database + def _store_openai_vector_store(): + connection = _create_sqlite_connection(self.config.db_path) + cur = connection.cursor() + try: + cur.execute( + "INSERT OR REPLACE INTO openai_vector_stores (id, metadata) VALUES (?, ?)", + (store_id, json.dumps(store_info)), + ) + connection.commit() + finally: + cur.close() + connection.close() + + await asyncio.to_thread(_store_openai_vector_store) + + # Store in memory cache + self.openai_vector_stores[store_id] = store_info + + return VectorStoreObject( + id=store_id, + created_at=created_at, + name=store_id, + usage_bytes=0, + file_counts={}, + status="completed", + expires_after=expires_after, + expires_at=None, + last_active_at=created_at, + metadata=metadata, + ) async def openai_list_vector_stores( self, @@ -418,13 +525,51 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): after: str | None = None, before: str | None = None, ) -> VectorStoreListResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec") + """Returns a list of vector stores.""" + # Get all vector stores + all_stores = list(self.openai_vector_stores.values()) + + # Sort by created_at + reverse_order = order == "desc" + all_stores.sort(key=lambda x: x["created_at"], reverse=reverse_order) + + # Apply cursor-based pagination + if after: + after_index = next((i for i, store in enumerate(all_stores) if store["id"] == after), -1) + if after_index >= 0: + all_stores = all_stores[after_index + 1 :] + + if before: + before_index = next((i for i, store in enumerate(all_stores) if store["id"] == before), len(all_stores)) + all_stores = all_stores[:before_index] + + # Apply limit + limited_stores = all_stores[:limit] + # Convert to VectorStoreObject instances + data = [VectorStoreObject(**store) for store in limited_stores] + + # Determine pagination info + has_more = len(all_stores) > limit + first_id = data[0].id if data else None + last_id = data[-1].id if data else None + + return VectorStoreListResponse( + data=data, + has_more=has_more, + first_id=first_id, + last_id=last_id, + ) async def openai_retrieve_vector_store( self, vector_store_id: str, ) -> VectorStoreObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec") + """Retrieves a vector store.""" + if vector_store_id not in self.openai_vector_stores: + raise ValueError(f"Vector store {vector_store_id} not found") + + store_info = self.openai_vector_stores[vector_store_id] + return VectorStoreObject(**store_info) async def openai_update_vector_store( self, @@ -433,13 +578,78 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): expires_after: dict[str, Any] | None = None, metadata: dict[str, Any] | None = None, ) -> VectorStoreObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec") + """Modifies a vector store.""" + if vector_store_id not in self.openai_vector_stores: + raise ValueError(f"Vector store {vector_store_id} not found") + + store_info = self.openai_vector_stores[vector_store_id].copy() + + # Update fields if provided + if name is not None: + store_info["name"] = name + if expires_after is not None: + store_info["expires_after"] = expires_after + if metadata is not None: + store_info["metadata"] = metadata + + # Update last_active_at + store_info["last_active_at"] = int(time.time()) + + # Save to SQLite database + def _update_openai_vector_store(): + connection = _create_sqlite_connection(self.config.db_path) + cur = connection.cursor() + try: + cur.execute( + "UPDATE openai_vector_stores SET metadata = ? WHERE id = ?", + (json.dumps(store_info), vector_store_id), + ) + connection.commit() + finally: + cur.close() + connection.close() + + await asyncio.to_thread(_update_openai_vector_store) + + # Update in-memory cache + self.openai_vector_stores[vector_store_id] = store_info + + return VectorStoreObject(**store_info) async def openai_delete_vector_store( self, vector_store_id: str, ) -> VectorStoreDeleteResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec") + """Delete a vector store.""" + if vector_store_id not in self.openai_vector_stores: + raise ValueError(f"Vector store {vector_store_id} not found") + + # Delete from SQLite database + def _delete_openai_vector_store(): + connection = _create_sqlite_connection(self.config.db_path) + cur = connection.cursor() + try: + cur.execute("DELETE FROM openai_vector_stores WHERE id = ?", (vector_store_id,)) + connection.commit() + finally: + cur.close() + connection.close() + + await asyncio.to_thread(_delete_openai_vector_store) + + # Delete from in-memory cache + del self.openai_vector_stores[vector_store_id] + + # Also delete the underlying vector DB + try: + await self.unregister_vector_db(vector_store_id) + except Exception as e: + logger.warning(f"Failed to delete underlying vector DB {vector_store_id}: {e}") + + return VectorStoreDeleteResponse( + id=vector_store_id, + deleted=True, + ) async def openai_search_vector_store( self, @@ -451,7 +661,79 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): rewrite_query: bool = False, search_mode: Literal["keyword", "vector", "hybrid"] = "vector", ) -> VectorStoreSearchResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec") + """Search for chunks in a vector store.""" + if vector_store_id not in self.openai_vector_stores: + raise ValueError(f"Vector store {vector_store_id} not found") + + if isinstance(query, list): + search_query = " ".join(query) + else: + search_query = query + + try: + score_threshold = ranking_options.get("score_threshold", 0.0) if ranking_options else 0.0 + params = { + "max_chunks": max_num_results * CHUNK_MULTIPLIER, + "score_threshold": score_threshold, + "mode": search_mode, + } + # TODO: Add support for ranking_options.ranker + + response = await self.query_chunks( + vector_db_id=vector_store_id, + query=search_query, + params=params, + ) + + # Convert response to OpenAI format + data = [] + for i, (chunk, score) in enumerate(zip(response.chunks, response.scores, strict=False)): + # Apply score based filtering + if score < score_threshold: + continue + + # Apply filters if provided + if filters: + # Simple metadata filtering + if not self._matches_filters(chunk.metadata, filters): + continue + + chunk_data = { + "id": f"chunk_{i}", + "object": "vector_store.search_result", + "score": score, + "content": chunk.content.content if hasattr(chunk.content, "content") else str(chunk.content), + "metadata": chunk.metadata, + } + data.append(chunk_data) + if len(data) >= max_num_results: + break + + return VectorStoreSearchResponse( + search_query=search_query, + data=data, + has_more=False, # For simplicity, we don't implement pagination here + next_page=None, + ) + + except Exception as e: + logger.error(f"Error searching vector store {vector_store_id}: {e}") + # Return empty results on error + return VectorStoreSearchResponse( + search_query=search_query, + data=[], + has_more=False, + next_page=None, + ) + + def _matches_filters(self, metadata: dict[str, Any], filters: dict[str, Any]) -> bool: + """Check if metadata matches the provided filters.""" + for key, value in filters.items(): + if key not in metadata: + return False + if metadata[key] != value: + return False + return True def generate_chunk_id(document_id: str, chunk_text: str) -> str: diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index f7caf0f67..d67c35e69 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -22,8 +22,10 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): vector_io_providers = [p for p in client_with_models.providers.list() if p.api == "vector_io"] for p in vector_io_providers: - if p.provider_type not in ["inline::faiss"]: - pytest.skip(f"OpenAI vector stores are not supported by {p.provider_type}") + if p.provider_type in ["inline::faiss", "inline::sqlite-vec"]: + return + + pytest.skip("OpenAI vector stores are not supported by any provider") @pytest.fixture