rename provider vector_db references to vector_store

commit 18ff28b6f0 (parent 375650c6b3)
Author: Ashwin Bharambe
Date: 2025-10-20 19:19:38 -07:00

9 changed files with 275 additions and 274 deletions
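
The change is a mechanical rename of the provider-private surface: VectorDBsProtocolPrivate becomes VectorStoresProtocolPrivate, register_vector_db/unregister_vector_db become register_vector_store/unregister_vector_store, VectorDBWithIndex becomes VectorStoreWithIndex, and the per-provider KV key prefixes move from vector_dbs:* to vector_stores:*. Condensed from the datatypes hunk below, the core of the rename is:

# Before
class VectorDBsProtocolPrivate(Protocol):
    async def register_vector_db(self, vector_db: VectorDB) -> None: ...
    async def unregister_vector_db(self, vector_db_id: str) -> None: ...

# After
class VectorStoresProtocolPrivate(Protocol):
    async def register_vector_store(self, vector_store: VectorStore) -> None: ...
    async def unregister_vector_store(self, vector_store_id: str) -> None: ...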


@@ -140,6 +140,7 @@ class VectorStoreFileCounts(BaseModel):
     total: int
 
+# TODO: rename this as OpenAIVectorStore
 @json_schema_type
 class VectorStoreObject(BaseModel):
     """OpenAI Vector Store object.


@@ -17,7 +17,7 @@ from llama_stack.apis.models import Model
 from llama_stack.apis.scoring_functions import ScoringFn
 from llama_stack.apis.shields import Shield
 from llama_stack.apis.tools import ToolGroup
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_dbs import VectorStore
 from llama_stack.schema_utils import json_schema_type
@@ -68,10 +68,10 @@ class ShieldsProtocolPrivate(Protocol):
     async def unregister_shield(self, identifier: str) -> None: ...
 
-class VectorDBsProtocolPrivate(Protocol):
-    async def register_vector_db(self, vector_db: VectorDB) -> None: ...
-    async def unregister_vector_db(self, vector_db_id: str) -> None: ...
+class VectorStoresProtocolPrivate(Protocol):
+    async def register_vector_store(self, vector_store: VectorStore) -> None: ...
+    async def unregister_vector_store(self, vector_store_id: str) -> None: ...
 
 class DatasetsProtocolPrivate(Protocol):
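
Because these private protocols are plain typing.Protocol classes, conformance is structural: a provider satisfies VectorStoresProtocolPrivate simply by defining the renamed methods, and an out-of-tree provider that still defines register_vector_db silently stops conforming rather than failing at import time. A minimal illustration (names taken from the diff; the runtime_checkable check is my own sketch, not something the stack does):

from typing import Protocol, runtime_checkable

@runtime_checkable
class VectorStoresProtocolPrivate(Protocol):
    async def register_vector_store(self, vector_store) -> None: ...
    async def unregister_vector_store(self, vector_store_id: str) -> None: ...

class LegacyProvider:  # still uses the pre-rename method names
    async def register_vector_db(self, vector_db) -> None: ...
    async def unregister_vector_db(self, vector_db_id: str) -> None: ...

class RenamedProvider:
    async def register_vector_store(self, vector_store) -> None: ...
    async def unregister_vector_store(self, vector_store_id: str) -> None: ...

print(isinstance(LegacyProvider(), VectorStoresProtocolPrivate))   # False
print(isinstance(RenamedProvider(), VectorStoresProtocolPrivate))  # True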


@@ -17,21 +17,21 @@ from numpy.typing import NDArray
 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference, InterleavedContent
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import HealthResponse, HealthStatus, VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import HealthResponse, HealthStatus, VectorStoresProtocolPrivate
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
-from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
+from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
 
 from .config import FaissVectorIOConfig
 
 logger = get_logger(name=__name__, category="vector_io")
 
 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:{VERSION}::"
 FAISS_INDEX_PREFIX = f"faiss_index:{VERSION}::"
 OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
 OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
@@ -176,28 +176,28 @@ class FaissIndex(EmbeddingIndex):
         )
 
-class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
+class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
     def __init__(self, config: FaissVectorIOConfig, inference_api: Inference, files_api: Files | None) -> None:
         super().__init__(files_api=files_api, kvstore=None)
         self.config = config
         self.inference_api = inference_api
-        self.cache: dict[str, VectorDBWithIndex] = {}
+        self.cache: dict[str, VectorStoreWithIndex] = {}
 
     async def initialize(self) -> None:
         self.kvstore = await kvstore_impl(self.config.persistence)
         # Load existing banks from kvstore
         start_key = VECTOR_DBS_PREFIX
         end_key = f"{VECTOR_DBS_PREFIX}\xff"
-        stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
-        for vector_db_data in stored_vector_dbs:
-            vector_db = VectorDB.model_validate_json(vector_db_data)
-            index = VectorDBWithIndex(
-                vector_db,
-                await FaissIndex.create(vector_db.embedding_dimension, self.kvstore, vector_db.identifier),
+        stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
+        for vector_store_data in stored_vector_stores:
+            vector_store = VectorStore.model_validate_json(vector_store_data)
+            index = VectorStoreWithIndex(
+                vector_store,
+                await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
                 self.inference_api,
             )
-            self.cache[vector_db.identifier] = index
+            self.cache[vector_store.identifier] = index
 
         # Load existing OpenAI vector stores into the in-memory cache
         await self.initialize_openai_vector_stores()
@@ -222,46 +222,46 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr
         except Exception as e:
             return HealthResponse(status=HealthStatus.ERROR, message=f"Health check failed: {str(e)}")
 
-    async def register_vector_db(self, vector_db: VectorDB) -> None:
+    async def register_vector_store(self, vector_store: VectorStore) -> None:
         assert self.kvstore is not None
-        key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
-        await self.kvstore.set(key=key, value=vector_db.model_dump_json())
+        key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
+        await self.kvstore.set(key=key, value=vector_store.model_dump_json())
         # Store in cache
-        self.cache[vector_db.identifier] = VectorDBWithIndex(
-            vector_db=vector_db,
-            index=await FaissIndex.create(vector_db.embedding_dimension, self.kvstore, vector_db.identifier),
+        self.cache[vector_store.identifier] = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=await FaissIndex.create(vector_store.embedding_dimension, self.kvstore, vector_store.identifier),
             inference_api=self.inference_api,
         )
 
-    async def list_vector_dbs(self) -> list[VectorDB]:
-        return [i.vector_db for i in self.cache.values()]
+    async def list_vector_stores(self) -> list[VectorStore]:
+        return [i.vector_store for i in self.cache.values()]
 
-    async def unregister_vector_db(self, vector_db_id: str) -> None:
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
         assert self.kvstore is not None
-        if vector_db_id not in self.cache:
-            logger.warning(f"Vector DB {vector_db_id} not found")
+        if vector_store_id not in self.cache:
+            logger.warning(f"Vector DB {vector_store_id} not found")
             return
-        await self.cache[vector_db_id].index.delete()
-        del self.cache[vector_db_id]
-        await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_db_id}")
+        await self.cache[vector_store_id].index.delete()
+        del self.cache[vector_store_id]
+        await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")
 
-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = self.cache.get(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = self.cache.get(vector_store_id)
         if index is None:
-            raise ValueError(f"Vector DB {vector_db_id} not found. found: {self.cache.keys()}")
+            raise ValueError(f"Vector DB {vector_store_id} not found. found: {self.cache.keys()}")
         await index.insert_chunks(chunks)
 
     async def query_chunks(
-        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = self.cache.get(vector_db_id)
+        index = self.cache.get(vector_store_id)
         if index is None:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         return await index.query_chunks(query, params)
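
One behavioral consequence worth flagging: in every provider the VECTOR_DBS_PREFIX constant keeps its old name but its value changes from vector_dbs:* to vector_stores:*, so registrations persisted under the old prefix are no longer found by initialize(). The commit ships no migration. If one were needed, a sketch along these lines would cover the faiss keys, using only the KVStore calls visible in the diff (values_in_range, set, delete); treat it as an illustration, not part of the commit:

from llama_stack.apis.vector_stores import VectorStore

OLD_PREFIX = "vector_dbs:v3::"
NEW_PREFIX = "vector_stores:v3::"

async def migrate_vector_store_keys(kvstore) -> None:
    """Copy persisted registrations from the old key prefix to the new one."""
    values = await kvstore.values_in_range(OLD_PREFIX, f"{OLD_PREFIX}\xff")
    for value in values:
        vector_store = VectorStore.model_validate_json(value)
        # Re-key under the new prefix; the key suffix is the store identifier.
        await kvstore.set(key=f"{NEW_PREFIX}{vector_store.identifier}", value=value)
        await kvstore.delete(f"{OLD_PREFIX}{vector_store.identifier}")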


@@ -17,10 +17,10 @@ from numpy.typing import NDArray
 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
@@ -28,7 +28,7 @@ from llama_stack.providers.utils.memory.vector_store import (
     RERANKER_TYPE_RRF,
     ChunkForDeletion,
     EmbeddingIndex,
-    VectorDBWithIndex,
+    VectorStoreWithIndex,
 )
 from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator
@@ -41,7 +41,7 @@ HYBRID_SEARCH = "hybrid"
 SEARCH_MODES = {VECTOR_SEARCH, KEYWORD_SEARCH, HYBRID_SEARCH}
 
 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:sqlite_vec:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:sqlite_vec:{VERSION}::"
 VECTOR_INDEX_PREFIX = f"vector_index:sqlite_vec:{VERSION}::"
 OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:sqlite_vec:{VERSION}::"
 OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:sqlite_vec:{VERSION}::"
@@ -374,32 +374,32 @@ class SQLiteVecIndex(EmbeddingIndex):
         await asyncio.to_thread(_delete_chunks)
 
-class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
+class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
     """
     A VectorIO implementation using SQLite + sqlite_vec.
-    This class handles vector database registration (with metadata stored in a table named `vector_dbs`)
-    and creates a cache of VectorDBWithIndex instances (each wrapping a SQLiteVecIndex).
+    This class handles vector database registration (with metadata stored in a table named `vector_stores`)
+    and creates a cache of VectorStoreWithIndex instances (each wrapping a SQLiteVecIndex).
     """
 
     def __init__(self, config, inference_api: Inference, files_api: Files | None) -> None:
         super().__init__(files_api=files_api, kvstore=None)
         self.config = config
         self.inference_api = inference_api
-        self.cache: dict[str, VectorDBWithIndex] = {}
-        self.vector_db_store = None
+        self.cache: dict[str, VectorStoreWithIndex] = {}
+        self.vector_store_table = None
 
     async def initialize(self) -> None:
         self.kvstore = await kvstore_impl(self.config.persistence)
         start_key = VECTOR_DBS_PREFIX
         end_key = f"{VECTOR_DBS_PREFIX}\xff"
-        stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
-        for db_json in stored_vector_dbs:
-            vector_db = VectorDB.model_validate_json(db_json)
+        stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
+        for db_json in stored_vector_stores:
+            vector_store = VectorStore.model_validate_json(db_json)
             index = await SQLiteVecIndex.create(
-                vector_db.embedding_dimension, self.config.db_path, vector_db.identifier
+                vector_store.embedding_dimension, self.config.db_path, vector_store.identifier
             )
-            self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
+            self.cache[vector_store.identifier] = VectorStoreWithIndex(vector_store, index, self.inference_api)
 
         # Load existing OpenAI vector stores into the in-memory cache
         await self.initialize_openai_vector_stores()
@@ -408,63 +408,63 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
         # Clean up mixin resources (file batch tasks)
         await super().shutdown()
 
-    async def list_vector_dbs(self) -> list[VectorDB]:
-        return [v.vector_db for v in self.cache.values()]
+    async def list_vector_stores(self) -> list[VectorStore]:
+        return [v.vector_store for v in self.cache.values()]
 
-    async def register_vector_db(self, vector_db: VectorDB) -> None:
-        index = await SQLiteVecIndex.create(vector_db.embedding_dimension, self.config.db_path, vector_db.identifier)
-        self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
+    async def register_vector_store(self, vector_store: VectorStore) -> None:
+        index = await SQLiteVecIndex.create(vector_store.embedding_dimension, self.config.db_path, vector_store.identifier)
+        self.cache[vector_store.identifier] = VectorStoreWithIndex(vector_store, index, self.inference_api)
 
-    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
-        if vector_db_id in self.cache:
-            return self.cache[vector_db_id]
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
 
-        if self.vector_db_store is None:
-            raise VectorStoreNotFoundError(vector_db_id)
+        if self.vector_store_table is None:
+            raise VectorStoreNotFoundError(vector_store_id)
 
-        vector_db = self.vector_db_store.get_vector_db(vector_db_id)
-        if not vector_db:
-            raise VectorStoreNotFoundError(vector_db_id)
+        vector_store = self.vector_store_table.get_vector_store(vector_store_id)
+        if not vector_store:
+            raise VectorStoreNotFoundError(vector_store_id)
 
-        index = VectorDBWithIndex(
-            vector_db=vector_db,
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
             index=SQLiteVecIndex(
-                dimension=vector_db.embedding_dimension,
+                dimension=vector_store.embedding_dimension,
                 db_path=self.config.db_path,
-                bank_id=vector_db.identifier,
+                bank_id=vector_store.identifier,
                 kvstore=self.kvstore,
             ),
             inference_api=self.inference_api,
         )
-        self.cache[vector_db_id] = index
+        self.cache[vector_store_id] = index
         return index
 
-    async def unregister_vector_db(self, vector_db_id: str) -> None:
-        if vector_db_id not in self.cache:
-            logger.warning(f"Vector DB {vector_db_id} not found")
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
+        if vector_store_id not in self.cache:
+            logger.warning(f"Vector DB {vector_store_id} not found")
             return
-        await self.cache[vector_db_id].index.delete()
-        del self.cache[vector_db_id]
+        await self.cache[vector_store_id].index.delete()
+        del self.cache[vector_store_id]
 
-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
-        # The VectorDBWithIndex helper is expected to compute embeddings via the inference_api
+            raise VectorStoreNotFoundError(vector_store_id)
+        # The VectorStoreWithIndex helper is expected to compute embeddings via the inference_api
         # and then call our index's add_chunks.
         await index.insert_chunks(chunks)
 
     async def query_chunks(
-        self, vector_db_id: str, query: Any, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: Any, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         return await index.query_chunks(query, params)
 
     async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
         """Delete chunks from a sqlite_vec index."""
-        index = await self._get_and_cache_vector_db_index(store_id)
+        index = await self._get_and_cache_vector_store_index(store_id)
         if not index:
             raise VectorStoreNotFoundError(store_id)
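
Two details in this adapter are easy to miss. First, vector_store_table is initialized to None and, within the hunks shown, never reassigned, so the lookup helper only ever serves cache hits or raises. Second, unlike the other adapters below, the sqlite_vec variant calls self.vector_store_table.get_vector_store(...) without await. The lookup pattern itself is shared by every adapter in this commit; distilled (with build_index standing in for each provider's index constructor, a placeholder of mine):

async def _get_and_cache_vector_store_index(self, vector_store_id: str):
    # Fast path: the wrapper was already materialized in this process.
    if vector_store_id in self.cache:
        return self.cache[vector_store_id]
    # Fall back to the registration table, if one was injected.
    if self.vector_store_table is None:
        raise VectorStoreNotFoundError(vector_store_id)
    vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
    if not vector_store:
        raise VectorStoreNotFoundError(vector_store_id)
    # Rebuild the provider-specific wrapper and memoize it.
    index = VectorStoreWithIndex(
        vector_store=vector_store,
        index=build_index(vector_store),  # placeholder: provider-specific index
        inference_api=self.inference_api,
    )
    self.cache[vector_store_id] = index
    return index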


@@ -13,15 +13,15 @@ from numpy.typing import NDArray
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference, InterleavedContent
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
 from llama_stack.providers.inline.vector_io.chroma import ChromaVectorIOConfig as InlineChromaVectorIOConfig
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
-from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
+from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
 
 from .config import ChromaVectorIOConfig as RemoteChromaVectorIOConfig
@@ -30,7 +30,7 @@ log = get_logger(name=__name__, category="vector_io::chroma")
 ChromaClientType = chromadb.api.AsyncClientAPI | chromadb.api.ClientAPI
 
 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:chroma:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:chroma:{VERSION}::"
 VECTOR_INDEX_PREFIX = f"vector_index:chroma:{VERSION}::"
 OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:chroma:{VERSION}::"
 OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:chroma:{VERSION}::"
@@ -114,7 +114,7 @@ class ChromaIndex(EmbeddingIndex):
         raise NotImplementedError("Hybrid search is not supported in Chroma")
 
-class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
+class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
     def __init__(
         self,
         config: RemoteChromaVectorIOConfig | InlineChromaVectorIOConfig,
@@ -127,11 +127,11 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         self.inference_api = inference_api
         self.client = None
         self.cache = {}
-        self.vector_db_store = None
+        self.vector_store_table = None
 
     async def initialize(self) -> None:
         self.kvstore = await kvstore_impl(self.config.persistence)
-        self.vector_db_store = self.kvstore
+        self.vector_store_table = self.kvstore
 
         if isinstance(self.config, RemoteChromaVectorIOConfig):
             log.info(f"Connecting to Chroma server at: {self.config.url}")
@@ -151,58 +151,58 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         # Clean up mixin resources (file batch tasks)
         await super().shutdown()
 
-    async def register_vector_db(self, vector_db: VectorDB) -> None:
+    async def register_vector_store(self, vector_store: VectorStore) -> None:
         collection = await maybe_await(
             self.client.get_or_create_collection(
-                name=vector_db.identifier, metadata={"vector_db": vector_db.model_dump_json()}
+                name=vector_store.identifier, metadata={"vector_store": vector_store.model_dump_json()}
             )
         )
-        self.cache[vector_db.identifier] = VectorDBWithIndex(
-            vector_db, ChromaIndex(self.client, collection), self.inference_api
+        self.cache[vector_store.identifier] = VectorStoreWithIndex(
+            vector_store, ChromaIndex(self.client, collection), self.inference_api
         )
 
-    async def unregister_vector_db(self, vector_db_id: str) -> None:
-        if vector_db_id not in self.cache:
-            log.warning(f"Vector DB {vector_db_id} not found")
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
+        if vector_store_id not in self.cache:
+            log.warning(f"Vector DB {vector_store_id} not found")
             return
-        await self.cache[vector_db_id].index.delete()
-        del self.cache[vector_db_id]
+        await self.cache[vector_store_id].index.delete()
+        del self.cache[vector_store_id]
 
-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if index is None:
-            raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
+            raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")
         await index.insert_chunks(chunks)
 
     async def query_chunks(
-        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if index is None:
-            raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
+            raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")
         return await index.query_chunks(query, params)
 
-    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex:
-        if vector_db_id in self.cache:
-            return self.cache[vector_db_id]
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
 
-        vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
-        if not vector_db:
-            raise ValueError(f"Vector DB {vector_db_id} not found in Llama Stack")
+        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
+        if not vector_store:
+            raise ValueError(f"Vector DB {vector_store_id} not found in Llama Stack")
 
-        collection = await maybe_await(self.client.get_collection(vector_db_id))
+        collection = await maybe_await(self.client.get_collection(vector_store_id))
         if not collection:
-            raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
+            raise ValueError(f"Vector DB {vector_store_id} not found in Chroma")
 
-        index = VectorDBWithIndex(vector_db, ChromaIndex(self.client, collection), self.inference_api)
-        self.cache[vector_db_id] = index
+        index = VectorStoreWithIndex(vector_store, ChromaIndex(self.client, collection), self.inference_api)
+        self.cache[vector_store_id] = index
         return index
 
     async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
         """Delete chunks from a Chroma vector store."""
-        index = await self._get_and_cache_vector_db_index(store_id)
+        index = await self._get_and_cache_vector_store_index(store_id)
         if not index:
             raise ValueError(f"Vector DB {store_id} not found")


@@ -14,10 +14,10 @@ from pymilvus import AnnSearchRequest, DataType, Function, FunctionType, MilvusC
 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference, InterleavedContent
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
 from llama_stack.providers.inline.vector_io.milvus import MilvusVectorIOConfig as InlineMilvusVectorIOConfig
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
@@ -26,7 +26,7 @@ from llama_stack.providers.utils.memory.vector_store import (
     RERANKER_TYPE_WEIGHTED,
     ChunkForDeletion,
     EmbeddingIndex,
-    VectorDBWithIndex,
+    VectorStoreWithIndex,
 )
 from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
@@ -35,7 +35,7 @@ from .config import MilvusVectorIOConfig as RemoteMilvusVectorIOConfig
 logger = get_logger(name=__name__, category="vector_io::milvus")
 
 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:milvus:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:milvus:{VERSION}::"
 VECTOR_INDEX_PREFIX = f"vector_index:milvus:{VERSION}::"
 OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:milvus:{VERSION}::"
 OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:milvus:{VERSION}::"
@@ -261,7 +261,7 @@ class MilvusIndex(EmbeddingIndex):
             raise
 
-class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
+class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
     def __init__(
         self,
         config: RemoteMilvusVectorIOConfig | InlineMilvusVectorIOConfig,
@@ -273,28 +273,28 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         self.cache = {}
         self.client = None
         self.inference_api = inference_api
-        self.vector_db_store = None
+        self.vector_store_table = None
         self.metadata_collection_name = "openai_vector_stores_metadata"
 
     async def initialize(self) -> None:
         self.kvstore = await kvstore_impl(self.config.persistence)
         start_key = VECTOR_DBS_PREFIX
         end_key = f"{VECTOR_DBS_PREFIX}\xff"
-        stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
-        for vector_db_data in stored_vector_dbs:
-            vector_db = VectorDB.model_validate_json(vector_db_data)
-            index = VectorDBWithIndex(
-                vector_db,
+        stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
+        for vector_store_data in stored_vector_stores:
+            vector_store = VectorStore.model_validate_json(vector_store_data)
+            index = VectorStoreWithIndex(
+                vector_store,
                 index=MilvusIndex(
                     client=self.client,
-                    collection_name=vector_db.identifier,
+                    collection_name=vector_store.identifier,
                     consistency_level=self.config.consistency_level,
                     kvstore=self.kvstore,
                 ),
                 inference_api=self.inference_api,
             )
-            self.cache[vector_db.identifier] = index
+            self.cache[vector_store.identifier] = index
 
         if isinstance(self.config, RemoteMilvusVectorIOConfig):
             logger.info(f"Connecting to Milvus server at {self.config.uri}")
             self.client = MilvusClient(**self.config.model_dump(exclude_none=True))
@@ -311,61 +311,61 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         # Clean up mixin resources (file batch tasks)
         await super().shutdown()
 
-    async def register_vector_db(self, vector_db: VectorDB) -> None:
+    async def register_vector_store(self, vector_store: VectorStore) -> None:
         if isinstance(self.config, RemoteMilvusVectorIOConfig):
             consistency_level = self.config.consistency_level
         else:
             consistency_level = "Strong"
-        index = VectorDBWithIndex(
-            vector_db=vector_db,
-            index=MilvusIndex(self.client, vector_db.identifier, consistency_level=consistency_level),
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=MilvusIndex(self.client, vector_store.identifier, consistency_level=consistency_level),
             inference_api=self.inference_api,
         )
-        self.cache[vector_db.identifier] = index
+        self.cache[vector_store.identifier] = index
 
-    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
-        if vector_db_id in self.cache:
-            return self.cache[vector_db_id]
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
 
-        if self.vector_db_store is None:
-            raise VectorStoreNotFoundError(vector_db_id)
+        if self.vector_store_table is None:
+            raise VectorStoreNotFoundError(vector_store_id)
 
-        vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
-        if not vector_db:
-            raise VectorStoreNotFoundError(vector_db_id)
+        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
+        if not vector_store:
+            raise VectorStoreNotFoundError(vector_store_id)
 
-        index = VectorDBWithIndex(
-            vector_db=vector_db,
-            index=MilvusIndex(client=self.client, collection_name=vector_db.identifier, kvstore=self.kvstore),
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=MilvusIndex(client=self.client, collection_name=vector_store.identifier, kvstore=self.kvstore),
             inference_api=self.inference_api,
         )
-        self.cache[vector_db_id] = index
+        self.cache[vector_store_id] = index
         return index
 
-    async def unregister_vector_db(self, vector_db_id: str) -> None:
-        if vector_db_id in self.cache:
-            await self.cache[vector_db_id].index.delete()
-            del self.cache[vector_db_id]
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
+        if vector_store_id in self.cache:
+            await self.cache[vector_store_id].index.delete()
+            del self.cache[vector_store_id]
 
-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         await index.insert_chunks(chunks)
 
     async def query_chunks(
-        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         return await index.query_chunks(query, params)
 
     async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
         """Delete a chunk from a milvus vector store."""
-        index = await self._get_and_cache_vector_db_index(store_id)
+        index = await self._get_and_cache_vector_store_index(store_id)
         if not index:
             raise VectorStoreNotFoundError(store_id)
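
While reading register_vector_store above, note the consistency-level selection: the remote config's value is honored when present, and the inline deployment defaults to "Strong". Condensed from the hunk (same logic, just folded into one expression):

from .config import MilvusVectorIOConfig as RemoteMilvusVectorIOConfig  # as imported in the diff

def pick_consistency_level(config) -> str:
    # Remote deployments honor the configured level; inline defaults to "Strong".
    return config.consistency_level if isinstance(config, RemoteMilvusVectorIOConfig) else "Strong"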


@@ -16,15 +16,15 @@ from pydantic import BaseModel, TypeAdapter
 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference, InterleavedContent
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
 from llama_stack.providers.utils.inference.prompt_adapter import interleaved_content_as_str
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
-from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
+from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
 from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator, sanitize_collection_name
 
 from .config import PGVectorVectorIOConfig
@@ -32,7 +32,7 @@ from .config import PGVectorVectorIOConfig
 log = get_logger(name=__name__, category="vector_io::pgvector")
 
 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:pgvector:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:pgvector:{VERSION}::"
 VECTOR_INDEX_PREFIX = f"vector_index:pgvector:{VERSION}::"
 OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:pgvector:{VERSION}::"
 OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:pgvector:{VERSION}::"
@@ -79,13 +79,13 @@ class PGVectorIndex(EmbeddingIndex):
     def __init__(
         self,
-        vector_db: VectorDB,
+        vector_store: VectorStore,
         dimension: int,
         conn: psycopg2.extensions.connection,
         kvstore: KVStore | None = None,
         distance_metric: str = "COSINE",
     ):
-        self.vector_db = vector_db
+        self.vector_store = vector_store
         self.dimension = dimension
         self.conn = conn
         self.kvstore = kvstore
@@ -97,9 +97,9 @@ class PGVectorIndex(EmbeddingIndex):
         try:
             with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                 # Sanitize the table name by replacing hyphens with underscores
-                # SQL doesn't allow hyphens in table names, and vector_db.identifier may contain hyphens
+                # SQL doesn't allow hyphens in table names, and vector_store.identifier may contain hyphens
                 # when created with patterns like "test-vector-db-{uuid4()}"
-                sanitized_identifier = sanitize_collection_name(self.vector_db.identifier)
+                sanitized_identifier = sanitize_collection_name(self.vector_store.identifier)
                 self.table_name = f"vs_{sanitized_identifier}"
 
                 cur.execute(
@@ -122,8 +122,8 @@ class PGVectorIndex(EmbeddingIndex):
                 """
                 )
         except Exception as e:
-            log.exception(f"Error creating PGVectorIndex for vector_db: {self.vector_db.identifier}")
-            raise RuntimeError(f"Error creating PGVectorIndex for vector_db: {self.vector_db.identifier}") from e
+            log.exception(f"Error creating PGVectorIndex for vector_store: {self.vector_store.identifier}")
+            raise RuntimeError(f"Error creating PGVectorIndex for vector_store: {self.vector_store.identifier}") from e
 
     async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
         assert len(chunks) == len(embeddings), (
@@ -323,7 +323,7 @@ class PGVectorIndex(EmbeddingIndex):
         )
 
-class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
+class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
     def __init__(
         self, config: PGVectorVectorIOConfig, inference_api: Inference, files_api: Files | None = None
     ) -> None:
@@ -332,7 +332,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
         self.inference_api = inference_api
         self.conn = None
         self.cache = {}
-        self.vector_db_store = None
+        self.vector_store_table = None
         self.metadata_collection_name = "openai_vector_stores_metadata"
 
     async def initialize(self) -> None:
@@ -375,59 +375,59 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
         # Clean up mixin resources (file batch tasks)
         await super().shutdown()
 
-    async def register_vector_db(self, vector_db: VectorDB) -> None:
+    async def register_vector_store(self, vector_store: VectorStore) -> None:
         # Persist vector DB metadata in the KV store
         assert self.kvstore is not None
         # Upsert model metadata in Postgres
-        upsert_models(self.conn, [(vector_db.identifier, vector_db)])
+        upsert_models(self.conn, [(vector_store.identifier, vector_store)])
         # Create and cache the PGVector index table for the vector DB
         pgvector_index = PGVectorIndex(
-            vector_db=vector_db, dimension=vector_db.embedding_dimension, conn=self.conn, kvstore=self.kvstore
+            vector_store=vector_store, dimension=vector_store.embedding_dimension, conn=self.conn, kvstore=self.kvstore
        )
         await pgvector_index.initialize()
-        index = VectorDBWithIndex(vector_db, index=pgvector_index, inference_api=self.inference_api)
-        self.cache[vector_db.identifier] = index
+        index = VectorStoreWithIndex(vector_store, index=pgvector_index, inference_api=self.inference_api)
+        self.cache[vector_store.identifier] = index
 
-    async def unregister_vector_db(self, vector_db_id: str) -> None:
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
         # Remove provider index and cache
-        if vector_db_id in self.cache:
-            await self.cache[vector_db_id].index.delete()
-            del self.cache[vector_db_id]
+        if vector_store_id in self.cache:
+            await self.cache[vector_store_id].index.delete()
+            del self.cache[vector_store_id]
 
         # Delete vector DB metadata from KV store
         assert self.kvstore is not None
-        await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_db_id}")
+        await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_store_id}")
 
-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         await index.insert_chunks(chunks)
 
     async def query_chunks(
-        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         return await index.query_chunks(query, params)
 
-    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex:
-        if vector_db_id in self.cache:
-            return self.cache[vector_db_id]
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
 
-        if self.vector_db_store is None:
-            raise VectorStoreNotFoundError(vector_db_id)
+        if self.vector_store_table is None:
+            raise VectorStoreNotFoundError(vector_store_id)
 
-        vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
-        if not vector_db:
-            raise VectorStoreNotFoundError(vector_db_id)
+        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
+        if not vector_store:
+            raise VectorStoreNotFoundError(vector_store_id)
 
-        index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn)
+        index = PGVectorIndex(vector_store, vector_store.embedding_dimension, self.conn)
         await index.initialize()
-        self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api)
-        return self.cache[vector_db_id]
+        self.cache[vector_store_id] = VectorStoreWithIndex(vector_store, index, self.inference_api)
+        return self.cache[vector_store_id]
 
     async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
         """Delete a chunk from a PostgreSQL vector store."""
-        index = await self._get_and_cache_vector_db_index(store_id)
+        index = await self._get_and_cache_vector_store_index(store_id)
         if not index:
             raise VectorStoreNotFoundError(store_id)
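
The pgvector hunks keep the comment explaining why identifiers are sanitized before becoming table names: an identifier like test-vector-db-1234 cannot be used verbatim as an unquoted SQL table name. A toy version of the guarantee sanitize_collection_name has to provide (the real helper lives in vector_utils; this regex is an assumption for illustration only):

import re

def sanitize_for_table_name(identifier: str) -> str:
    # Replace anything outside [A-Za-z0-9_] with an underscore so the result
    # is a valid unquoted SQL identifier component.
    return re.sub(r"[^A-Za-z0-9_]", "_", identifier)

assert f"vs_{sanitize_for_table_name('test-vector-db-1234')}" == "vs_test_vector_db_1234"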


@@ -16,7 +16,7 @@ from qdrant_client.models import PointStruct
 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference, InterleavedContent
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import (
     Chunk,
     QueryChunksResponse,
@@ -25,11 +25,11 @@ from llama_stack.apis.vector_io import (
     VectorStoreFileObject,
 )
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
 from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
-from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex
+from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
 
 from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig
@@ -38,7 +38,7 @@ CHUNK_ID_KEY = "_chunk_id"
 # KV store prefixes for vector databases
 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:qdrant:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:qdrant:{VERSION}::"
 
 def convert_id(_id: str) -> str:
@ -145,7 +145,7 @@ class QdrantIndex(EmbeddingIndex):
await self.client.delete_collection(collection_name=self.collection_name) await self.client.delete_collection(collection_name=self.collection_name)
class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate): class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
def __init__( def __init__(
self, self,
config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig, config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig,
@ -157,7 +157,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
self.client: AsyncQdrantClient = None self.client: AsyncQdrantClient = None
self.cache = {} self.cache = {}
self.inference_api = inference_api self.inference_api = inference_api
self.vector_db_store = None self.vector_store_table = None
self._qdrant_lock = asyncio.Lock() self._qdrant_lock = asyncio.Lock()
async def initialize(self) -> None: async def initialize(self) -> None:
@ -167,12 +167,12 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
start_key = VECTOR_DBS_PREFIX start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff" end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key) stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
for vector_db_data in stored_vector_dbs: for vector_store_data in stored_vector_stores:
vector_db = VectorDB.model_validate_json(vector_db_data) vector_store = VectorStore.model_validate_json(vector_store_data)
index = VectorDBWithIndex(vector_db, QdrantIndex(self.client, vector_db.identifier), self.inference_api) index = VectorStoreWithIndex(vector_store, QdrantIndex(self.client, vector_store.identifier), self.inference_api)
self.cache[vector_db.identifier] = index self.cache[vector_store.identifier] = index
self.openai_vector_stores = await self._load_openai_vector_stores() self.openai_vector_stores = await self._load_openai_vector_stores()
async def shutdown(self) -> None: async def shutdown(self) -> None:
@ -180,57 +180,57 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
# Clean up mixin resources (file batch tasks) # Clean up mixin resources (file batch tasks)
await super().shutdown() await super().shutdown()
async def register_vector_db(self, vector_db: VectorDB) -> None: async def register_vector_store(self, vector_store: VectorStore) -> None:
assert self.kvstore is not None assert self.kvstore is not None
key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}" key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
await self.kvstore.set(key=key, value=vector_db.model_dump_json()) await self.kvstore.set(key=key, value=vector_store.model_dump_json())
index = VectorDBWithIndex( index = VectorStoreWithIndex(
-            vector_db=vector_db, index=QdrantIndex(self.client, vector_db.identifier), inference_api=self.inference_api
+            vector_store=vector_store, index=QdrantIndex(self.client, vector_store.identifier), inference_api=self.inference_api
         )
-        self.cache[vector_db.identifier] = index
+        self.cache[vector_store.identifier] = index

-    async def unregister_vector_db(self, vector_db_id: str) -> None:
-        if vector_db_id in self.cache:
-            await self.cache[vector_db_id].index.delete()
-            del self.cache[vector_db_id]
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
+        if vector_store_id in self.cache:
+            await self.cache[vector_store_id].index.delete()
+            del self.cache[vector_store_id]

         assert self.kvstore is not None
-        await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_db_id}")
+        await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")

-    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
-        if vector_db_id in self.cache:
-            return self.cache[vector_db_id]
-        if self.vector_db_store is None:
-            raise ValueError(f"Vector DB not found {vector_db_id}")
-        vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
-        if not vector_db:
-            raise VectorStoreNotFoundError(vector_db_id)
-        index = VectorDBWithIndex(
-            vector_db=vector_db,
-            index=QdrantIndex(client=self.client, collection_name=vector_db.identifier),
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
+        if self.vector_store_table is None:
+            raise ValueError(f"Vector DB not found {vector_store_id}")
+        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
+        if not vector_store:
+            raise VectorStoreNotFoundError(vector_store_id)
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=QdrantIndex(client=self.client, collection_name=vector_store.identifier),
             inference_api=self.inference_api,
         )
-        self.cache[vector_db_id] = index
+        self.cache[vector_store_id] = index
         return index

-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         await index.insert_chunks(chunks)

     async def query_chunks(
-        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         return await index.query_chunks(query, params)

@@ -249,7 +249,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
     async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
        """Delete chunks from a Qdrant vector store."""
-        index = await self._get_and_cache_vector_db_index(store_id)
+        index = await self._get_and_cache_vector_store_index(store_id)
         if not index:
             raise ValueError(f"Vector DB {store_id} not found")
@@ -16,11 +16,11 @@ from llama_stack.apis.common.content_types import InterleavedContent
 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files
 from llama_stack.apis.inference import Inference
-from llama_stack.apis.vector_dbs import VectorDB
+from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
 from llama_stack.core.request_headers import NeedsRequestProviderData
 from llama_stack.log import get_logger
-from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.kvstore.api import KVStore
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
@@ -28,7 +28,7 @@ from llama_stack.providers.utils.memory.vector_store import (
     RERANKER_TYPE_RRF,
     ChunkForDeletion,
     EmbeddingIndex,
-    VectorDBWithIndex,
+    VectorStoreWithIndex,
 )
 from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
@@ -37,7 +37,7 @@ from .config import WeaviateVectorIOConfig
 log = get_logger(name=__name__, category="vector_io::weaviate")

 VERSION = "v3"
-VECTOR_DBS_PREFIX = f"vector_dbs:weaviate:{VERSION}::"
+VECTOR_DBS_PREFIX = f"vector_stores:weaviate:{VERSION}::"
 VECTOR_INDEX_PREFIX = f"vector_index:weaviate:{VERSION}::"
 OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:weaviate:{VERSION}::"
 OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:weaviate:{VERSION}::"
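Worth flagging: only the value of the key prefix changes here (the constant is still named VECTOR_DBS_PREFIX), so records persisted under the old vector_dbs:weaviate:v3:: namespace would no longer fall inside the adapter's range scan unless existing keys are rewritten. A minimal sketch of such a rewrite, assuming plain string keys; migrate_key is a hypothetical helper, not part of this commit.

VERSION = "v3"
OLD_PREFIX = f"vector_dbs:weaviate:{VERSION}::"
NEW_PREFIX = f"vector_stores:weaviate:{VERSION}::"


def migrate_key(key: str) -> str:
    """Rewrite a key from the old namespace into the new one (hypothetical)."""
    if key.startswith(OLD_PREFIX):
        return NEW_PREFIX + key[len(OLD_PREFIX):]
    return key


assert migrate_key("vector_dbs:weaviate:v3::my-store") == "vector_stores:weaviate:v3::my-store"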
@@ -257,14 +257,14 @@ class WeaviateIndex(EmbeddingIndex):
         return QueryChunksResponse(chunks=chunks, scores=scores)


-class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProviderData, VectorDBsProtocolPrivate):
+class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProviderData, VectorStoresProtocolPrivate):
     def __init__(self, config: WeaviateVectorIOConfig, inference_api: Inference, files_api: Files | None) -> None:
         super().__init__(files_api=files_api, kvstore=None)
         self.config = config
         self.inference_api = inference_api
         self.client_cache = {}
         self.cache = {}
-        self.vector_db_store = None
+        self.vector_store_table = None
         self.metadata_collection_name = "openai_vector_stores_metadata"

     def _get_client(self) -> weaviate.WeaviateClient:
@@ -300,11 +300,11 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
         end_key = f"{VECTOR_DBS_PREFIX}\xff"
         stored = await self.kvstore.values_in_range(start_key, end_key)
         for raw in stored:
-            vector_db = VectorDB.model_validate_json(raw)
+            vector_store = VectorStore.model_validate_json(raw)
             client = self._get_client()
-            idx = WeaviateIndex(client=client, collection_name=vector_db.identifier, kvstore=self.kvstore)
-            self.cache[vector_db.identifier] = VectorDBWithIndex(
-                vector_db=vector_db, index=idx, inference_api=self.inference_api
+            idx = WeaviateIndex(client=client, collection_name=vector_store.identifier, kvstore=self.kvstore)
+            self.cache[vector_store.identifier] = VectorStoreWithIndex(
+                vector_store=vector_store, index=idx, inference_api=self.inference_api
             )

         # Load OpenAI vector stores metadata into cache
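The initialize hunk above rehydrates the in-process cache by range-scanning the KV namespace and rebuilding one index wrapper per persisted record. A self-contained sketch of that pattern; InMemoryKV is a hypothetical stand-in for llama_stack's KVStore, and json.loads stands in for VectorStore.model_validate_json.

import asyncio
import json


class InMemoryKV:
    """Hypothetical in-memory stand-in for the KVStore interface used above."""

    def __init__(self, data: dict[str, str]) -> None:
        self._data = data

    async def values_in_range(self, start: str, end: str) -> list[str]:
        return [v for k, v in sorted(self._data.items()) if start <= k <= end]


async def rehydrate(kv: InMemoryKV, prefix: str) -> dict[str, dict]:
    cache: dict[str, dict] = {}
    # "\xff" sorts after every printable suffix, so [prefix, prefix + "\xff"]
    # covers the whole namespace.
    for raw in await kv.values_in_range(prefix, f"{prefix}\xff"):
        record = json.loads(raw)  # real code validates into a VectorStore model
        cache[record["identifier"]] = record
    return cache


kv = InMemoryKV({"vector_stores:weaviate:v3::vs_1": json.dumps({"identifier": "vs_1"})})
print(asyncio.run(rehydrate(kv, "vector_stores:weaviate:v3::")))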
@@ -316,9 +316,9 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
         # Clean up mixin resources (file batch tasks)
         await super().shutdown()

-    async def register_vector_db(self, vector_db: VectorDB) -> None:
+    async def register_vector_store(self, vector_store: VectorStore) -> None:
         client = self._get_client()
-        sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
+        sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
         # Create collection if it doesn't exist
         if not client.collections.exists(sanitized_collection_name):
             client.collections.create(
@@ -329,61 +329,61 @@ class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, NeedsRequestProv
                 ],
             )
-        self.cache[vector_db.identifier] = VectorDBWithIndex(
-            vector_db, WeaviateIndex(client=client, collection_name=sanitized_collection_name), self.inference_api
+        self.cache[vector_store.identifier] = VectorStoreWithIndex(
+            vector_store, WeaviateIndex(client=client, collection_name=sanitized_collection_name), self.inference_api
         )

-    async def unregister_vector_db(self, vector_db_id: str) -> None:
+    async def unregister_vector_store(self, vector_store_id: str) -> None:
         client = self._get_client()
-        sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
-        if vector_db_id not in self.cache or client.collections.exists(sanitized_collection_name) is False:
+        sanitized_collection_name = sanitize_collection_name(vector_store_id, weaviate_format=True)
+        if vector_store_id not in self.cache or client.collections.exists(sanitized_collection_name) is False:
             return
         client.collections.delete(sanitized_collection_name)
-        await self.cache[vector_db_id].index.delete()
-        del self.cache[vector_db_id]
+        await self.cache[vector_store_id].index.delete()
+        del self.cache[vector_store_id]

-    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
-        if vector_db_id in self.cache:
-            return self.cache[vector_db_id]
-        if self.vector_db_store is None:
-            raise VectorStoreNotFoundError(vector_db_id)
-        vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
-        if not vector_db:
-            raise VectorStoreNotFoundError(vector_db_id)
+    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
+        if vector_store_id in self.cache:
+            return self.cache[vector_store_id]
+        if self.vector_store_table is None:
+            raise VectorStoreNotFoundError(vector_store_id)
+        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
+        if not vector_store:
+            raise VectorStoreNotFoundError(vector_store_id)
         client = self._get_client()
-        sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
+        sanitized_collection_name = sanitize_collection_name(vector_store.identifier, weaviate_format=True)
         if not client.collections.exists(sanitized_collection_name):
             raise ValueError(f"Collection with name `{sanitized_collection_name}` not found")
-        index = VectorDBWithIndex(
-            vector_db=vector_db,
-            index=WeaviateIndex(client=client, collection_name=vector_db.identifier),
+        index = VectorStoreWithIndex(
+            vector_store=vector_store,
+            index=WeaviateIndex(client=client, collection_name=vector_store.identifier),
             inference_api=self.inference_api,
         )
-        self.cache[vector_db_id] = index
+        self.cache[vector_store_id] = index
         return index

-    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+    async def insert_chunks(self, vector_store_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         await index.insert_chunks(chunks)

     async def query_chunks(
-        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
+        self, vector_store_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        index = await self._get_and_cache_vector_store_index(vector_store_id)
         if not index:
-            raise VectorStoreNotFoundError(vector_db_id)
+            raise VectorStoreNotFoundError(vector_store_id)
         return await index.query_chunks(query, params)

     async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
-        index = await self._get_and_cache_vector_db_index(store_id)
+        index = await self._get_and_cache_vector_store_index(store_id)
         if not index:
             raise ValueError(f"Vector DB {store_id} not found")