From 33f0d83ad396bf647ee999105272829e86321d38 Mon Sep 17 00:00:00 2001 From: Francisco Arceo Date: Mon, 14 Jul 2025 18:10:35 -0400 Subject: [PATCH] chore: Move vector store `kvstore` implementation into `openai_vector_store_mixin.py` (#2748) --- .../providers/vector_io/remote_pgvector.md | 4 + .../providers/vector_io/remote_weaviate.md | 4 +- .../providers/inline/vector_io/faiss/faiss.py | 40 +---- .../inline/vector_io/sqlite_vec/sqlite_vec.py | 39 +--- .../remote/vector_io/milvus/milvus.py | 33 +--- .../remote/vector_io/pgvector/config.py | 18 +- .../remote/vector_io/pgvector/pgvector.py | 169 ++++++------------ .../remote/vector_io/weaviate/config.py | 17 +- .../remote/vector_io/weaviate/weaviate.py | 64 ++++++- .../utils/memory/openai_vector_store_mixin.py | 41 ++++- .../open-benchmark/open_benchmark.py | 1 + llama_stack/templates/open-benchmark/run.yaml | 3 + llama_stack/templates/starter/run.yaml | 3 + llama_stack/templates/starter/starter.py | 1 + 14 files changed, 203 insertions(+), 234 deletions(-) diff --git a/docs/source/providers/vector_io/remote_pgvector.md b/docs/source/providers/vector_io/remote_pgvector.md index 685b98f37..3e7d6e776 100644 --- a/docs/source/providers/vector_io/remote_pgvector.md +++ b/docs/source/providers/vector_io/remote_pgvector.md @@ -40,6 +40,7 @@ See [PGVector's documentation](https://github.com/pgvector/pgvector) for more de | `db` | `str \| None` | No | postgres | | | `user` | `str \| None` | No | postgres | | | `password` | `str \| None` | No | mysecretpassword | | +| `kvstore` | `utils.kvstore.config.RedisKVStoreConfig \| utils.kvstore.config.SqliteKVStoreConfig \| utils.kvstore.config.PostgresKVStoreConfig \| utils.kvstore.config.MongoDBKVStoreConfig, annotation=NoneType, required=False, default='sqlite', discriminator='type'` | No | | Config for KV store backend (SQLite only for now) | ## Sample Configuration @@ -49,6 +50,9 @@ port: ${env.PGVECTOR_PORT:=5432} db: ${env.PGVECTOR_DB} user: ${env.PGVECTOR_USER} password: ${env.PGVECTOR_PASSWORD} +kvstore: + type: sqlite + db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/pgvector_registry.db ``` diff --git a/docs/source/providers/vector_io/remote_weaviate.md b/docs/source/providers/vector_io/remote_weaviate.md index b7f811c35..d930515d5 100644 --- a/docs/source/providers/vector_io/remote_weaviate.md +++ b/docs/source/providers/vector_io/remote_weaviate.md @@ -36,7 +36,9 @@ See [Weaviate's documentation](https://weaviate.io/developers/weaviate) for more ## Sample Configuration ```yaml -{} +kvstore: + type: sqlite + db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/weaviate_registry.db ``` diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py index 0306d9156..2a1370c56 100644 --- a/llama_stack/providers/inline/vector_io/faiss/faiss.py +++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py @@ -181,8 +181,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr ) self.cache[vector_db.identifier] = index - # Load existing OpenAI vector stores using the mixin method - self.openai_vector_stores = await self._load_openai_vector_stores() + # Load existing OpenAI vector stores into the in-memory cache + await self.initialize_openai_vector_stores() async def shutdown(self) -> None: # Cleanup if needed @@ -261,42 +261,6 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr return await index.query_chunks(query, params) - # OpenAI Vector Store Mixin abstract method implementations - async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Save vector store metadata to kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.set(key=key, value=json.dumps(store_info)) - self.openai_vector_stores[store_id] = store_info - - async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: - """Load all vector store metadata from kvstore.""" - assert self.kvstore is not None - start_key = OPENAI_VECTOR_STORES_PREFIX - end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" - stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key) - - stores = {} - for store_data in stored_openai_stores: - store_info = json.loads(store_data) - stores[store_info["id"]] = store_info - return stores - - async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Update vector store metadata in kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.set(key=key, value=json.dumps(store_info)) - self.openai_vector_stores[store_id] = store_info - - async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: - """Delete vector store metadata from kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.delete(key) - if store_id in self.openai_vector_stores: - del self.openai_vector_stores[store_id] - async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 6acd85c56..771ffa607 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -452,8 +452,8 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc ) self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api) - # load any existing OpenAI vector stores - self.openai_vector_stores = await self._load_openai_vector_stores() + # Load existing OpenAI vector stores into the in-memory cache + await self.initialize_openai_vector_stores() async def shutdown(self) -> None: # nothing to do since we don't maintain a persistent connection @@ -501,41 +501,6 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc await self.cache[vector_db_id].index.delete() del self.cache[vector_db_id] - # OpenAI Vector Store Mixin abstract method implementations - async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Save vector store metadata to SQLite database.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.set(key=key, value=json.dumps(store_info)) - self.openai_vector_stores[store_id] = store_info - - async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: - """Load all vector store metadata from SQLite database.""" - assert self.kvstore is not None - start_key = OPENAI_VECTOR_STORES_PREFIX - end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" - stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key) - stores = {} - for store_data in stored_openai_stores: - store_info = json.loads(store_data) - stores[store_info["id"]] = store_info - return stores - - async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Update vector store metadata in SQLite database.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.set(key=key, value=json.dumps(store_info)) - self.openai_vector_stores[store_id] = store_info - - async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: - """Delete vector store metadata from SQLite database.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.delete(key) - if store_id in self.openai_vector_stores: - del self.openai_vector_stores[store_id] - async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index a06130fd0..d88d954ef 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -179,7 +179,8 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP uri = os.path.expanduser(self.config.db_path) self.client = MilvusClient(uri=uri) - self.openai_vector_stores = await self._load_openai_vector_stores() + # Load existing OpenAI vector stores into the in-memory cache + await self.initialize_openai_vector_stores() async def shutdown(self) -> None: self.client.close() @@ -248,36 +249,6 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP return await index.query_chunks(query, params) - async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Save vector store metadata to persistent storage.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.set(key=key, value=json.dumps(store_info)) - self.openai_vector_stores[store_id] = store_info - - async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Update vector store metadata in persistent storage.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.set(key=key, value=json.dumps(store_info)) - self.openai_vector_stores[store_id] = store_info - - async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: - """Delete vector store metadata from persistent storage.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" - await self.kvstore.delete(key) - if store_id in self.openai_vector_stores: - del self.openai_vector_stores[store_id] - - async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: - """Load all vector store metadata from persistent storage.""" - assert self.kvstore is not None - start_key = OPENAI_VECTOR_STORES_PREFIX - end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" - stored = await self.kvstore.values_in_range(start_key, end_key) - return {json.loads(s)["id"]: json.loads(s) for s in stored} - async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: diff --git a/llama_stack/providers/remote/vector_io/pgvector/config.py b/llama_stack/providers/remote/vector_io/pgvector/config.py index 92908aa8a..334cbe5be 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/config.py +++ b/llama_stack/providers/remote/vector_io/pgvector/config.py @@ -8,6 +8,10 @@ from typing import Any from pydantic import BaseModel, Field +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) from llama_stack.schema_utils import json_schema_type @@ -18,10 +22,12 @@ class PGVectorVectorIOConfig(BaseModel): db: str | None = Field(default="postgres") user: str | None = Field(default="postgres") password: str | None = Field(default="mysecretpassword") + kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None) @classmethod def sample_run_config( cls, + __distro_dir__: str, host: str = "${env.PGVECTOR_HOST:=localhost}", port: int = "${env.PGVECTOR_PORT:=5432}", db: str = "${env.PGVECTOR_DB}", @@ -29,4 +35,14 @@ class PGVectorVectorIOConfig(BaseModel): password: str = "${env.PGVECTOR_PASSWORD}", **kwargs: Any, ) -> dict[str, Any]: - return {"host": host, "port": port, "db": db, "user": user, "password": password} + return { + "host": host, + "port": port, + "db": db, + "user": user, + "password": password, + "kvstore": SqliteKVStoreConfig.sample_run_config( + __distro_dir__=__distro_dir__, + db_name="pgvector_registry.db", + ), + } diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index c3cdef9b8..1bf3eedf8 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -13,24 +13,18 @@ from psycopg2 import sql from psycopg2.extras import Json, execute_values from pydantic import BaseModel, TypeAdapter +from llama_stack.apis.files.files import Files from llama_stack.apis.inference import InterleavedContent from llama_stack.apis.vector_dbs import VectorDB from llama_stack.apis.vector_io import ( Chunk, QueryChunksResponse, - SearchRankingOptions, VectorIO, - VectorStoreChunkingStrategy, - VectorStoreDeleteResponse, - VectorStoreFileContentsResponse, - VectorStoreFileObject, - VectorStoreFileStatus, - VectorStoreListFilesResponse, - VectorStoreListResponse, - VectorStoreObject, - VectorStoreSearchResponsePage, ) from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.kvstore.api import KVStore +from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( EmbeddingIndex, VectorDBWithIndex, @@ -40,6 +34,13 @@ from .config import PGVectorVectorIOConfig log = logging.getLogger(__name__) +VERSION = "v3" +VECTOR_DBS_PREFIX = f"vector_dbs:pgvector:{VERSION}::" +VECTOR_INDEX_PREFIX = f"vector_index:pgvector:{VERSION}::" +OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:pgvector:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:pgvector:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:pgvector:{VERSION}::" + def check_extension_version(cur): cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'vector'") @@ -69,7 +70,7 @@ def load_models(cur, cls): class PGVectorIndex(EmbeddingIndex): - def __init__(self, vector_db: VectorDB, dimension: int, conn): + def __init__(self, vector_db: VectorDB, dimension: int, conn, kvstore: KVStore | None = None): self.conn = conn with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: # Sanitize the table name by replacing hyphens with underscores @@ -77,6 +78,7 @@ class PGVectorIndex(EmbeddingIndex): # when created with patterns like "test-vector-db-{uuid4()}" sanitized_identifier = vector_db.identifier.replace("-", "_") self.table_name = f"vector_store_{sanitized_identifier}" + self.kvstore = kvstore cur.execute( f""" @@ -158,15 +160,28 @@ class PGVectorIndex(EmbeddingIndex): cur.execute(f"DROP TABLE IF EXISTS {self.table_name}") -class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): - def __init__(self, config: PGVectorVectorIOConfig, inference_api: Api.inference) -> None: +class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate): + def __init__( + self, + config: PGVectorVectorIOConfig, + inference_api: Api.inference, + files_api: Files | None = None, + ) -> None: self.config = config self.inference_api = inference_api self.conn = None self.cache = {} + self.files_api = files_api + self.kvstore: KVStore | None = None + self.vector_db_store = None + self.openai_vector_store: dict[str, dict[str, Any]] = {} + self.metadatadata_collection_name = "openai_vector_stores_metadata" async def initialize(self) -> None: log.info(f"Initializing PGVector memory adapter with config: {self.config}") + self.kvstore = await kvstore_impl(self.config.kvstore) + await self.initialize_openai_vector_stores() + try: self.conn = psycopg2.connect( host=self.config.host, @@ -201,14 +216,31 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): log.info("Connection to PGVector database server closed") async def register_vector_db(self, vector_db: VectorDB) -> None: + # Persist vector DB metadata in the KV store + assert self.kvstore is not None + key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}" + await self.kvstore.set(key=key, value=vector_db.model_dump_json()) + + # Upsert model metadata in Postgres upsert_models(self.conn, [(vector_db.identifier, vector_db)]) - index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn) - self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api) + # Create and cache the PGVector index table for the vector DB + index = VectorDBWithIndex( + vector_db, + index=PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn, kvstore=self.kvstore), + inference_api=self.inference_api, + ) + self.cache[vector_db.identifier] = index async def unregister_vector_db(self, vector_db_id: str) -> None: - await self.cache[vector_db_id].index.delete() - del self.cache[vector_db_id] + # Remove provider index and cache + if vector_db_id in self.cache: + await self.cache[vector_db_id].index.delete() + del self.cache[vector_db_id] + + # Delete vector DB metadata from KV store + assert self.kvstore is not None + await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_db_id}") async def insert_chunks( self, @@ -237,107 +269,20 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api) return self.cache[vector_db_id] - async def openai_create_vector_store( - self, - name: str, - file_ids: list[str] | None = None, - expires_after: dict[str, Any] | None = None, - chunking_strategy: dict[str, Any] | None = None, - metadata: dict[str, Any] | None = None, - embedding_model: str | None = None, - embedding_dimension: int | None = 384, - provider_id: str | None = None, - provider_vector_db_id: str | None = None, - ) -> VectorStoreObject: + # OpenAI Vector Stores File operations are not supported in PGVector + async def _save_openai_vector_store_file( + self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] + ) -> None: raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - async def openai_list_vector_stores( - self, - limit: int | None = 20, - order: str | None = "desc", - after: str | None = None, - before: str | None = None, - ) -> VectorStoreListResponse: + async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - async def openai_retrieve_vector_store( - self, - vector_store_id: str, - ) -> VectorStoreObject: + async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - async def openai_update_vector_store( - self, - vector_store_id: str, - name: str | None = None, - expires_after: dict[str, Any] | None = None, - metadata: dict[str, Any] | None = None, - ) -> VectorStoreObject: + async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - async def openai_delete_vector_store( - self, - vector_store_id: str, - ) -> VectorStoreDeleteResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_search_vector_store( - self, - vector_store_id: str, - query: str | list[str], - filters: dict[str, Any] | None = None, - max_num_results: int | None = 10, - ranking_options: SearchRankingOptions | None = None, - rewrite_query: bool | None = False, - search_mode: str | None = "vector", - ) -> VectorStoreSearchResponsePage: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_attach_file_to_vector_store( - self, - vector_store_id: str, - file_id: str, - attributes: dict[str, Any] | None = None, - chunking_strategy: VectorStoreChunkingStrategy | None = None, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_list_files_in_vector_store( - self, - vector_store_id: str, - limit: int | None = 20, - order: str | None = "desc", - after: str | None = None, - before: str | None = None, - filter: VectorStoreFileStatus | None = None, - ) -> VectorStoreListFilesResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_retrieve_vector_store_file( - self, - vector_store_id: str, - file_id: str, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_retrieve_vector_store_file_contents( - self, - vector_store_id: str, - file_id: str, - ) -> VectorStoreFileContentsResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_update_vector_store_file( - self, - vector_store_id: str, - file_id: str, - attributes: dict[str, Any] | None = None, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") - - async def openai_delete_vector_store_file( - self, - vector_store_id: str, - file_id: str, - ) -> VectorStoreFileObject: + async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") diff --git a/llama_stack/providers/remote/vector_io/weaviate/config.py b/llama_stack/providers/remote/vector_io/weaviate/config.py index a8c6e3e2c..4283b8d3b 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/config.py +++ b/llama_stack/providers/remote/vector_io/weaviate/config.py @@ -6,15 +6,26 @@ from typing import Any -from pydantic import BaseModel +from pydantic import BaseModel, Field + +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) class WeaviateRequestProviderData(BaseModel): weaviate_api_key: str weaviate_cluster_url: str + kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None) class WeaviateVectorIOConfig(BaseModel): @classmethod - def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]: - return {} + def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: + return { + "kvstore": SqliteKVStoreConfig.sample_run_config( + __distro_dir__=__distro_dir__, + db_name="weaviate_registry.db", + ), + } diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index c63dd70c6..35bb40454 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -14,10 +14,13 @@ from weaviate.classes.init import Auth from weaviate.classes.query import Filter from llama_stack.apis.common.content_types import InterleavedContent +from llama_stack.apis.files.files import Files from llama_stack.apis.vector_dbs import VectorDB from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO from llama_stack.distribution.request_headers import NeedsRequestProviderData from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.kvstore.api import KVStore from llama_stack.providers.utils.memory.vector_store import ( EmbeddingIndex, VectorDBWithIndex, @@ -27,11 +30,19 @@ from .config import WeaviateRequestProviderData, WeaviateVectorIOConfig log = logging.getLogger(__name__) +VERSION = "v3" +VECTOR_DBS_PREFIX = f"vector_dbs:weaviate:{VERSION}::" +VECTOR_INDEX_PREFIX = f"vector_index:weaviate:{VERSION}::" +OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:weaviate:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:weaviate:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:weaviate:{VERSION}::" + class WeaviateIndex(EmbeddingIndex): - def __init__(self, client: weaviate.Client, collection_name: str): + def __init__(self, client: weaviate.Client, collection_name: str, kvstore: KVStore | None = None): self.client = client self.collection_name = collection_name + self.kvstore = kvstore async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): assert len(chunks) == len(embeddings), ( @@ -109,11 +120,21 @@ class WeaviateVectorIOAdapter( NeedsRequestProviderData, VectorDBsProtocolPrivate, ): - def __init__(self, config: WeaviateVectorIOConfig, inference_api: Api.inference) -> None: + def __init__( + self, + config: WeaviateVectorIOConfig, + inference_api: Api.inference, + files_api: Files | None, + ) -> None: self.config = config self.inference_api = inference_api self.client_cache = {} self.cache = {} + self.files_api = files_api + self.kvstore: KVStore | None = None + self.vector_db_store = None + self.openai_vector_stores: dict[str, dict[str, Any]] = {} + self.metadata_collection_name = "openai_vector_stores_metadata" def _get_client(self) -> weaviate.Client: provider_data = self.get_request_provider_data() @@ -132,7 +153,26 @@ class WeaviateVectorIOAdapter( return client async def initialize(self) -> None: - pass + """Set up KV store and load existing vector DBs and OpenAI vector stores.""" + # Initialize KV store for metadata + self.kvstore = await kvstore_impl(self.config.kvstore) + + # Load existing vector DB definitions + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored = await self.kvstore.values_in_range(start_key, end_key) + for raw in stored: + vector_db = VectorDB.model_validate_json(raw) + client = self._get_client() + idx = WeaviateIndex(client=client, collection_name=vector_db.identifier, kvstore=self.kvstore) + self.cache[vector_db.identifier] = VectorDBWithIndex( + vector_db=vector_db, + index=idx, + inference_api=self.inference_api, + ) + + # Load OpenAI vector stores metadata into cache + await self.initialize_openai_vector_stores() async def shutdown(self) -> None: for client in self.client_cache.values(): @@ -206,3 +246,21 @@ class WeaviateVectorIOAdapter( raise ValueError(f"Vector DB {vector_db_id} not found") return await index.query_chunks(query, params) + + # OpenAI Vector Stores File operations are not supported in Weaviate + async def _save_openai_vector_store_file( + self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] + ) -> None: + raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate") + + async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: + raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate") + + async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: + raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate") + + async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: + raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate") + + async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: + raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate") diff --git a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index 7c97ff7f6..27bb1c997 100644 --- a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -5,6 +5,7 @@ # the root directory of this source tree. import asyncio +import json import logging import mimetypes import time @@ -35,6 +36,7 @@ from llama_stack.apis.vector_io import ( VectorStoreSearchResponse, VectorStoreSearchResponsePage, ) +from llama_stack.providers.utils.kvstore.api import KVStore from llama_stack.providers.utils.memory.vector_store import content_from_data_and_mime_type, make_overlapped_chunks logger = logging.getLogger(__name__) @@ -59,26 +61,45 @@ class OpenAIVectorStoreMixin(ABC): # These should be provided by the implementing class openai_vector_stores: dict[str, dict[str, Any]] files_api: Files | None + # KV store for persisting OpenAI vector store metadata + kvstore: KVStore | None - @abstractmethod async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: """Save vector store metadata to persistent storage.""" - pass + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.set(key=key, value=json.dumps(store_info)) + # update in-memory cache + self.openai_vector_stores[store_id] = store_info - @abstractmethod async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: """Load all vector store metadata from persistent storage.""" - pass + assert self.kvstore is not None + start_key = OPENAI_VECTOR_STORES_PREFIX + end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" + stored_data = await self.kvstore.values_in_range(start_key, end_key) + + stores: dict[str, dict[str, Any]] = {} + for item in stored_data: + info = json.loads(item) + stores[info["id"]] = info + return stores - @abstractmethod async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: """Update vector store metadata in persistent storage.""" - pass + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.set(key=key, value=json.dumps(store_info)) + # update in-memory cache + self.openai_vector_stores[store_id] = store_info - @abstractmethod async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: """Delete vector store metadata from persistent storage.""" - pass + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.delete(key) + # remove from in-memory cache + self.openai_vector_stores.pop(store_id, None) @abstractmethod async def _save_openai_vector_store_file( @@ -117,6 +138,10 @@ class OpenAIVectorStoreMixin(ABC): """Unregister a vector database (provider-specific implementation).""" pass + async def initialize_openai_vector_stores(self) -> None: + """Load existing OpenAI vector stores into the in-memory cache.""" + self.openai_vector_stores = await self._load_openai_vector_stores() + @abstractmethod async def insert_chunks( self, diff --git a/llama_stack/templates/open-benchmark/open_benchmark.py b/llama_stack/templates/open-benchmark/open_benchmark.py index 56ee9c47d..63a27e07f 100644 --- a/llama_stack/templates/open-benchmark/open_benchmark.py +++ b/llama_stack/templates/open-benchmark/open_benchmark.py @@ -128,6 +128,7 @@ def get_distribution_template() -> DistributionTemplate: provider_id="${env.ENABLE_PGVECTOR:+pgvector}", provider_type="remote::pgvector", config=PGVectorVectorIOConfig.sample_run_config( + f"~/.llama/distributions/{name}", db="${env.PGVECTOR_DB:=}", user="${env.PGVECTOR_USER:=}", password="${env.PGVECTOR_PASSWORD:=}", diff --git a/llama_stack/templates/open-benchmark/run.yaml b/llama_stack/templates/open-benchmark/run.yaml index 0b368ebc9..7d07cc4bf 100644 --- a/llama_stack/templates/open-benchmark/run.yaml +++ b/llama_stack/templates/open-benchmark/run.yaml @@ -54,6 +54,9 @@ providers: db: ${env.PGVECTOR_DB:=} user: ${env.PGVECTOR_USER:=} password: ${env.PGVECTOR_PASSWORD:=} + kvstore: + type: sqlite + db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/pgvector_registry.db safety: - provider_id: llama-guard provider_type: inline::llama-guard diff --git a/llama_stack/templates/starter/run.yaml b/llama_stack/templates/starter/run.yaml index ad449cb1b..8e20f5224 100644 --- a/llama_stack/templates/starter/run.yaml +++ b/llama_stack/templates/starter/run.yaml @@ -166,6 +166,9 @@ providers: db: ${env.PGVECTOR_DB:=} user: ${env.PGVECTOR_USER:=} password: ${env.PGVECTOR_PASSWORD:=} + kvstore: + type: sqlite + db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/pgvector_registry.db files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/llama_stack/templates/starter/starter.py b/llama_stack/templates/starter/starter.py index c0ac44183..f6ca73028 100644 --- a/llama_stack/templates/starter/starter.py +++ b/llama_stack/templates/starter/starter.py @@ -241,6 +241,7 @@ def get_distribution_template() -> DistributionTemplate: provider_id="${env.ENABLE_PGVECTOR:=__disabled__}", provider_type="remote::pgvector", config=PGVectorVectorIOConfig.sample_run_config( + f"~/.llama/distributions/{name}", db="${env.PGVECTOR_DB:=}", user="${env.PGVECTOR_USER:=}", password="${env.PGVECTOR_PASSWORD:=}",