chore(cleanup)!: kill vector_db references as far as possible (#3864)

There should not be "vector db" anywhere.
This commit is contained in:
Ashwin Bharambe 2025-10-20 20:06:16 -07:00 committed by GitHub
parent 444f6c88f3
commit 122de785c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
46 changed files with 701 additions and 822 deletions

View file

@ -17,7 +17,7 @@ from llama_stack.apis.models import Model
from llama_stack.apis.scoring_functions import ScoringFn
from llama_stack.apis.shields import Shield
from llama_stack.apis.tools import ToolGroup
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.schema_utils import json_schema_type
@ -68,10 +68,10 @@ class ShieldsProtocolPrivate(Protocol):
async def unregister_shield(self, identifier: str) -> None: ...
class VectorDBsProtocolPrivate(Protocol):
async def register_vector_db(self, vector_db: VectorDB) -> None: ...
class VectorStoresProtocolPrivate(Protocol):
async def register_vector_store(self, vector_store: VectorStore) -> None: ...
async def unregister_vector_db(self, vector_db_id: str) -> None: ...
async def unregister_vector_store(self, vector_store_id: str) -> None: ...
class DatasetsProtocolPrivate(Protocol):

View file

@ -17,21 +17,21 @@ from numpy.typing import NDArray
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import HealthResponse, HealthStatus, VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import HealthResponse, HealthStatus, VectorStoresProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
from .config import FaissVectorIOConfig
logger = get_logger(name=__name__, category="vector_io")
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:{VERSION}::"
FAISS_INDEX_PREFIX = f"faiss_index:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
@ -176,28 +176,28 @@ class FaissIndex(EmbeddingIndex):
)
class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
def __init__(self, config: FaissVectorIOConfig, inference_api: Inference, files_api: Files | None) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.cache: dict[str, VectorDBWithIndex] = {}
self.cache: dict[str, VectorStoreWithIndex] = {}
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.persistence)
# Load existing banks from kvstore
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for vector_db_data in stored_vector_dbs:
vector_db = VectorDB.model_validate_json(vector_db_data)
index = VectorDBWithIndex(
vector_db,
await FaissIndex.create(vector_db.embedding_dimension, self.kvstore, vector_db.identifier),
for vector_store_data in stored_vector_stores:
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store,
await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
self.inference_api,
)
self.cache[vector_db.identifier] = index
self.cache[vector_store.identifier] = index
# Load existing OpenAI vector stores into the in-memory cache
await self.initialize_openai_vector_stores()
@ -222,32 +222,31 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
except Exception as e:
return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
assert self.kvstore is not None
key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
await self.kvstore.set(key=key, value=vector_db.model_dump_json())
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
# Store in cache
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=await FaissIndex.create(vector_db.embedding_dimension, self.kvstore, vector_db.identifier),
self.cache[vector_store.identifier] = VectorStoreWithIndex(
vector_store=vector_store,
index=await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
inference_api=self.inference_api,
)
async def list_vector_dbs(self) -> list[VectorDB]:
return [i.vector_db for i in self.cache.values()]
async def list_vector_stores(self) -> list[VectorStore]:
return [i.vector_store for i in self.cache.values()]
async def unregister_vector_db(self, vector_db_id: str) -> None:
async def unregister_vector_store(self, vector_store_id: str) -> None:
assert self.kvstore is not None
if vector_db_id not in self.cache:
logger.warning(f"Vector DB {vector_db_id} not found")
if vector_store_id not in self.cache:
return
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_db_id}")
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = self.cache.get(vector_db_id)

View file

@ -17,10 +17,10 @@ from numpy.typing import NDArray
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
@ -28,7 +28,7 @@ from llama_stack.providers.utils.memory.vector_store import (
RERANKER_TYPE_RRF,
ChunkForDeletion,
EmbeddingIndex,
VectorDBWithIndex,
VectorStoreWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator
@ -41,7 +41,7 @@ HYBRID_SEARCH = "hybrid"
SEARCH_MODES = {VECTOR_SEARCH, KEYWORD_SEARCH, HYBRID_SEARCH}
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:sqlite_vec:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:sqlite_vec:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:sqlite_vec:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:sqlite_vec:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:sqlite_vec:{VERSION}::"
@ -374,32 +374,32 @@ class SQLiteVecIndex(EmbeddingIndex):
await asyncio.to_thread(_delete_chunks)
class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
"""
A VectorIO implementation using SQLite + sqlite_vec.
This class handles vector database registration (with metadata stored in a table named `vector_dbs`)
and creates a cache of VectorDBWithIndex instances (each wrapping a SQLiteVecIndex).
This class handles vector database registration (with metadata stored in a table named `vector_stores`)
and creates a cache of VectorStoreWithIndex instances (each wrapping a SQLiteVecIndex).
"""
def __init__(self, config, inference_api: Inference, files_api: Files | None) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.cache: dict[str, VectorDBWithIndex] = {}
self.vector_db_store = None
self.cache: dict[str, VectorStoreWithIndex] = {}
self.vector_store_table = None
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.persistence)
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
for db_json in stored_vector_dbs:
vector_db = VectorDB.model_validate_json(db_json)
stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for db_json in stored_vector_stores:
vector_store = VectorStore.model_validate_json(db_json)
index = await SQLiteVecIndex.create(
vector_db.embedding_dimension, self.config.db_path, vector_db.identifier
vector_store.embedding_dimension, self.config.db_path, vector_store.identifier
)
self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
self.cache[vector_store.identifier] = VectorStoreWithIndex(vector_store, index, self.inference_api)
# Load existing OpenAI vector stores into the in-memory cache
await self.initialize_openai_vector_stores()
@ -408,63 +408,64 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
# Clean up mixin resources (file batch tasks)
await super().shutdown()
async def list_vector_dbs(self) -> list[VectorDB]:
return [v.vector_db for v in self.cache.values()]
async def list_vector_stores(self) -> list[VectorStore]:
return [v.vector_store for v in self.cache.values()]
async def register_vector_db(self, vector_db: VectorDB) -> None:
index = await SQLiteVecIndex.create(vector_db.embedding_dimension, self.config.db_path, vector_db.identifier)
self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
async def register_vector_store(self, vector_store: VectorStore) -> None:
index = await SQLiteVecIndex.create(
vector_store.embedding_dimension, self.config.db_path, vector_store.identifier
)
self.cache[vector_store.identifier] = VectorStoreWithIndex(vector_store, index, self.inference_api)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_db_store is None:
raise VectorStoreNotFoundError(vector_db_id)
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_db = self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
vector_store = self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
raise VectorStoreNotFoundError(vector_store_id)
index = VectorDBWithIndex(
vector_db=vector_db,
index = VectorStoreWithIndex(
vector_store=vector_store,
index=SQLiteVecIndex(
dimension=vector_db.embedding_dimension,
dimension=vector_store.embedding_dimension,
db_path=self.config.db_path,
bank_id=vector_db.identifier,
bank_id=vector_store.identifier,
kvstore=self.kvstore,
),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[vector_store_id] = index
return index
async def unregister_vector_db(self, vector_db_id: str) -> None:
if vector_db_id not in self.cache:
logger.warning(f"Vector DB {vector_db_id} not found")
async def unregister_vector_store(self, vector_store_id: str) -> None:
if vector_store_id not in self.cache:
return
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
# The VectorDBWithIndex helper is expected to compute embeddings via the inference_api
# The VectorStoreWithIndex helper is expected to compute embeddings via the inference_api
# and then call our index's add_chunks.
await index.insert_chunks(chunks)
async def query_chunks(
self, vector_db_id: str, query: Any, params: dict[str, Any] | None = None
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
"""Delete chunks from a sqlite_vec index."""
index = await self._get_and_cache_vector_db_index(store_id)
index = await self._get_and_cache_vector_store_index(store_id)
if not index:
raise VectorStoreNotFoundError(store_id)

View file

@ -13,15 +13,15 @@ from numpy.typing import NDArray
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.inline.vector_io.chroma import ChromaVectorIOConfig as InlineChromaVectorIOConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
from .config import ChromaVectorIOConfig as RemoteChromaVectorIOConfig
@ -30,7 +30,7 @@ log = get_logger(name=__name__, category="vector_io::chroma")
ChromaClientType = chromadb.api.AsyncClientAPI | chromadb.api.ClientAPI
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:chroma:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:chroma:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:chroma:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:chroma:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:chroma:{VERSION}::"
@ -114,7 +114,7 @@ class ChromaIndex(EmbeddingIndex):
raise NotImplementedError("Hybrid search is not supported in Chroma")
class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
def __init__(
self,
config: RemoteChromaVectorIOConfig | InlineChromaVectorIOConfig,
@ -127,11 +127,11 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
self.inference_api = inference_api
self.client = None
self.cache = {}
self.vector_db_store = None
self.vector_store_table = None
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.persistence)
self.vector_db_store = self.kvstore
self.vector_store_table = self.kvstore
if isinstance(self.config, RemoteChromaVectorIOConfig):
log.info(f"Connecting to Chroma server at: {self.config.url}")
@ -151,26 +151,26 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
# Clean up mixin resources (file batch tasks)
await super().shutdown()
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
collection = await maybe_await(
self.client.get_or_create_collection(
name=vector_db.identifier, metadata={"vector_db": vector_db.model_dump_json()}
name=vector_store.identifier, metadata={"vector_store": vector_store.model_dump_json()}
)
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db, ChromaIndex(self.client, collection), self.inference_api
self.cache[vector_store.identifier] = VectorStoreWithIndex(
vector_store, ChromaIndex(self.client, collection), self.inference_api
)
async def unregister_vector_db(self, vector_db_id: str) -> None:
if vector_db_id not in self.cache:
log.warning(f"Vector DB {vector_db_id} not found")
async def unregister_vector_store(self, vector_store_id: str) -> None:
if vector_store_id not in self.cache:
log.warning(f"Vector DB {vector_store_id} not found")
return
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if index is None:
raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
@ -179,30 +179,30 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
async def query_chunks(
self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if index is None:
raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
return await index.query_chunks(query, params)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise ValueError(f"Vector DB {vector_db_id} not found in Llama Stack")
collection = await maybe_await(self.client.get_collection(vector_db_id))
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack")
collection = await maybe_await(self.client.get_collection(vector_store_id))
if not collection:
raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
index = VectorDBWithIndex(vector_db, ChromaIndex(self.client, collection), self.inference_api)
self.cache[vector_db_id] = index
raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")
index = VectorStoreWithIndex(vector_store, ChromaIndex(self.client, collection), self.inference_api)
self.cache[vector_store_id] = index
return index
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
"""Delete chunks from a Chroma vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
index = await self._get_and_cache_vector_store_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")

View file

@ -14,10 +14,10 @@ from pymilvus import AnnSearchRequest, DataType, Function, FunctionType, MilvusC
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.inline.vector_io.milvus import MilvusVectorIOConfig as InlineMilvusVectorIOConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
@ -26,7 +26,7 @@ from llama_stack.providers.utils.memory.vector_store import (
RERANKER_TYPE_WEIGHTED,
ChunkForDeletion,
EmbeddingIndex,
VectorDBWithIndex,
VectorStoreWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
@ -35,7 +35,7 @@ from .config import MilvusVectorIOConfig as RemoteMilvusVectorIOConfig
logger = get_logger(name=__name__, category="vector_io::milvus")
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:milvus:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:milvus:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:milvus:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:milvus:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:milvus:{VERSION}::"
@ -261,7 +261,7 @@ class MilvusIndex(EmbeddingIndex):
raise
class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
def __init__(
self,
config: RemoteMilvusVectorIOConfig | InlineMilvusVectorIOConfig,
@ -273,28 +273,28 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
self.cache = {}
self.client = None
self.inference_api = inference_api
self.vector_db_store = None
self.vector_store_table = None
self.metadata_collection_name = "openai_vector_stores_metadata"
async def initialize(self) -> None:
self.kvstore = await kvstore_impl(self.config.persistence)
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for vector_db_data in stored_vector_dbs:
vector_db = VectorDB.model_validate_json(vector_db_data)
index = VectorDBWithIndex(
vector_db,
for vector_store_data in stored_vector_stores:
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store,
index=MilvusIndex(
client=self.client,
collection_name=vector_db.identifier,
collection_name=vector_store.identifier,
consistency_level=self.config.consistency_level,
kvstore=self.kvstore,
),
inference_api=self.inference_api,
)
self.cache[vector_db.identifier] = index
self.cache[vector_store.identifier] = index
if isinstance(self.config, RemoteMilvusVectorIOConfig):
logger.info(f"Connecting to Milvus server at {self.config.uri}")
self.client = MilvusClient(**self.config.model_dump(exclude_none=True))
@ -311,45 +311,45 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
# Clean up mixin resources (file batch tasks)
await super().shutdown()
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
if isinstance(self.config, RemoteMilvusVectorIOConfig):
consistency_level = self.config.consistency_level
else:
consistency_level = "Strong"
index = VectorDBWithIndex(
vector_db=vector_db,
index=MilvusIndex(self.client, vector_db.identifier, consistency_level=consistency_level),
index = VectorStoreWithIndex(
vector_store=vector_store,
index=MilvusIndex(self.client, vector_store.identifier, consistency_level=consistency_level),
inference_api=self.inference_api,
)
self.cache[vector_db.identifier] = index
self.cache[vector_store.identifier] = index
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_db_store is None:
raise VectorStoreNotFoundError(vector_db_id)
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
raise VectorStoreNotFoundError(vector_store_id)
index = VectorDBWithIndex(
vector_db=vector_db,
index=MilvusIndex(client=self.client, collection_name=vector_db.identifier, kvstore=self.kvstore),
index = VectorStoreWithIndex(
vector_store=vector_store,
index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[vector_store_id] = index
return index
async def unregister_vector_db(self, vector_db_id: str) -> None:
if vector_db_id in self.cache:
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
async def unregister_vector_store(self, vector_store_id: str) -> None:
if vector_store_id in self.cache:
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
@ -358,14 +358,14 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
async def query_chunks(
self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
"""Delete a chunk from a milvus vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
index = await self._get_and_cache_vector_store_index(store_id)
if not index:
raise VectorStoreNotFoundError(store_id)

View file

@ -16,15 +16,15 @@ from pydantic import BaseModel, TypeAdapter
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.utils.inference.prompt_adapter import interleaved_content_as_str
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator, sanitize_collection_name
from .config import PGVectorVectorIOConfig
@ -32,7 +32,7 @@ from .config import PGVectorVectorIOConfig
log = get_logger(name=__name__, category="vector_io::pgvector")
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:pgvector:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:pgvector:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:pgvector:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:pgvector:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:pgvector:{VERSION}::"
@ -79,13 +79,13 @@ class PGVectorIndex(EmbeddingIndex):
def __init__(
self,
vector_db: VectorDB,
vector_store: VectorStore,
dimension: int,
conn: psycopg2.extensions.connection,
kvstore: KVStore | None = None,
distance_metric: str = "COSINE",
):
self.vector_db = vector_db
self.vector_store = vector_store
self.dimension = dimension
self.conn = conn
self.kvstore = kvstore
@ -97,9 +97,9 @@ class PGVectorIndex(EmbeddingIndex):
try:
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# Sanitize the table name by replacing hyphens with underscores
# SQL doesn't allow hyphens in table names, and vector_db.identifier may contain hyphens
# SQL doesn't allow hyphens in table names, and vector_store.identifier may contain hyphens
# when created with patterns like "test-vector-db-{uuid4()}"
sanitized_identifier = sanitize_collection_name(self.vector_db.identifier)
sanitized_identifier = sanitize_collection_name(self.vector_store.identifier)
self.table_name = f"vs_{sanitized_identifier}"
cur.execute(
@ -122,8 +122,8 @@ class PGVectorIndex(EmbeddingIndex):
"""
)
except Exception as e:
log.exception(f"Error creating PGVectorIndex for vector_db: {self.vector_db.identifier}")
raise RuntimeError(f"Error creating PGVectorIndex for vector_db: {self.vector_db.identifier}") from e
log.exception(f"Error creating PGVectorIndex for vector_store: {self.vector_store.identifier}")
raise RuntimeError(f"Error creating PGVectorIndex for vector_store: {self.vector_store.identifier}") from e
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (
@ -323,7 +323,7 @@ class PGVectorIndex(EmbeddingIndex):
)
class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
def __init__(
self, config: PGVectorVectorIOConfig, inference_api: Inference, files_api: Files | None = None
) -> None:
@ -332,7 +332,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
self.inference_api = inference_api
self.conn = None
self.cache = {}
self.vector_db_store = None
self.vector_store_table = None
self.metadata_collection_name = "openai_vector_stores_metadata"
async def initialize(self) -> None:
@ -375,59 +375,59 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
# Clean up mixin resources (file batch tasks)
await super().shutdown()
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
# Persist vector DB metadata in the KV store
assert self.kvstore is not None
# Upsert model metadata in Postgres
upsert_models(self.conn, [(vector_db.identifier, vector_db)])
upsert_models(self.conn, [(vector_store.identifier, vector_store)])
# Create and cache the PGVector index table for the vector DB
pgvector_index = PGVectorIndex(
vector_db=vector_db, dimension=vector_db.embedding_dimension, conn=self.conn, kvstore=self.kvstore
vector_store=vector_store, dimension=vector_store.embedding_dimension, conn=self.conn, kvstore=self.kvstore
)
await pgvector_index.initialize()
index = VectorDBWithIndex(vector_db, index=pgvector_index, inference_api=self.inference_api)
self.cache[vector_db.identifier] = index
index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api)
self.cache[vector_store.identifier] = index
async def unregister_vector_db(self, vector_db_id: str) -> None:
async def unregister_vector_store(self, vector_store_id: str) -> None:
# Remove provider index and cache
if vector_db_id in self.cache:
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
if vector_store_id in self.cache:
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
# Delete vector DB metadata from KV store
assert self.kvstore is not None
await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_db_id}")
await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
await index.insert_chunks(chunks)
async def query_chunks(
self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
return await index.query_chunks(query, params)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_db_store is None:
raise VectorStoreNotFoundError(vector_db_id)
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
raise VectorStoreNotFoundError(vector_store_id)
index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn)
index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn)
await index.initialize()
self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api)
return self.cache[vector_db_id]
self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api)
return self.cache[vector_store_id]
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
"""Delete a chunk from a PostgreSQL vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
index = await self._get_and_cache_vector_store_index(store_id)
if not index:
raise VectorStoreNotFoundError(store_id)

View file

@ -16,7 +16,6 @@ from qdrant_client.models import PointStruct
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import (
Chunk,
QueryChunksResponse,
@ -24,12 +23,13 @@ from llama_stack.apis.vector_io import (
VectorStoreChunkingStrategy,
VectorStoreFileObject,
)
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig
@ -38,7 +38,7 @@ CHUNK_ID_KEY = "_chunk_id"
# KV store prefixes for vector databases
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:qdrant:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:qdrant:{VERSION}::"
def convert_id(_id: str) -> str:
@ -145,7 +145,7 @@ class QdrantIndex(EmbeddingIndex):
await self.client.delete_collection(collection_name=self.collection_name)
class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
def __init__(
self,
config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig,
@ -157,7 +157,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
self.client: AsyncQdrantClient = None
self.cache = {}
self.inference_api = inference_api
self.vector_db_store = None
self.vector_store_table = None
self._qdrant_lock = asyncio.Lock()
async def initialize(self) -> None:
@ -167,12 +167,14 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for vector_db_data in stored_vector_dbs:
vector_db = VectorDB.model_validate_json(vector_db_data)
index = VectorDBWithIndex(vector_db, QdrantIndex(self.client, vector_db.identifier), self.inference_api)
self.cache[vector_db.identifier] = index
for vector_store_data in stored_vector_stores:
vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorStoreWithIndex(
vector_store, QdrantIndex(self.client, vector_store.identifier), self.inference_api
)
self.cache[vector_store.identifier] = index
self.openai_vector_stores = await self._load_openai_vector_stores()
async def shutdown(self) -> None:
@ -180,46 +182,48 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
# Clean up mixin resources (file batch tasks)
await super().shutdown()
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
assert self.kvstore is not None
key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
await self.kvstore.set(key=key, value=vector_db.model_dump_json())
key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_store.model_dump_json())
index = VectorDBWithIndex(
vector_db=vector_db, index=QdrantIndex(self.client, vector_db.identifier), inference_api=self.inference_api
)
self.cache[vector_db.identifier] = index
async def unregister_vector_db(self, vector_db_id: str) -> None:
if vector_db_id in self.cache:
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
assert self.kvstore is not None
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_db_id}")
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
if self.vector_db_store is None:
raise ValueError(f"Vector DB not found {vector_db_id}")
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
index = VectorDBWithIndex(
vector_db=vector_db,
index=QdrantIndex(client=self.client, collection_name=vector_db.identifier),
index = VectorStoreWithIndex(
vector_store=vector_store,
index=QdrantIndex(self.client, vector_store.identifier),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[vector_store.identifier] = index
async def unregister_vector_store(self, vector_store_id: str) -> None:
if vector_store_id in self.cache:
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
assert self.kvstore is not None
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_store_table is None:
raise ValueError(f"Vector DB not found {vector_store_id}")
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
raise VectorStoreNotFoundError(vector_store_id)
index = VectorStoreWithIndex(
vector_store=vector_store,
index=QdrantIndex(client=self.client, collection_name=vector_store.identifier),
inference_api=self.inference_api,
)
self.cache[vector_store_id] = index
return index
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
@ -228,7 +232,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
async def query_chunks(
self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
@ -249,7 +253,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
"""Delete chunks from a Qdrant vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
index = await self._get_and_cache_vector_store_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")

View file

@ -16,11 +16,11 @@ from llama_stack.apis.common.content_types import InterleavedContent
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
@ -28,7 +28,7 @@ from llama_stack.providers.utils.memory.vector_store import (
RERANKER_TYPE_RRF,
ChunkForDeletion,
EmbeddingIndex,
VectorDBWithIndex,
VectorStoreWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
@ -37,7 +37,7 @@ from .config import WeaviateVectorIOConfig
log = get_logger(name=__name__, category="vector_io::weaviate")
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:weaviate:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:weaviate:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:weaviate:{VERSION}::"
@ -257,14 +257,14 @@ class WeaviateIndex(EmbeddingIndex):
return QueryChunksResponse(chunks=chunks, scores=scores)
class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProviderData, VectorDBsProtocolPrivate):
class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProviderData, VectorStoresProtocolPrivate):
def __init__(self, config: WeaviateVectorIOConfig, inference_api: Inference, files_api: Files | None) -> None:
super().__init__(files_api=files_api, kvstore=None)
self.config = config
self.inference_api = inference_api
self.client_cache = {}
self.cache = {}
self.vector_db_store = None
self.vector_store_table = None
self.metadata_collection_name = "openai_vector_stores_metadata"
def _get_client(self) -> weaviate.WeaviateClient:
@ -300,11 +300,11 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
vector_store = VectorStore.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(client=client, collection_name=vector_db.identifier, kvstore=self.kvstore)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db, index=idx, inference_api=self.inference_api
idx = WeaviateIndex(client=client, collection_name=vector_store.identifier, kvstore=self.kvstore)
self.cache[vector_store.identifier] = VectorStoreWithIndex(
vector_store=vector_store, index=idx, inference_api=self.inference_api
)
# Load OpenAI vector stores metadata into cache
@ -316,9 +316,9 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
# Clean up mixin resources (file batch tasks)
await super().shutdown()
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
# Create collection if it doesn't exist
if not client.collections.exists(sanitized_collection_name):
client.collections.create(
@ -329,45 +329,45 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
],
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db, WeaviateIndex(client=client, collection_name=sanitized_collection_name), self.inference_api
self.cache[vector_store.identifier] = VectorStoreWithIndex(
vector_store, WeaviateIndex(client=client, collection_name=sanitized_collection_name), self.inference_api
)
async def unregister_vector_db(self, vector_db_id: str) -> None:
async def unregister_vector_store(self, vector_store_id: str) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if vector_db_id not in self.cache or client.collections.exists(sanitized_collection_name) is False:
sanitized_collection_name = sanitize_collection_name(vector_store_id, weaviate_format=True)
if vector_store_id not in self.cache or client.collections.exists(sanitized_collection_name) is False:
return
client.collections.delete(sanitized_collection_name)
await self.cache[vector_db_id].index.delete()
del self.cache[vector_db_id]
await self.cache[vector_store_id].index.delete()
del self.cache[vector_store_id]
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
if vector_store_id in self.cache:
return self.cache[vector_store_id]
if self.vector_db_store is None:
raise VectorStoreNotFoundError(vector_db_id)
if self.vector_store_table is None:
raise VectorStoreNotFoundError(vector_store_id)
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
if not vector_store:
raise VectorStoreNotFoundError(vector_store_id)
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
if not client.collections.exists(sanitized_collection_name):
raise ValueError(f"Collection with name `{sanitized_collection_name}` not found")
index = VectorDBWithIndex(
vector_db=vector_db,
index=WeaviateIndex(client=client, collection_name=vector_db.identifier),
index = VectorStoreWithIndex(
vector_store=vector_store,
index=WeaviateIndex(client=client, collection_name=vector_store.identifier),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[vector_store_id] = index
return index
async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
@ -376,14 +376,14 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
async def query_chunks(
self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
index = await self._get_and_cache_vector_store_index(vector_db_id)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
index = await self._get_and_cache_vector_db_index(store_id)
index = await self._get_and_cache_vector_store_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")

View file

@ -17,7 +17,6 @@ from pydantic import TypeAdapter
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files, OpenAIFileObject
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import (
Chunk,
OpenAICreateVectorStoreFileBatchRequestWithExtraBody,
@ -43,6 +42,7 @@ from llama_stack.apis.vector_io import (
VectorStoreSearchResponse,
VectorStoreSearchResponsePage,
)
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.core.id_generation import generate_object_id
from llama_stack.log import get_logger
from llama_stack.providers.utils.kvstore.api import KVStore
@ -63,7 +63,7 @@ MAX_CONCURRENT_FILES_PER_BATCH = 3 # Maximum concurrent file processing within
FILE_BATCH_CHUNK_SIZE = 10 # Process files in chunks of this size
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
VECTOR_DBS_PREFIX = f"vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:{VERSION}::"
@ -321,12 +321,12 @@ class OpenAIVectorStoreMixin(ABC):
pass
@abstractmethod
async def register_vector_db(self, vector_db: VectorDB) -> None:
async def register_vector_store(self, vector_store: VectorStore) -> None:
"""Register a vector database (provider-specific implementation)."""
pass
@abstractmethod
async def unregister_vector_db(self, vector_db_id: str) -> None:
async def unregister_vector_store(self, vector_store_id: str) -> None:
"""Unregister a vector database (provider-specific implementation)."""
pass
@ -358,7 +358,7 @@ class OpenAIVectorStoreMixin(ABC):
extra_body = params.model_extra or {}
metadata = params.metadata or {}
provider_vector_db_id = extra_body.get("provider_vector_db_id")
provider_vector_store_id = extra_body.get("provider_vector_store_id")
# Use embedding info from metadata if available, otherwise from extra_body
if metadata.get("embedding_model"):
@ -389,8 +389,8 @@ class OpenAIVectorStoreMixin(ABC):
# use provider_id set by router; fallback to provider's own ID when used directly via --stack-config
provider_id = extra_body.get("provider_id") or getattr(self, "__provider_id__", None)
# Derive the canonical vector_db_id (allow override, else generate)
vector_db_id = provider_vector_db_id or generate_object_id("vector_store", lambda: f"vs_{uuid.uuid4()}")
# Derive the canonical vector_store_id (allow override, else generate)
vector_store_id = provider_vector_store_id or generate_object_id("vector_store", lambda: f"vs_{uuid.uuid4()}")
if embedding_model is None:
raise ValueError("embedding_model is required")
@ -398,19 +398,20 @@ class OpenAIVectorStoreMixin(ABC):
if embedding_dimension is None:
raise ValueError("Embedding dimension is required")
# Register the VectorDB backing this vector store
# Register the VectorStore backing this vector store
if provider_id is None:
raise ValueError("Provider ID is required but was not provided")
vector_db = VectorDB(
identifier=vector_db_id,
# call to the provider to create any index, etc.
vector_store = VectorStore(
identifier=vector_store_id,
embedding_dimension=embedding_dimension,
embedding_model=embedding_model,
provider_id=provider_id,
provider_resource_id=vector_db_id,
vector_db_name=params.name,
provider_resource_id=vector_store_id,
vector_store_name=params.name,
)
await self.register_vector_db(vector_db)
await self.register_vector_store(vector_store)
# Create OpenAI vector store metadata
status = "completed"
@ -424,7 +425,7 @@ class OpenAIVectorStoreMixin(ABC):
total=0,
)
store_info: dict[str, Any] = {
"id": vector_db_id,
"id": vector_store_id,
"object": "vector_store",
"created_at": created_at,
"name": params.name,
@ -441,23 +442,23 @@ class OpenAIVectorStoreMixin(ABC):
# Add provider information to metadata if provided
if provider_id:
metadata["provider_id"] = provider_id
if provider_vector_db_id:
metadata["provider_vector_db_id"] = provider_vector_db_id
if provider_vector_store_id:
metadata["provider_vector_store_id"] = provider_vector_store_id
store_info["metadata"] = metadata
# Save to persistent storage (provider-specific)
await self._save_openai_vector_store(vector_db_id, store_info)
await self._save_openai_vector_store(vector_store_id, store_info)
# Store in memory cache
self.openai_vector_stores[vector_db_id] = store_info
self.openai_vector_stores[vector_store_id] = store_info
# Now that our vector store is created, attach any files that were provided
file_ids = params.file_ids or []
tasks = [self.openai_attach_file_to_vector_store(vector_db_id, file_id) for file_id in file_ids]
tasks = [self.openai_attach_file_to_vector_store(vector_store_id, file_id) for file_id in file_ids]
await asyncio.gather(*tasks)
# Get the updated store info and return it
store_info = self.openai_vector_stores[vector_db_id]
store_info = self.openai_vector_stores[vector_store_id]
return VectorStoreObject.model_validate(store_info)
async def openai_list_vector_stores(
@ -567,7 +568,7 @@ class OpenAIVectorStoreMixin(ABC):
# Also delete the underlying vector DB
try:
await self.unregister_vector_db(vector_store_id)
await self.unregister_vector_store(vector_store_id)
except Exception as e:
logger.warning(f"Failed to delete underlying vector DB {vector_store_id}: {e}")

View file

@ -23,8 +23,8 @@ from llama_stack.apis.common.content_types import (
)
from llama_stack.apis.inference import OpenAIEmbeddingsRequestWithExtraBody
from llama_stack.apis.tools import RAGDocument
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, ChunkMetadata, QueryChunksResponse
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.models.llama.llama3.tokenizer import Tokenizer
from llama_stack.providers.datatypes import Api
@ -187,7 +187,7 @@ def make_overlapped_chunks(
updated_timestamp=int(time.time()),
chunk_window=chunk_window,
chunk_tokenizer=default_tokenizer,
chunk_embedding_model=None, # This will be set in `VectorDBWithIndex.insert_chunks`
chunk_embedding_model=None, # This will be set in `VectorStoreWithIndex.insert_chunks`
content_token_count=len(toks),
metadata_token_count=len(metadata_tokens),
)
@ -255,8 +255,8 @@ class EmbeddingIndex(ABC):
@dataclass
class VectorDBWithIndex:
vector_db: VectorDB
class VectorStoreWithIndex:
vector_store: VectorStore
index: EmbeddingIndex
inference_api: Api.inference
@ -269,14 +269,14 @@ class VectorDBWithIndex:
if c.embedding is None:
chunks_to_embed.append(c)
if c.chunk_metadata:
c.chunk_metadata.chunk_embedding_model = self.vector_db.embedding_model
c.chunk_metadata.chunk_embedding_dimension = self.vector_db.embedding_dimension
c.chunk_metadata.chunk_embedding_model = self.vector_store.embedding_model
c.chunk_metadata.chunk_embedding_dimension = self.vector_store.embedding_dimension
else:
_validate_embedding(c.embedding, i, self.vector_db.embedding_dimension)
_validate_embedding(c.embedding, i, self.vector_store.embedding_dimension)
if chunks_to_embed:
params = OpenAIEmbeddingsRequestWithExtraBody(
model=self.vector_db.embedding_model,
model=self.vector_store.embedding_model,
input=[c.content for c in chunks_to_embed],
)
resp = await self.inference_api.openai_embeddings(params)
@ -319,7 +319,7 @@ class VectorDBWithIndex:
return await self.index.query_keyword(query_string, k, score_threshold)
params = OpenAIEmbeddingsRequestWithExtraBody(
model=self.vector_db.embedding_model,
model=self.vector_store.embedding_model,
input=[query_string],
)
embeddings_response = await self.inference_api.openai_embeddings(params)