Update sqlite-vec provider to support OpenAI vector store APIs

Hardik Shah 2025-06-09 14:38:26 -07:00
parent b55f1249e0
commit 1a888a6bfe
2 changed files with 296 additions and 12 deletions


@@ -6,9 +6,11 @@
 import asyncio
 import hashlib
+import json
 import logging
 import sqlite3
 import struct
+import time
 import uuid
 from typing import Any, Literal
@@ -37,6 +39,11 @@ VECTOR_SEARCH = "vector"
 KEYWORD_SEARCH = "keyword"
 SEARCH_MODES = {VECTOR_SEARCH, KEYWORD_SEARCH}
+# Constants for OpenAI vector stores (similar to faiss)
+VERSION = "v3"
+OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
+CHUNK_MULTIPLIER = 5

 def serialize_vector(vector: list[float]) -> bytes:
     """Serialize a list of floats into a compact binary representation."""
@@ -307,6 +314,7 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
         self.config = config
         self.inference_api = inference_api
         self.cache: dict[str, VectorDBWithIndex] = {}
+        self.openai_vector_stores: dict[str, dict[str, Any]] = {}

     async def initialize(self) -> None:
         def _setup_connection():
@@ -321,17 +329,29 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
                         metadata TEXT
                     );
                 """)
+                # Create a table to persist OpenAI vector stores.
+                cur.execute("""
+                    CREATE TABLE IF NOT EXISTS openai_vector_stores (
+                        id TEXT PRIMARY KEY,
+                        metadata TEXT
+                    );
+                """)
                 connection.commit()
                 # Load any existing vector DB registrations.
                 cur.execute("SELECT metadata FROM vector_dbs")
-                rows = cur.fetchall()
-                return rows
+                vector_db_rows = cur.fetchall()
+                # Load any existing OpenAI vector stores.
+                cur.execute("SELECT metadata FROM openai_vector_stores")
+                openai_store_rows = cur.fetchall()
+                return vector_db_rows, openai_store_rows
             finally:
                 cur.close()
                 connection.close()

-        rows = await asyncio.to_thread(_setup_connection)
-        for row in rows:
+        vector_db_rows, openai_store_rows = await asyncio.to_thread(_setup_connection)
+
+        # Load existing vector DBs
+        for row in vector_db_rows:
             vector_db_data = row[0]
             vector_db = VectorDB.model_validate_json(vector_db_data)
             index = await SQLiteVecIndex.create(
@@ -339,6 +359,12 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
             )
             self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
+
+        # Load existing OpenAI vector stores
+        for row in openai_store_rows:
+            store_data = row[0]
+            store_info = json.loads(store_data)
+            self.openai_vector_stores[store_info["id"]] = store_info

     async def shutdown(self) -> None:
         # nothing to do since we don't maintain a persistent connection
         pass
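For reference, the new table keeps one row per store: the store id plus the full store_info dict serialized as JSON, which initialize() reads back into self.openai_vector_stores at startup. A minimal inspection sketch (not part of the diff) using only the stdlib sqlite3 module; the "./sqlite_vec.db" path is a placeholder, since the adapter actually reads the path from config.db_path:

# Sketch: peek at the persisted OpenAI vector store records directly.
import json
import sqlite3

conn = sqlite3.connect("./sqlite_vec.db")  # placeholder path; real value comes from config.db_path
try:
    for store_id, metadata in conn.execute("SELECT id, metadata FROM openai_vector_stores"):
        info = json.loads(metadata)  # same JSON blob the adapter caches in self.openai_vector_stores
        print(store_id, info["status"], info["created_at"])
finally:
    conn.close()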
@@ -409,7 +435,88 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
         provider_id: str | None = None,
         provider_vector_db_id: str | None = None,
     ) -> VectorStoreObject:
-        raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec")
+        """Creates a vector store."""
+        # store and vector_db have the same id
+        store_id = name or str(uuid.uuid4())
+        created_at = int(time.time())
+
+        if provider_id is None:
+            raise ValueError("Provider ID is required")
+
+        if embedding_model is None:
+            raise ValueError("Embedding model is required")
+
+        # Use provided embedding dimension or default to 384
+        if embedding_dimension is None:
+            raise ValueError("Embedding dimension is required")
+
+        provider_vector_db_id = provider_vector_db_id or store_id
+
+        vector_db = VectorDB(
+            identifier=store_id,
+            embedding_dimension=embedding_dimension,
+            embedding_model=embedding_model,
+            provider_id=provider_id,
+            provider_resource_id=provider_vector_db_id,
+        )
+        # Register the vector DB
+        await self.register_vector_db(vector_db)
+
+        # Create OpenAI vector store metadata
+        store_info = {
+            "id": store_id,
+            "object": "vector_store",
+            "created_at": created_at,
+            "name": store_id,
+            "usage_bytes": 0,
+            "file_counts": {},
+            "status": "completed",
+            "expires_after": expires_after,
+            "expires_at": None,
+            "last_active_at": created_at,
+            "file_ids": file_ids or [],
+            "chunking_strategy": chunking_strategy,
+        }
+
+        # Add provider information to metadata if provided
+        metadata = metadata or {}
+        if provider_id:
+            metadata["provider_id"] = provider_id
+        if provider_vector_db_id:
+            metadata["provider_vector_db_id"] = provider_vector_db_id
+        store_info["metadata"] = metadata
+
+        # Store in SQLite database
+        def _store_openai_vector_store():
+            connection = _create_sqlite_connection(self.config.db_path)
+            cur = connection.cursor()
+            try:
+                cur.execute(
+                    "INSERT OR REPLACE INTO openai_vector_stores (id, metadata) VALUES (?, ?)",
+                    (store_id, json.dumps(store_info)),
+                )
+                connection.commit()
+            finally:
+                cur.close()
+                connection.close()
+
+        await asyncio.to_thread(_store_openai_vector_store)
+
+        # Store in memory cache
+        self.openai_vector_stores[store_id] = store_info
+
+        return VectorStoreObject(
+            id=store_id,
+            created_at=created_at,
+            name=store_id,
+            usage_bytes=0,
+            file_counts={},
+            status="completed",
+            expires_after=expires_after,
+            expires_at=None,
+            last_active_at=created_at,
+            metadata=metadata,
+        )

     async def openai_list_vector_stores(
         self,
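Taken together, the create path above registers a regular VectorDB under the same id, then persists the OpenAI-style store_info both in the new SQLite table and in the in-memory cache. A minimal call sketch (not part of the diff), assuming `adapter` is an already-initialized SQLiteVecVectorIOAdapter and that the model name, dimension, and provider id are placeholders valid in your deployment:

# Sketch: create a store through the new method; all literal values are placeholders.
store = await adapter.openai_create_vector_store(
    name="my-notes",                     # also used as the store/vector_db id above
    embedding_model="all-MiniLM-L6-v2",  # required, or ValueError is raised
    embedding_dimension=384,             # required, or ValueError is raised
    provider_id="sqlite-vec",            # required, or ValueError is raised
)
print(store.id, store.status)  # e.g. "my-notes", "completed"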
@@ -418,13 +525,51 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
         after: str | None = None,
         before: str | None = None,
     ) -> VectorStoreListResponse:
-        raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec")
+        """Returns a list of vector stores."""
+        # Get all vector stores
+        all_stores = list(self.openai_vector_stores.values())
+
+        # Sort by created_at
+        reverse_order = order == "desc"
+        all_stores.sort(key=lambda x: x["created_at"], reverse=reverse_order)
+
+        # Apply cursor-based pagination
+        if after:
+            after_index = next((i for i, store in enumerate(all_stores) if store["id"] == after), -1)
+            if after_index >= 0:
+                all_stores = all_stores[after_index + 1 :]
+
+        if before:
+            before_index = next((i for i, store in enumerate(all_stores) if store["id"] == before), len(all_stores))
+            all_stores = all_stores[:before_index]
+
+        # Apply limit
+        limited_stores = all_stores[:limit]
+
+        # Convert to VectorStoreObject instances
+        data = [VectorStoreObject(**store) for store in limited_stores]
+
+        # Determine pagination info
+        has_more = len(all_stores) > limit
+        first_id = data[0].id if data else None
+        last_id = data[-1].id if data else None
+
+        return VectorStoreListResponse(
+            data=data,
+            has_more=has_more,
+            first_id=first_id,
+            last_id=last_id,
+        )

     async def openai_retrieve_vector_store(
         self,
         vector_store_id: str,
     ) -> VectorStoreObject:
-        raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec")
+        """Retrieves a vector store."""
+        if vector_store_id not in self.openai_vector_stores:
+            raise ValueError(f"Vector store {vector_store_id} not found")
+
+        store_info = self.openai_vector_stores[vector_store_id]
+        return VectorStoreObject(**store_info)

     async def openai_update_vector_store(
         self,
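The listing above implements simple cursor pagination: stores are sorted by created_at, the `after` and `before` ids slice the sorted list, and has_more signals whether another page exists. A sketch (not part of the diff) of walking all pages, using the same placeholder `adapter` as before:

# Sketch: iterate over every store, 20 per page, newest first.
page = await adapter.openai_list_vector_stores(limit=20, order="desc")
while True:
    for store in page.data:
        print(store.id)
    if not page.has_more:
        break
    # last_id of the previous page becomes the `after` cursor for the next one
    page = await adapter.openai_list_vector_stores(limit=20, order="desc", after=page.last_id)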
@@ -433,13 +578,78 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
         expires_after: dict[str, Any] | None = None,
         metadata: dict[str, Any] | None = None,
     ) -> VectorStoreObject:
-        raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec")
+        """Modifies a vector store."""
+        if vector_store_id not in self.openai_vector_stores:
+            raise ValueError(f"Vector store {vector_store_id} not found")
+
+        store_info = self.openai_vector_stores[vector_store_id].copy()
+
+        # Update fields if provided
+        if name is not None:
+            store_info["name"] = name
+        if expires_after is not None:
+            store_info["expires_after"] = expires_after
+        if metadata is not None:
+            store_info["metadata"] = metadata
+
+        # Update last_active_at
+        store_info["last_active_at"] = int(time.time())
+
+        # Save to SQLite database
+        def _update_openai_vector_store():
+            connection = _create_sqlite_connection(self.config.db_path)
+            cur = connection.cursor()
+            try:
+                cur.execute(
+                    "UPDATE openai_vector_stores SET metadata = ? WHERE id = ?",
+                    (json.dumps(store_info), vector_store_id),
+                )
+                connection.commit()
+            finally:
+                cur.close()
+                connection.close()
+
+        await asyncio.to_thread(_update_openai_vector_store)
+
+        # Update in-memory cache
+        self.openai_vector_stores[vector_store_id] = store_info
+
+        return VectorStoreObject(**store_info)

     async def openai_delete_vector_store(
         self,
         vector_store_id: str,
     ) -> VectorStoreDeleteResponse:
-        raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec")
+        """Delete a vector store."""
+        if vector_store_id not in self.openai_vector_stores:
+            raise ValueError(f"Vector store {vector_store_id} not found")
+
+        # Delete from SQLite database
+        def _delete_openai_vector_store():
+            connection = _create_sqlite_connection(self.config.db_path)
+            cur = connection.cursor()
+            try:
+                cur.execute("DELETE FROM openai_vector_stores WHERE id = ?", (vector_store_id,))
+                connection.commit()
+            finally:
+                cur.close()
+                connection.close()
+
+        await asyncio.to_thread(_delete_openai_vector_store)
+
+        # Delete from in-memory cache
+        del self.openai_vector_stores[vector_store_id]
+
+        # Also delete the underlying vector DB
+        try:
+            await self.unregister_vector_db(vector_store_id)
+        except Exception as e:
+            logger.warning(f"Failed to delete underlying vector DB {vector_store_id}: {e}")
+
+        return VectorStoreDeleteResponse(
+            id=vector_store_id,
+            deleted=True,
+        )

     async def openai_search_vector_store(
         self,
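Note that openai_update_vector_store above overwrites whichever fields are passed (metadata is replaced, not merged) and bumps last_active_at, while openai_delete_vector_store removes the SQLite row and the cache entry and makes a best-effort attempt to unregister the backing vector DB. A short round-trip sketch (not part of the diff), reusing the placeholder `adapter` and "my-notes" store from the earlier sketch:

# Sketch: update then delete the placeholder store.
updated = await adapter.openai_update_vector_store(
    vector_store_id="my-notes",
    metadata={"project": "demo"},  # replaces the stored metadata dict wholesale
)
assert updated.last_active_at >= updated.created_at

deleted = await adapter.openai_delete_vector_store(vector_store_id="my-notes")
print(deleted.deleted)  # True; the underlying vector DB is unregistered as well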
@@ -451,7 +661,79 @@ class SQLiteVecVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
         rewrite_query: bool = False,
         search_mode: Literal["keyword", "vector", "hybrid"] = "vector",
     ) -> VectorStoreSearchResponse:
-        raise NotImplementedError("OpenAI Vector Stores API is not supported in SQLiteVec")
+        """Search for chunks in a vector store."""
+        if vector_store_id not in self.openai_vector_stores:
+            raise ValueError(f"Vector store {vector_store_id} not found")
+
+        if isinstance(query, list):
+            search_query = " ".join(query)
+        else:
+            search_query = query
+
+        try:
+            score_threshold = ranking_options.get("score_threshold", 0.0) if ranking_options else 0.0
+            params = {
+                "max_chunks": max_num_results * CHUNK_MULTIPLIER,
+                "score_threshold": score_threshold,
+                "mode": search_mode,
+            }
+            # TODO: Add support for ranking_options.ranker
+
+            response = await self.query_chunks(
+                vector_db_id=vector_store_id,
+                query=search_query,
+                params=params,
+            )
+
+            # Convert response to OpenAI format
+            data = []
+            for i, (chunk, score) in enumerate(zip(response.chunks, response.scores, strict=False)):
+                # Apply score based filtering
+                if score < score_threshold:
+                    continue
+
+                # Apply filters if provided
+                if filters:
+                    # Simple metadata filtering
+                    if not self._matches_filters(chunk.metadata, filters):
+                        continue
+
+                chunk_data = {
+                    "id": f"chunk_{i}",
+                    "object": "vector_store.search_result",
+                    "score": score,
+                    "content": chunk.content.content if hasattr(chunk.content, "content") else str(chunk.content),
+                    "metadata": chunk.metadata,
+                }
+                data.append(chunk_data)
+                if len(data) >= max_num_results:
+                    break
+
+            return VectorStoreSearchResponse(
+                search_query=search_query,
+                data=data,
+                has_more=False,  # For simplicity, we don't implement pagination here
+                next_page=None,
+            )
+        except Exception as e:
+            logger.error(f"Error searching vector store {vector_store_id}: {e}")
+            # Return empty results on error
+            return VectorStoreSearchResponse(
+                search_query=search_query,
+                data=[],
+                has_more=False,
+                next_page=None,
+            )
+
+    def _matches_filters(self, metadata: dict[str, Any], filters: dict[str, Any]) -> bool:
+        """Check if metadata matches the provided filters."""
+        for key, value in filters.items():
+            if key not in metadata:
+                return False
+            if metadata[key] != value:
+                return False
+        return True

 def generate_chunk_id(document_id: str, chunk_text: str) -> str:
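The search path above reuses the existing query_chunks machinery: it over-fetches max_num_results * CHUNK_MULTIPLIER candidates, drops anything below score_threshold, applies exact-match metadata filtering via _matches_filters, and truncates to max_num_results. A hedged call sketch (not part of the diff); the query text, the filter key, and the store id are placeholders, and `adapter` is the same assumed instance as above:

# Sketch: vector search with a score threshold and an exact-match metadata filter.
results = await adapter.openai_search_vector_store(
    vector_store_id="my-notes",
    query="how do I rotate credentials?",
    max_num_results=5,
    ranking_options={"score_threshold": 0.25},
    filters={"source": "runbook"},  # matched by _matches_filters via exact equality
    search_mode="vector",           # "keyword" is also accepted by the sqlite-vec backend
)
for hit in results.data:
    print(hit)  # chunk id, score, content, and metadata as assembled above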


@@ -22,8 +22,10 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models):
     vector_io_providers = [p for p in client_with_models.providers.list() if p.api == "vector_io"]
     for p in vector_io_providers:
-        if p.provider_type not in ["inline::faiss"]:
-            pytest.skip(f"OpenAI vector stores are not supported by {p.provider_type}")
+        if p.provider_type in ["inline::faiss", "inline::sqlite-vec"]:
+            return
+    pytest.skip("OpenAI vector stores are not supported by any provider")

 @pytest.fixture