chore: Move vector store kvstore implementation into openai_vector_store_mixin.py (#2748)

This commit is contained in:
Francisco Arceo 2025-07-14 18:10:35 -04:00 committed by GitHub
parent 6b8a8c1be9
commit 33f0d83ad3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 203 additions and 234 deletions

View file

@ -40,6 +40,7 @@ See [PGVector's documentation](https://github.com/pgvector/pgvector) for more de
| `db` | `str \| None` | No | postgres | | | `db` | `str \| None` | No | postgres | |
| `user` | `str \| None` | No | postgres | | | `user` | `str \| None` | No | postgres | |
| `password` | `str \| None` | No | mysecretpassword | | | `password` | `str \| None` | No | mysecretpassword | |
| `kvstore` | `utils.kvstore.config.RedisKVStoreConfig \| utils.kvstore.config.SqliteKVStoreConfig \| utils.kvstore.config.PostgresKVStoreConfig \| utils.kvstore.config.MongoDBKVStoreConfig, annotation=NoneType, required=False, default='sqlite', discriminator='type'` | No | | Config for KV store backend (SQLite only for now) |
## Sample Configuration ## Sample Configuration
@ -49,6 +50,9 @@ port: ${env.PGVECTOR_PORT:=5432}
db: ${env.PGVECTOR_DB} db: ${env.PGVECTOR_DB}
user: ${env.PGVECTOR_USER} user: ${env.PGVECTOR_USER}
password: ${env.PGVECTOR_PASSWORD} password: ${env.PGVECTOR_PASSWORD}
kvstore:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/pgvector_registry.db
``` ```

View file

@ -36,7 +36,9 @@ See [Weaviate's documentation](https://weaviate.io/developers/weaviate) for more
## Sample Configuration ## Sample Configuration
```yaml ```yaml
{} kvstore:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/dummy}/weaviate_registry.db
``` ```

View file

@ -181,8 +181,8 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
) )
self.cache[vector_db.identifier] = index self.cache[vector_db.identifier] = index
# Load existing OpenAI vector stores using the mixin method # Load existing OpenAI vector stores into the in-memory cache
self.openai_vector_stores = await self._load_openai_vector_stores() await self.initialize_openai_vector_stores()
async def shutdown(self) -> None: async def shutdown(self) -> None:
# Cleanup if needed # Cleanup if needed
@ -261,42 +261,6 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
return await index.query_chunks(query, params) return await index.query_chunks(query, params)
# OpenAI Vector Store Mixin abstract method implementations
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Save vector store metadata to kvstore."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
self.openai_vector_stores[store_id] = store_info
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
"""Load all vector store metadata from kvstore."""
assert self.kvstore is not None
start_key = OPENAI_VECTOR_STORES_PREFIX
end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key)
stores = {}
for store_data in stored_openai_stores:
store_info = json.loads(store_data)
stores[store_info["id"]] = store_info
return stores
async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Update vector store metadata in kvstore."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
self.openai_vector_stores[store_id] = store_info
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
"""Delete vector store metadata from kvstore."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.delete(key)
if store_id in self.openai_vector_stores:
del self.openai_vector_stores[store_id]
async def _save_openai_vector_store_file( async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None: ) -> None:

View file

@ -452,8 +452,8 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
) )
self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api) self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
# load any existing OpenAI vector stores # Load existing OpenAI vector stores into the in-memory cache
self.openai_vector_stores = await self._load_openai_vector_stores() await self.initialize_openai_vector_stores()
async def shutdown(self) -> None: async def shutdown(self) -> None:
# nothing to do since we don't maintain a persistent connection # nothing to do since we don't maintain a persistent connection
@ -501,41 +501,6 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
await self.cache[vector_db_id].index.delete() await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id] del self.cache[vector_db_id]
# OpenAI Vector Store Mixin abstract method implementations
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Save vector store metadata to SQLite database."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
self.openai_vector_stores[store_id] = store_info
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
"""Load all vector store metadata from SQLite database."""
assert self.kvstore is not None
start_key = OPENAI_VECTOR_STORES_PREFIX
end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key)
stores = {}
for store_data in stored_openai_stores:
store_info = json.loads(store_data)
stores[store_info["id"]] = store_info
return stores
async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Update vector store metadata in SQLite database."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
self.openai_vector_stores[store_id] = store_info
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
"""Delete vector store metadata from SQLite database."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.delete(key)
if store_id in self.openai_vector_stores:
del self.openai_vector_stores[store_id]
async def _save_openai_vector_store_file( async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None: ) -> None:

View file

@ -179,7 +179,8 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
uri = os.path.expanduser(self.config.db_path) uri = os.path.expanduser(self.config.db_path)
self.client = MilvusClient(uri=uri) self.client = MilvusClient(uri=uri)
self.openai_vector_stores = await self._load_openai_vector_stores() # Load existing OpenAI vector stores into the in-memory cache
await self.initialize_openai_vector_stores()
async def shutdown(self) -> None: async def shutdown(self) -> None:
self.client.close() self.client.close()
@ -248,36 +249,6 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
return await index.query_chunks(query, params) return await index.query_chunks(query, params)
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Save vector store metadata to persistent storage."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
self.openai_vector_stores[store_id] = store_info
async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Update vector store metadata in persistent storage."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
self.openai_vector_stores[store_id] = store_info
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
"""Delete vector store metadata from persistent storage."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.delete(key)
if store_id in self.openai_vector_stores:
del self.openai_vector_stores[store_id]
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
"""Load all vector store metadata from persistent storage."""
assert self.kvstore is not None
start_key = OPENAI_VECTOR_STORES_PREFIX
end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
return {json.loads(s)["id"]: json.loads(s) for s in stored}
async def _save_openai_vector_store_file( async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None: ) -> None:

View file

@ -8,6 +8,10 @@ from typing import Any
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
from llama_stack.schema_utils import json_schema_type from llama_stack.schema_utils import json_schema_type
@ -18,10 +22,12 @@ class PGVectorVectorIOConfig(BaseModel):
db: str | None = Field(default="postgres") db: str | None = Field(default="postgres")
user: str | None = Field(default="postgres") user: str | None = Field(default="postgres")
password: str | None = Field(default="mysecretpassword") password: str | None = Field(default="mysecretpassword")
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)
@classmethod @classmethod
def sample_run_config( def sample_run_config(
cls, cls,
__distro_dir__: str,
host: str = "${env.PGVECTOR_HOST:=localhost}", host: str = "${env.PGVECTOR_HOST:=localhost}",
port: int = "${env.PGVECTOR_PORT:=5432}", port: int = "${env.PGVECTOR_PORT:=5432}",
db: str = "${env.PGVECTOR_DB}", db: str = "${env.PGVECTOR_DB}",
@ -29,4 +35,14 @@ class PGVectorVectorIOConfig(BaseModel):
password: str = "${env.PGVECTOR_PASSWORD}", password: str = "${env.PGVECTOR_PASSWORD}",
**kwargs: Any, **kwargs: Any,
) -> dict[str, Any]: ) -> dict[str, Any]:
return {"host": host, "port": port, "db": db, "user": user, "password": password} return {
"host": host,
"port": port,
"db": db,
"user": user,
"password": password,
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="pgvector_registry.db",
),
}

View file

@ -13,24 +13,18 @@ from psycopg2 import sql
from psycopg2.extras import Json, execute_values from psycopg2.extras import Json, execute_values
from pydantic import BaseModel, TypeAdapter from pydantic import BaseModel, TypeAdapter
from llama_stack.apis.files.files import Files
from llama_stack.apis.inference import InterleavedContent from llama_stack.apis.inference import InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import ( from llama_stack.apis.vector_io import (
Chunk, Chunk,
QueryChunksResponse, QueryChunksResponse,
SearchRankingOptions,
VectorIO, VectorIO,
VectorStoreChunkingStrategy,
VectorStoreDeleteResponse,
VectorStoreFileContentsResponse,
VectorStoreFileObject,
VectorStoreFileStatus,
VectorStoreListFilesResponse,
VectorStoreListResponse,
VectorStoreObject,
VectorStoreSearchResponsePage,
) )
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ( from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex, EmbeddingIndex,
VectorDBWithIndex, VectorDBWithIndex,
@ -40,6 +34,13 @@ from .config import PGVectorVectorIOConfig
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:pgvector:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:pgvector:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:pgvector:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:pgvector:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:pgvector:{VERSION}::"
def check_extension_version(cur): def check_extension_version(cur):
cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'vector'") cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'vector'")
@ -69,7 +70,7 @@ def load_models(cur, cls):
class PGVectorIndex(EmbeddingIndex): class PGVectorIndex(EmbeddingIndex):
def __init__(self, vector_db: VectorDB, dimension: int, conn): def __init__(self, vector_db: VectorDB, dimension: int, conn, kvstore: KVStore | None = None):
self.conn = conn self.conn = conn
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# Sanitize the table name by replacing hyphens with underscores # Sanitize the table name by replacing hyphens with underscores
@ -77,6 +78,7 @@ class PGVectorIndex(EmbeddingIndex):
# when created with patterns like "test-vector-db-{uuid4()}" # when created with patterns like "test-vector-db-{uuid4()}"
sanitized_identifier = vector_db.identifier.replace("-", "_") sanitized_identifier = vector_db.identifier.replace("-", "_")
self.table_name = f"vector_store_{sanitized_identifier}" self.table_name = f"vector_store_{sanitized_identifier}"
self.kvstore = kvstore
cur.execute( cur.execute(
f""" f"""
@ -158,15 +160,28 @@ class PGVectorIndex(EmbeddingIndex):
cur.execute(f"DROP TABLE IF EXISTS {self.table_name}") cur.execute(f"DROP TABLE IF EXISTS {self.table_name}")
class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
def __init__(self, config: PGVectorVectorIOConfig, inference_api: Api.inference) -> None: def __init__(
self,
config: PGVectorVectorIOConfig,
inference_api: Api.inference,
files_api: Files | None = None,
) -> None:
self.config = config self.config = config
self.inference_api = inference_api self.inference_api = inference_api
self.conn = None self.conn = None
self.cache = {} self.cache = {}
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_store: dict[str, dict[str, Any]] = {}
self.metadatadata_collection_name = "openai_vector_stores_metadata"
async def initialize(self) -> None: async def initialize(self) -> None:
log.info(f"Initializing PGVector memory adapter with config: {self.config}") log.info(f"Initializing PGVector memory adapter with config: {self.config}")
self.kvstore = await kvstore_impl(self.config.kvstore)
await self.initialize_openai_vector_stores()
try: try:
self.conn = psycopg2.connect( self.conn = psycopg2.connect(
host=self.config.host, host=self.config.host,
@ -201,14 +216,31 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
log.info("Connection to PGVector database server closed") log.info("Connection to PGVector database server closed")
async def register_vector_db(self, vector_db: VectorDB) -> None: async def register_vector_db(self, vector_db: VectorDB) -> None:
# Persist vector DB metadata in the KV store
assert self.kvstore is not None
key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
await self.kvstore.set(key=key, value=vector_db.model_dump_json())
# Upsert model metadata in Postgres
upsert_models(self.conn, [(vector_db.identifier, vector_db)]) upsert_models(self.conn, [(vector_db.identifier, vector_db)])
index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn) # Create and cache the PGVector index table for the vector DB
self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api) index = VectorDBWithIndex(
vector_db,
index=PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn, kvstore=self.kvstore),
inference_api=self.inference_api,
)
self.cache[vector_db.identifier] = index
async def unregister_vector_db(self, vector_db_id: str) -> None: async def unregister_vector_db(self, vector_db_id: str) -> None:
await self.cache[vector_db_id].index.delete() # Remove provider index and cache
del self.cache[vector_db_id] if vector_db_id in self.cache:
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
# Delete vector DB metadata from KV store
assert self.kvstore is not None
await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_db_id}")
async def insert_chunks( async def insert_chunks(
self, self,
@ -237,107 +269,20 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api) self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api)
return self.cache[vector_db_id] return self.cache[vector_db_id]
async def openai_create_vector_store( # OpenAI Vector Stores File operations are not supported in PGVector
self, async def _save_openai_vector_store_file(
name: str, self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
file_ids: list[str] | None = None, ) -> None:
expires_after: dict[str, Any] | None = None,
chunking_strategy: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
embedding_model: str | None = None,
embedding_dimension: int | None = 384,
provider_id: str | None = None,
provider_vector_db_id: str | None = None,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_list_vector_stores( async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
self,
limit: int | None = 20,
order: str | None = "desc",
after: str | None = None,
before: str | None = None,
) -> VectorStoreListResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_retrieve_vector_store( async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
self,
vector_store_id: str,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_update_vector_store( async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
self,
vector_store_id: str,
name: str | None = None,
expires_after: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_delete_vector_store( async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
self,
vector_store_id: str,
) -> VectorStoreDeleteResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_search_vector_store(
self,
vector_store_id: str,
query: str | list[str],
filters: dict[str, Any] | None = None,
max_num_results: int | None = 10,
ranking_options: SearchRankingOptions | None = None,
rewrite_query: bool | None = False,
search_mode: str | None = "vector",
) -> VectorStoreSearchResponsePage:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_attach_file_to_vector_store(
self,
vector_store_id: str,
file_id: str,
attributes: dict[str, Any] | None = None,
chunking_strategy: VectorStoreChunkingStrategy | None = None,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_list_files_in_vector_store(
self,
vector_store_id: str,
limit: int | None = 20,
order: str | None = "desc",
after: str | None = None,
before: str | None = None,
filter: VectorStoreFileStatus | None = None,
) -> VectorStoreListFilesResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_retrieve_vector_store_file(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_update_vector_store_file(
self,
vector_store_id: str,
file_id: str,
attributes: dict[str, Any] | None = None,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
async def openai_delete_vector_store_file(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")

View file

@ -6,15 +6,26 @@
from typing import Any from typing import Any
from pydantic import BaseModel from pydantic import BaseModel, Field
from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
class WeaviateRequestProviderData(BaseModel): class WeaviateRequestProviderData(BaseModel):
weaviate_api_key: str weaviate_api_key: str
weaviate_cluster_url: str weaviate_cluster_url: str
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)
class WeaviateVectorIOConfig(BaseModel): class WeaviateVectorIOConfig(BaseModel):
@classmethod @classmethod
def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]: def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
return {} return {
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="weaviate_registry.db",
),
}

View file

@ -14,10 +14,13 @@ from weaviate.classes.init import Auth
from weaviate.classes.query import Filter from weaviate.classes.query import Filter
from llama_stack.apis.common.content_types import InterleavedContent from llama_stack.apis.common.content_types import InterleavedContent
from llama_stack.apis.files.files import Files
from llama_stack.apis.vector_dbs import VectorDB from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.distribution.request_headers import NeedsRequestProviderData from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.vector_store import ( from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex, EmbeddingIndex,
VectorDBWithIndex, VectorDBWithIndex,
@ -27,11 +30,19 @@ from .config import WeaviateRequestProviderData, WeaviateVectorIOConfig
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:weaviate:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:weaviate:{VERSION}::"
class WeaviateIndex(EmbeddingIndex): class WeaviateIndex(EmbeddingIndex):
def __init__(self, client: weaviate.Client, collection_name: str): def __init__(self, client: weaviate.Client, collection_name: str, kvstore: KVStore | None = None):
self.client = client self.client = client
self.collection_name = collection_name self.collection_name = collection_name
self.kvstore = kvstore
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), ( assert len(chunks) == len(embeddings), (
@ -109,11 +120,21 @@ class WeaviateVectorIOAdapter(
NeedsRequestProviderData, NeedsRequestProviderData,
VectorDBsProtocolPrivate, VectorDBsProtocolPrivate,
): ):
def __init__(self, config: WeaviateVectorIOConfig, inference_api: Api.inference) -> None: def __init__(
self,
config: WeaviateVectorIOConfig,
inference_api: Api.inference,
files_api: Files | None,
) -> None:
self.config = config self.config = config
self.inference_api = inference_api self.inference_api = inference_api
self.client_cache = {} self.client_cache = {}
self.cache = {} self.cache = {}
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.metadata_collection_name = "openai_vector_stores_metadata"
def _get_client(self) -> weaviate.Client: def _get_client(self) -> weaviate.Client:
provider_data = self.get_request_provider_data() provider_data = self.get_request_provider_data()
@ -132,7 +153,26 @@ class WeaviateVectorIOAdapter(
return client return client
async def initialize(self) -> None: async def initialize(self) -> None:
pass """Set up KV store and load existing vector DBs and OpenAI vector stores."""
# Initialize KV store for metadata
self.kvstore = await kvstore_impl(self.config.kvstore)
# Load existing vector DB definitions
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(client=client, collection_name=vector_db.identifier, kvstore=self.kvstore)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=idx,
inference_api=self.inference_api,
)
# Load OpenAI vector stores metadata into cache
await self.initialize_openai_vector_stores()
async def shutdown(self) -> None: async def shutdown(self) -> None:
for client in self.client_cache.values(): for client in self.client_cache.values():
@ -206,3 +246,21 @@ class WeaviateVectorIOAdapter(
raise ValueError(f"Vector DB {vector_db_id} not found") raise ValueError(f"Vector DB {vector_db_id} not found")
return await index.query_chunks(query, params) return await index.query_chunks(query, params)
# OpenAI Vector Stores File operations are not supported in Weaviate
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")

View file

@ -5,6 +5,7 @@
# the root directory of this source tree. # the root directory of this source tree.
import asyncio import asyncio
import json
import logging import logging
import mimetypes import mimetypes
import time import time
@ -35,6 +36,7 @@ from llama_stack.apis.vector_io import (
VectorStoreSearchResponse, VectorStoreSearchResponse,
VectorStoreSearchResponsePage, VectorStoreSearchResponsePage,
) )
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.vector_store import content_from_data_and_mime_type, make_overlapped_chunks from llama_stack.providers.utils.memory.vector_store import content_from_data_and_mime_type, make_overlapped_chunks
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -59,26 +61,45 @@ class OpenAIVectorStoreMixin(ABC):
# These should be provided by the implementing class # These should be provided by the implementing class
openai_vector_stores: dict[str, dict[str, Any]] openai_vector_stores: dict[str, dict[str, Any]]
files_api: Files | None files_api: Files | None
# KV store for persisting OpenAI vector store metadata
kvstore: KVStore | None
@abstractmethod
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Save vector store metadata to persistent storage.""" """Save vector store metadata to persistent storage."""
pass assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
# update in-memory cache
self.openai_vector_stores[store_id] = store_info
@abstractmethod
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
"""Load all vector store metadata from persistent storage.""" """Load all vector store metadata from persistent storage."""
pass assert self.kvstore is not None
start_key = OPENAI_VECTOR_STORES_PREFIX
end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
stored_data = await self.kvstore.values_in_range(start_key, end_key)
stores: dict[str, dict[str, Any]] = {}
for item in stored_data:
info = json.loads(item)
stores[info["id"]] = info
return stores
@abstractmethod
async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Update vector store metadata in persistent storage.""" """Update vector store metadata in persistent storage."""
pass assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.set(key=key, value=json.dumps(store_info))
# update in-memory cache
self.openai_vector_stores[store_id] = store_info
@abstractmethod
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
"""Delete vector store metadata from persistent storage.""" """Delete vector store metadata from persistent storage."""
pass assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.delete(key)
# remove from in-memory cache
self.openai_vector_stores.pop(store_id, None)
@abstractmethod @abstractmethod
async def _save_openai_vector_store_file( async def _save_openai_vector_store_file(
@ -117,6 +138,10 @@ class OpenAIVectorStoreMixin(ABC):
"""Unregister a vector database (provider-specific implementation).""" """Unregister a vector database (provider-specific implementation)."""
pass pass
async def initialize_openai_vector_stores(self) -> None:
"""Load existing OpenAI vector stores into the in-memory cache."""
self.openai_vector_stores = await self._load_openai_vector_stores()
@abstractmethod @abstractmethod
async def insert_chunks( async def insert_chunks(
self, self,

View file

@ -128,6 +128,7 @@ def get_distribution_template() -> DistributionTemplate:
provider_id="${env.ENABLE_PGVECTOR:+pgvector}", provider_id="${env.ENABLE_PGVECTOR:+pgvector}",
provider_type="remote::pgvector", provider_type="remote::pgvector",
config=PGVectorVectorIOConfig.sample_run_config( config=PGVectorVectorIOConfig.sample_run_config(
f"~/.llama/distributions/{name}",
db="${env.PGVECTOR_DB:=}", db="${env.PGVECTOR_DB:=}",
user="${env.PGVECTOR_USER:=}", user="${env.PGVECTOR_USER:=}",
password="${env.PGVECTOR_PASSWORD:=}", password="${env.PGVECTOR_PASSWORD:=}",

View file

@ -54,6 +54,9 @@ providers:
db: ${env.PGVECTOR_DB:=} db: ${env.PGVECTOR_DB:=}
user: ${env.PGVECTOR_USER:=} user: ${env.PGVECTOR_USER:=}
password: ${env.PGVECTOR_PASSWORD:=} password: ${env.PGVECTOR_PASSWORD:=}
kvstore:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/pgvector_registry.db
safety: safety:
- provider_id: llama-guard - provider_id: llama-guard
provider_type: inline::llama-guard provider_type: inline::llama-guard

View file

@ -166,6 +166,9 @@ providers:
db: ${env.PGVECTOR_DB:=} db: ${env.PGVECTOR_DB:=}
user: ${env.PGVECTOR_USER:=} user: ${env.PGVECTOR_USER:=}
password: ${env.PGVECTOR_PASSWORD:=} password: ${env.PGVECTOR_PASSWORD:=}
kvstore:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/pgvector_registry.db
files: files:
- provider_id: meta-reference-files - provider_id: meta-reference-files
provider_type: inline::localfs provider_type: inline::localfs

View file

@ -241,6 +241,7 @@ def get_distribution_template() -> DistributionTemplate:
provider_id="${env.ENABLE_PGVECTOR:=__disabled__}", provider_id="${env.ENABLE_PGVECTOR:=__disabled__}",
provider_type="remote::pgvector", provider_type="remote::pgvector",
config=PGVectorVectorIOConfig.sample_run_config( config=PGVectorVectorIOConfig.sample_run_config(
f"~/.llama/distributions/{name}",
db="${env.PGVECTOR_DB:=}", db="${env.PGVECTOR_DB:=}",
user="${env.PGVECTOR_USER:=}", user="${env.PGVECTOR_USER:=}",
password="${env.PGVECTOR_PASSWORD:=}", password="${env.PGVECTOR_PASSWORD:=}",