From 61bddfe70e3b48856068e6d8c5d6e26da52729c8 Mon Sep 17 00:00:00 2001 From: Varsha Prasad Narsing Date: Tue, 17 Jun 2025 16:38:02 -0700 Subject: [PATCH 1/4] feat: Add openAI compatible APIs to QDrant Signed-off-by: Varsha Prasad Narsing --- .../inline/vector_io/qdrant/__init__.py | 10 +- .../inline/vector_io/qdrant/config.py | 3 +- .../remote/vector_io/qdrant/config.py | 3 +- .../remote/vector_io/qdrant/qdrant.py | 93 +++++++++++++++++-- .../vector_io/test_openai_vector_stores.py | 2 +- tests/unit/providers/vector_io/test_qdrant.py | 2 +- 6 files changed, 100 insertions(+), 13 deletions(-) diff --git a/llama_stack/providers/inline/vector_io/qdrant/__init__.py b/llama_stack/providers/inline/vector_io/qdrant/__init__.py index ee33b3797..bc9014c68 100644 --- a/llama_stack/providers/inline/vector_io/qdrant/__init__.py +++ b/llama_stack/providers/inline/vector_io/qdrant/__init__.py @@ -4,14 +4,18 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -from llama_stack.providers.datatypes import Api, ProviderSpec +from typing import Any + +from llama_stack.providers.datatypes import Api from .config import QdrantVectorIOConfig -async def get_adapter_impl(config: QdrantVectorIOConfig, deps: dict[Api, ProviderSpec]): +async def get_provider_impl(config: QdrantVectorIOConfig, deps: dict[Api, Any]): from llama_stack.providers.remote.vector_io.qdrant.qdrant import QdrantVectorIOAdapter - impl = QdrantVectorIOAdapter(config, deps[Api.inference]) + assert isinstance(config, QdrantVectorIOConfig), f"Unexpected config type: {type(config)}" + files_api = deps.get(Api.files) + impl = QdrantVectorIOAdapter(config, deps[Api.inference], files_api) await impl.initialize() return impl diff --git a/llama_stack/providers/inline/vector_io/qdrant/config.py b/llama_stack/providers/inline/vector_io/qdrant/config.py index 7cc91d918..61d026984 100644 --- a/llama_stack/providers/inline/vector_io/qdrant/config.py +++ b/llama_stack/providers/inline/vector_io/qdrant/config.py @@ -5,7 +5,7 @@ # the root directory of this source tree. -from typing import Any +from typing import Any, Literal from pydantic import BaseModel @@ -15,6 +15,7 @@ from llama_stack.schema_utils import json_schema_type @json_schema_type class QdrantVectorIOConfig(BaseModel): path: str + distance_metric: Literal["COSINE", "DOT", "EUCLID", "MANHATTAN"] = "COSINE" @classmethod def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]: diff --git a/llama_stack/providers/remote/vector_io/qdrant/config.py b/llama_stack/providers/remote/vector_io/qdrant/config.py index 314d3f5f1..0d8e08663 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/config.py +++ b/llama_stack/providers/remote/vector_io/qdrant/config.py @@ -4,7 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -from typing import Any +from typing import Any, Literal from pydantic import BaseModel @@ -23,6 +23,7 @@ class QdrantVectorIOConfig(BaseModel): prefix: str | None = None timeout: int | None = None host: str | None = None + distance_metric: Literal["COSINE", "DOT", "EUCLID", "MANHATTAN"] = "COSINE" @classmethod def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]: diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 09ea08fa0..8787a4900 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -12,6 +12,7 @@ from numpy.typing import NDArray from qdrant_client import AsyncQdrantClient, models from qdrant_client.models import PointStruct +from llama_stack.apis.files import Files from llama_stack.apis.inference import InterleavedContent from llama_stack.apis.vector_dbs import VectorDB from llama_stack.apis.vector_io import ( @@ -31,6 +32,7 @@ from llama_stack.apis.vector_io import ( ) from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig +from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( EmbeddingIndex, VectorDBWithIndex, @@ -40,6 +42,7 @@ from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig log = logging.getLogger(__name__) CHUNK_ID_KEY = "_chunk_id" +OPENAI_VECTOR_STORES_METADATA_COLLECTION = "openai_vector_stores_metadata" def convert_id(_id: str) -> str: @@ -54,9 +57,10 @@ def convert_id(_id: str) -> str: class QdrantIndex(EmbeddingIndex): - def __init__(self, client: AsyncQdrantClient, collection_name: str): + def __init__(self, client: AsyncQdrantClient, collection_name: str, distance_metric: str = "COSINE"): self.client = client self.collection_name = collection_name + self.distance_metric = distance_metric async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): assert len(chunks) == len(embeddings), ( @@ -64,9 +68,12 @@ class QdrantIndex(EmbeddingIndex): ) if not await self.client.collection_exists(self.collection_name): + # Get distance metric, defaulting to COSINE + distance = getattr(models.Distance, self.distance_metric, models.Distance.COSINE) + await self.client.create_collection( self.collection_name, - vectors_config=models.VectorParams(size=len(embeddings[0]), distance=models.Distance.COSINE), + vectors_config=models.VectorParams(size=len(embeddings[0]), distance=distance), ) points = [] @@ -132,28 +139,100 @@ class QdrantIndex(EmbeddingIndex): await self.client.delete_collection(collection_name=self.collection_name) -class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): +class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate): def __init__( - self, config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig, inference_api: Api.inference + self, + config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig, + inference_api: Api.inference, + files_api: Files | None, ) -> None: self.config = config self.client: AsyncQdrantClient = None self.cache = {} self.inference_api = inference_api + self.files_api = files_api + self.vector_db_store = None + self.openai_vector_stores: dict[str, dict[str, Any]] = {} async def initialize(self) -> None: self.client = AsyncQdrantClient(**self.config.model_dump(exclude_none=True)) + # Load existing OpenAI vector stores using the mixin method + self.openai_vector_stores = await self._load_openai_vector_stores() async def shutdown(self) -> None: await self.client.close() + # OpenAI Vector Store Mixin abstract method implementations + async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: + """Save vector store metadata to Qdrant collection metadata.""" + # Store metadata in a special collection for vector store metadata + metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION + + # Create metadata collection if it doesn't exist + if not await self.client.collection_exists(metadata_collection): + # Get distance metric from config, defaulting to COSINE for backward compatibility + distance_metric = getattr(self.config, "distance_metric", "COSINE") + distance = getattr(models.Distance, distance_metric, models.Distance.COSINE) + + await self.client.create_collection( + collection_name=metadata_collection, + vectors_config=models.VectorParams(size=1, distance=distance), + ) + + # Store metadata as a point with dummy vector + await self.client.upsert( + collection_name=metadata_collection, + points=[ + models.PointStruct( + id=convert_id(store_id), + vector=[0.0], # Dummy vector + payload={"metadata": store_info}, + ) + ], + ) + + async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: + """Load all vector store metadata from Qdrant.""" + metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION + + if not await self.client.collection_exists(metadata_collection): + return {} + + # Get all points from metadata collection + points = await self.client.scroll( + collection_name=metadata_collection, + limit=1000, # Reasonable limit for metadata + with_payload=True, + ) + + stores = {} + for point in points[0]: # points[0] contains the actual points + if point.payload and "metadata" in point.payload: + store_info = point.payload["metadata"] + stores[store_info["id"]] = store_info + + return stores + + async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: + """Update vector store metadata in Qdrant.""" + await self._save_openai_vector_store(store_id, store_info) + + async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: + """Delete vector store metadata from Qdrant.""" + metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION + + if await self.client.collection_exists(metadata_collection): + await self.client.delete( + collection_name=metadata_collection, points_selector=models.PointIdsList(points=[convert_id(store_id)]) + ) + async def register_vector_db( self, vector_db: VectorDB, ) -> None: index = VectorDBWithIndex( vector_db=vector_db, - index=QdrantIndex(self.client, vector_db.identifier), + index=QdrantIndex(self.client, vector_db.identifier, self.config.distance_metric), inference_api=self.inference_api, ) @@ -174,7 +253,9 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): index = VectorDBWithIndex( vector_db=vector_db, - index=QdrantIndex(client=self.client, collection_name=vector_db.identifier), + index=QdrantIndex( + client=self.client, collection_name=vector_db.identifier, distance_metric=self.config.distance_metric + ), inference_api=self.inference_api, ) self.cache[vector_db_id] = index diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 4c061f519..93ae27a8e 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): vector_io_providers = [p for p in client_with_models.providers.list() if p.api == "vector_io"] for p in vector_io_providers: - if p.provider_type in ["inline::faiss", "inline::sqlite-vec"]: + if p.provider_type in ["inline::faiss", "inline::sqlite-vec", "inline::qdrant"]: return pytest.skip("OpenAI vector stores are not supported by any provider") diff --git a/tests/unit/providers/vector_io/test_qdrant.py b/tests/unit/providers/vector_io/test_qdrant.py index 6902c8850..87cd18ce3 100644 --- a/tests/unit/providers/vector_io/test_qdrant.py +++ b/tests/unit/providers/vector_io/test_qdrant.py @@ -70,7 +70,7 @@ def mock_api_service(sample_embeddings): @pytest_asyncio.fixture async def qdrant_adapter(qdrant_config, mock_vector_db_store, mock_api_service, loop) -> QdrantVectorIOAdapter: - adapter = QdrantVectorIOAdapter(config=qdrant_config, inference_api=mock_api_service) + adapter = QdrantVectorIOAdapter(config=qdrant_config, inference_api=mock_api_service, files_api=None) adapter.vector_db_store = mock_vector_db_store await adapter.initialize() yield adapter From 3d27b7054c158beb781cb7f4c1dbb971b089281f Mon Sep 17 00:00:00 2001 From: Varsha Prasad Narsing Date: Wed, 25 Jun 2025 16:59:29 -0700 Subject: [PATCH 2/4] feat: rebase and implement file API methods Signed-off-by: Varsha Prasad Narsing --- docs/_static/llama-stack-spec.html | 26 ++ docs/_static/llama-stack-spec.yaml | 13 + llama_stack/apis/vector_io/vector_io.py | 2 + .../providers/inline/vector_io/faiss/faiss.py | 3 +- .../inline/vector_io/qdrant/config.py | 3 +- .../inline/vector_io/sqlite_vec/sqlite_vec.py | 12 +- .../remote/vector_io/chroma/chroma.py | 2 +- .../remote/vector_io/milvus/milvus.py | 2 +- .../remote/vector_io/pgvector/pgvector.py | 2 +- .../remote/vector_io/qdrant/config.py | 3 +- .../remote/vector_io/qdrant/qdrant.py | 255 +++++++++--------- .../remote/vector_io/weaviate/weaviate.py | 2 +- .../providers/utils/memory/vector_store.py | 11 +- 13 files changed, 195 insertions(+), 141 deletions(-) diff --git a/docs/_static/llama-stack-spec.html b/docs/_static/llama-stack-spec.html index f9e4bb38e..b49049a8a 100644 --- a/docs/_static/llama-stack-spec.html +++ b/docs/_static/llama-stack-spec.html @@ -11468,6 +11468,32 @@ "ttl_seconds": { "type": "integer", "description": "The time to live of the chunks." + }, + "params": { + "type": "object", + "additionalProperties": { + "oneOf": [ + { + "type": "null" + }, + { + "type": "boolean" + }, + { + "type": "number" + }, + { + "type": "string" + }, + { + "type": "array" + }, + { + "type": "object" + } + ] + }, + "description": "Optional parameters for the insertion operation, such as distance_metric for vector databases." } }, "additionalProperties": false, diff --git a/docs/_static/llama-stack-spec.yaml b/docs/_static/llama-stack-spec.yaml index 9175c97fc..ad76e5535 100644 --- a/docs/_static/llama-stack-spec.yaml +++ b/docs/_static/llama-stack-spec.yaml @@ -8095,6 +8095,19 @@ components: ttl_seconds: type: integer description: The time to live of the chunks. + params: + type: object + additionalProperties: + oneOf: + - type: 'null' + - type: boolean + - type: number + - type: string + - type: array + - type: object + description: >- + Optional parameters for the insertion operation, such as distance_metric + for vector databases. additionalProperties: false required: - vector_db_id diff --git a/llama_stack/apis/vector_io/vector_io.py b/llama_stack/apis/vector_io/vector_io.py index 2d4131315..a033cd90f 100644 --- a/llama_stack/apis/vector_io/vector_io.py +++ b/llama_stack/apis/vector_io/vector_io.py @@ -306,6 +306,7 @@ class VectorIO(Protocol): vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: """Insert chunks into a vector database. @@ -315,6 +316,7 @@ class VectorIO(Protocol): If `metadata` is provided, you configure how Llama Stack formats the chunk during generation. If `embedding` is not provided, it will be computed later. :param ttl_seconds: The time to live of the chunks. + :param params: Optional parameters for the insertion operation, such as distance_metric for vector databases. """ ... diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py index 355750b25..98200e733 100644 --- a/llama_stack/providers/inline/vector_io/faiss/faiss.py +++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py @@ -96,7 +96,7 @@ class FaissIndex(EmbeddingIndex): await self.kvstore.delete(f"{FAISS_INDEX_PREFIX}{self.bank_id}") - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): # Add dimension check embedding_dim = embeddings.shape[1] if len(embeddings.shape) > 1 else embeddings.shape[0] if embedding_dim != self.index.d: @@ -234,6 +234,7 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: index = self.cache.get(vector_db_id) if index is None: diff --git a/llama_stack/providers/inline/vector_io/qdrant/config.py b/llama_stack/providers/inline/vector_io/qdrant/config.py index 61d026984..7cc91d918 100644 --- a/llama_stack/providers/inline/vector_io/qdrant/config.py +++ b/llama_stack/providers/inline/vector_io/qdrant/config.py @@ -5,7 +5,7 @@ # the root directory of this source tree. -from typing import Any, Literal +from typing import Any from pydantic import BaseModel @@ -15,7 +15,6 @@ from llama_stack.schema_utils import json_schema_type @json_schema_type class QdrantVectorIOConfig(BaseModel): path: str - distance_metric: Literal["COSINE", "DOT", "EUCLID", "MANHATTAN"] = "COSINE" @classmethod def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]: diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 7e977635a..58bf8fa8c 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -178,7 +178,9 @@ class SQLiteVecIndex(EmbeddingIndex): await asyncio.to_thread(_drop_tables) - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, batch_size: int = 500): + async def add_chunks( + self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None, batch_size: int = 500 + ): """ Add new chunks along with their embeddings using batch inserts. For each chunk, we insert its JSON into the metadata table and then insert its @@ -729,7 +731,13 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc await asyncio.to_thread(_delete) - async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: + async def insert_chunks( + self, + vector_db_id: str, + chunks: list[Chunk], + ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, + ) -> None: if vector_db_id not in self.cache: raise ValueError(f"Vector DB {vector_db_id} not found. Found: {list(self.cache.keys())}") # The VectorDBWithIndex helper is expected to compute embeddings via the inference_api diff --git a/llama_stack/providers/remote/vector_io/chroma/chroma.py b/llama_stack/providers/remote/vector_io/chroma/chroma.py index 3bef39e9c..cc38553ff 100644 --- a/llama_stack/providers/remote/vector_io/chroma/chroma.py +++ b/llama_stack/providers/remote/vector_io/chroma/chroma.py @@ -55,7 +55,7 @@ class ChromaIndex(EmbeddingIndex): self.client = client self.collection = collection - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): assert len(chunks) == len(embeddings), ( f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" ) diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index 182227a85..f697579fd 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -53,7 +53,7 @@ class MilvusIndex(EmbeddingIndex): if await asyncio.to_thread(self.client.has_collection, self.collection_name): await asyncio.to_thread(self.client.drop_collection, collection_name=self.collection_name) - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): assert len(chunks) == len(embeddings), ( f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" ) diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index c3cdef9b8..96f16146e 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -88,7 +88,7 @@ class PGVectorIndex(EmbeddingIndex): """ ) - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): assert len(chunks) == len(embeddings), ( f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" ) diff --git a/llama_stack/providers/remote/vector_io/qdrant/config.py b/llama_stack/providers/remote/vector_io/qdrant/config.py index 0d8e08663..314d3f5f1 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/config.py +++ b/llama_stack/providers/remote/vector_io/qdrant/config.py @@ -4,7 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -from typing import Any, Literal +from typing import Any from pydantic import BaseModel @@ -23,7 +23,6 @@ class QdrantVectorIOConfig(BaseModel): prefix: str | None = None timeout: int | None = None host: str | None = None - distance_metric: Literal["COSINE", "DOT", "EUCLID", "MANHATTAN"] = "COSINE" @classmethod def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]: diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 8787a4900..3a73c57cd 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -18,17 +18,7 @@ from llama_stack.apis.vector_dbs import VectorDB from llama_stack.apis.vector_io import ( Chunk, QueryChunksResponse, - SearchRankingOptions, VectorIO, - VectorStoreChunkingStrategy, - VectorStoreDeleteResponse, - VectorStoreFileContentsResponse, - VectorStoreFileObject, - VectorStoreFileStatus, - VectorStoreListFilesResponse, - VectorStoreListResponse, - VectorStoreObject, - VectorStoreSearchResponsePage, ) from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig @@ -57,24 +47,41 @@ def convert_id(_id: str) -> str: class QdrantIndex(EmbeddingIndex): - def __init__(self, client: AsyncQdrantClient, collection_name: str, distance_metric: str = "COSINE"): + def __init__(self, client: AsyncQdrantClient, collection_name: str): self.client = client self.collection_name = collection_name - self.distance_metric = distance_metric + self._distance_metric = None # Will be set when collection is created - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): assert len(chunks) == len(embeddings), ( f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" ) + # Extract distance_metric from metadata if provided, default to COSINE + distance_metric = "COSINE" # Default + if metadata is not None and "distance_metric" in metadata: + distance_metric = metadata["distance_metric"] + if not await self.client.collection_exists(self.collection_name): - # Get distance metric, defaulting to COSINE - distance = getattr(models.Distance, self.distance_metric, models.Distance.COSINE) + # Create collection with the specified distance metric + distance = getattr(models.Distance, distance_metric, models.Distance.COSINE) + self._distance_metric = distance_metric await self.client.create_collection( self.collection_name, vectors_config=models.VectorParams(size=len(embeddings[0]), distance=distance), ) + else: + # Collection already exists, warn if different distance metric was requested + if self._distance_metric is None: + # For now, assume COSINE as default since we can't easily extract it from collection info + self._distance_metric = "COSINE" + + if self._distance_metric != distance_metric: + log.warning( + f"Collection {self.collection_name} was created with distance metric '{self._distance_metric}', " + f"but '{distance_metric}' was requested. Using existing distance metric." + ) points = [] for _i, (chunk, embedding) in enumerate(zip(chunks, embeddings, strict=False)): @@ -90,6 +97,7 @@ class QdrantIndex(EmbeddingIndex): await self.client.upsert(collection_name=self.collection_name, points=points) async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: + # Distance metric is set at collection creation and cannot be changed results = ( await self.client.query_points( collection_name=self.collection_name, @@ -170,9 +178,8 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP # Create metadata collection if it doesn't exist if not await self.client.collection_exists(metadata_collection): - # Get distance metric from config, defaulting to COSINE for backward compatibility - distance_metric = getattr(self.config, "distance_metric", "COSINE") - distance = getattr(models.Distance, distance_metric, models.Distance.COSINE) + # Use default distance metric for metadata collection + distance = models.Distance.COSINE await self.client.create_collection( collection_name=metadata_collection, @@ -226,13 +233,101 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP collection_name=metadata_collection, points_selector=models.PointIdsList(points=[convert_id(store_id)]) ) + async def _save_openai_vector_store_file( + self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] + ) -> None: + """Save vector store file metadata to Qdrant collection metadata.""" + # Store file metadata in a special collection for vector store file metadata + file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" + + # Create file metadata collection if it doesn't exist + if not await self.client.collection_exists(file_metadata_collection): + distance = models.Distance.COSINE + await self.client.create_collection( + collection_name=file_metadata_collection, + vectors_config=models.VectorParams(size=1, distance=distance), + ) + + # Store file metadata as a point with dummy vector + file_key = f"{store_id}:{file_id}" + await self.client.upsert( + collection_name=file_metadata_collection, + points=[ + models.PointStruct( + id=convert_id(file_key), + vector=[0.0], # Dummy vector + payload={"file_info": file_info, "file_contents": file_contents}, + ) + ], + ) + + async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: + """Load vector store file metadata from Qdrant.""" + file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" + + if not await self.client.collection_exists(file_metadata_collection): + return {} + + file_key = f"{store_id}:{file_id}" + points = await self.client.retrieve( + collection_name=file_metadata_collection, + ids=[convert_id(file_key)], + with_payload=True, + ) + + if points and points[0].payload and "file_info" in points[0].payload: + return points[0].payload["file_info"] + return {} + + async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: + """Load vector store file contents from Qdrant.""" + file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" + + if not await self.client.collection_exists(file_metadata_collection): + return [] + + file_key = f"{store_id}:{file_id}" + points = await self.client.retrieve( + collection_name=file_metadata_collection, + ids=[convert_id(file_key)], + with_payload=True, + ) + + if points and points[0].payload and "file_contents" in points[0].payload: + return points[0].payload["file_contents"] + return [] + + async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: + """Update vector store file metadata in Qdrant.""" + file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" + + if not await self.client.collection_exists(file_metadata_collection): + return + + # Get existing file contents + existing_contents = await self._load_openai_vector_store_file_contents(store_id, file_id) + + # Update with new file info but keep existing contents + await self._save_openai_vector_store_file(store_id, file_id, file_info, existing_contents) + + async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: + """Delete vector store file metadata from Qdrant.""" + file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" + + if await self.client.collection_exists(file_metadata_collection): + file_key = f"{store_id}:{file_id}" + await self.client.delete( + collection_name=file_metadata_collection, + points_selector=models.PointIdsList(points=[convert_id(file_key)]), + ) + async def register_vector_db( self, vector_db: VectorDB, ) -> None: index = VectorDBWithIndex( vector_db=vector_db, - index=QdrantIndex(self.client, vector_db.identifier, self.config.distance_metric), + index=QdrantIndex(self.client, vector_db.identifier), inference_api=self.inference_api, ) @@ -253,9 +348,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP index = VectorDBWithIndex( vector_db=vector_db, - index=QdrantIndex( - client=self.client, collection_name=vector_db.identifier, distance_metric=self.config.distance_metric - ), + index=QdrantIndex(client=self.client, collection_name=vector_db.identifier), inference_api=self.inference_api, ) self.cache[vector_db_id] = index @@ -266,12 +359,23 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: index = await self._get_and_cache_vector_db_index(vector_db_id) if not index: raise ValueError(f"Vector DB {vector_db_id} not found") - await index.insert_chunks(chunks) + # Extract distance_metric from params if provided + distance_metric = None + if params is not None: + distance_metric = params.get("distance_metric") + + # Create metadata dict with distance_metric if provided + metadata = None + if distance_metric is not None: + metadata = {"distance_metric": distance_metric} + + await index.insert_chunks(chunks, metadata=metadata) async def query_chunks( self, @@ -284,108 +388,3 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP raise ValueError(f"Vector DB {vector_db_id} not found") return await index.query_chunks(query, params) - - async def openai_create_vector_store( - self, - name: str, - file_ids: list[str] | None = None, - expires_after: dict[str, Any] | None = None, - chunking_strategy: dict[str, Any] | None = None, - metadata: dict[str, Any] | None = None, - embedding_model: str | None = None, - embedding_dimension: int | None = 384, - provider_id: str | None = None, - provider_vector_db_id: str | None = None, - ) -> VectorStoreObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_list_vector_stores( - self, - limit: int | None = 20, - order: str | None = "desc", - after: str | None = None, - before: str | None = None, - ) -> VectorStoreListResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_retrieve_vector_store( - self, - vector_store_id: str, - ) -> VectorStoreObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_update_vector_store( - self, - vector_store_id: str, - name: str | None = None, - expires_after: dict[str, Any] | None = None, - metadata: dict[str, Any] | None = None, - ) -> VectorStoreObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_delete_vector_store( - self, - vector_store_id: str, - ) -> VectorStoreDeleteResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_search_vector_store( - self, - vector_store_id: str, - query: str | list[str], - filters: dict[str, Any] | None = None, - max_num_results: int | None = 10, - ranking_options: SearchRankingOptions | None = None, - rewrite_query: bool | None = False, - search_mode: str | None = "vector", - ) -> VectorStoreSearchResponsePage: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_attach_file_to_vector_store( - self, - vector_store_id: str, - file_id: str, - attributes: dict[str, Any] | None = None, - chunking_strategy: VectorStoreChunkingStrategy | None = None, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_list_files_in_vector_store( - self, - vector_store_id: str, - limit: int | None = 20, - order: str | None = "desc", - after: str | None = None, - before: str | None = None, - filter: VectorStoreFileStatus | None = None, - ) -> VectorStoreListFilesResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_retrieve_vector_store_file( - self, - vector_store_id: str, - file_id: str, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_retrieve_vector_store_file_contents( - self, - vector_store_id: str, - file_id: str, - ) -> VectorStoreFileContentsResponse: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_update_vector_store_file( - self, - vector_store_id: str, - file_id: str, - attributes: dict[str, Any] | None = None, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") - - async def openai_delete_vector_store_file( - self, - vector_store_id: str, - file_id: str, - ) -> VectorStoreFileObject: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant") diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index c63dd70c6..5ecfce31d 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -33,7 +33,7 @@ class WeaviateIndex(EmbeddingIndex): self.client = client self.collection_name = collection_name - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): assert len(chunks) == len(embeddings), ( f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" ) diff --git a/llama_stack/providers/utils/memory/vector_store.py b/llama_stack/providers/utils/memory/vector_store.py index ab204a75a..225dda317 100644 --- a/llama_stack/providers/utils/memory/vector_store.py +++ b/llama_stack/providers/utils/memory/vector_store.py @@ -214,7 +214,7 @@ def _validate_embedding(embedding: NDArray, index: int, expected_dimension: int) class EmbeddingIndex(ABC): @abstractmethod - async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray, metadata: dict[str, Any] | None = None): raise NotImplementedError() @abstractmethod @@ -251,6 +251,7 @@ class VectorDBWithIndex: async def insert_chunks( self, chunks: list[Chunk], + distance_metric: str | None = None, ) -> None: chunks_to_embed = [] for i, c in enumerate(chunks): @@ -271,7 +272,13 @@ class VectorDBWithIndex: c.embedding = embedding embeddings = np.array([c.embedding for c in chunks], dtype=np.float32) - await self.index.add_chunks(chunks, embeddings) + + # Create metadata dict with distance_metric if provided + metadata = None + if distance_metric is not None: + metadata = {"distance_metric": distance_metric} + + await self.index.add_chunks(chunks, embeddings, metadata=metadata) async def query_chunks( self, From 655468bbafdae7606dfce7804f1d68e9643abb29 Mon Sep 17 00:00:00 2001 From: Varsha Prasad Narsing Date: Thu, 26 Jun 2025 12:04:38 -0700 Subject: [PATCH 3/4] fix: VectorDB with metadata parameter Signed-off-by: Varsha Prasad Narsing --- llama_stack/distribution/routers/vector_io.py | 5 ++++- llama_stack/providers/remote/vector_io/chroma/chroma.py | 1 + llama_stack/providers/remote/vector_io/milvus/milvus.py | 1 + .../providers/remote/vector_io/pgvector/pgvector.py | 1 + llama_stack/providers/remote/vector_io/qdrant/qdrant.py | 7 +------ .../providers/remote/vector_io/weaviate/weaviate.py | 1 + 6 files changed, 9 insertions(+), 7 deletions(-) diff --git a/llama_stack/distribution/routers/vector_io.py b/llama_stack/distribution/routers/vector_io.py index 4bd5952dc..f350f17e5 100644 --- a/llama_stack/distribution/routers/vector_io.py +++ b/llama_stack/distribution/routers/vector_io.py @@ -97,11 +97,14 @@ class VectorIORouter(VectorIO): vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: logger.debug( f"VectorIORouter.insert_chunks: {vector_db_id}, {len(chunks)} chunks, ttl_seconds={ttl_seconds}, chunk_ids={[chunk.metadata['document_id'] for chunk in chunks[:3]]}{' and more...' if len(chunks) > 3 else ''}", ) - return await self.routing_table.get_provider_impl(vector_db_id).insert_chunks(vector_db_id, chunks, ttl_seconds) + return await self.routing_table.get_provider_impl(vector_db_id).insert_chunks( + vector_db_id, chunks, ttl_seconds, params + ) async def query_chunks( self, diff --git a/llama_stack/providers/remote/vector_io/chroma/chroma.py b/llama_stack/providers/remote/vector_io/chroma/chroma.py index cc38553ff..db11903cb 100644 --- a/llama_stack/providers/remote/vector_io/chroma/chroma.py +++ b/llama_stack/providers/remote/vector_io/chroma/chroma.py @@ -178,6 +178,7 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: index = await self._get_and_cache_vector_db_index(vector_db_id) diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index f697579fd..168ca6aa3 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -183,6 +183,7 @@ class MilvusVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: index = await self._get_and_cache_vector_db_index(vector_db_id) if not index: diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index 96f16146e..afdd8fb8f 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -215,6 +215,7 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate): vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: index = await self._get_and_cache_vector_db_index(vector_db_id) await index.insert_chunks(chunks) diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 3a73c57cd..e00087d48 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -370,12 +370,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP if params is not None: distance_metric = params.get("distance_metric") - # Create metadata dict with distance_metric if provided - metadata = None - if distance_metric is not None: - metadata = {"distance_metric": distance_metric} - - await index.insert_chunks(chunks, metadata=metadata) + await index.insert_chunks(chunks, distance_metric=distance_metric) async def query_chunks( self, diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index 5ecfce31d..13d734322 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -188,6 +188,7 @@ class WeaviateVectorIOAdapter( vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None, + params: dict[str, Any] | None = None, ) -> None: index = await self._get_and_cache_vector_db_index(vector_db_id) if not index: From a879b6c12ecbc1e763f33cab62524e4013425ab7 Mon Sep 17 00:00:00 2001 From: Varsha Prasad Narsing Date: Thu, 26 Jun 2025 17:06:57 -0700 Subject: [PATCH 4/4] feat: use kv store for metadata Signed-off-by: Varsha Prasad Narsing --- .../inline/vector_io/qdrant/config.py | 9 + .../remote/vector_io/qdrant/config.py | 11 +- .../remote/vector_io/qdrant/qdrant.py | 202 +++++++----------- tests/unit/providers/vector_io/test_qdrant.py | 5 +- 4 files changed, 96 insertions(+), 131 deletions(-) diff --git a/llama_stack/providers/inline/vector_io/qdrant/config.py b/llama_stack/providers/inline/vector_io/qdrant/config.py index 7cc91d918..7b46cc3d9 100644 --- a/llama_stack/providers/inline/vector_io/qdrant/config.py +++ b/llama_stack/providers/inline/vector_io/qdrant/config.py @@ -9,15 +9,24 @@ from typing import Any from pydantic import BaseModel +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) from llama_stack.schema_utils import json_schema_type @json_schema_type class QdrantVectorIOConfig(BaseModel): path: str + kvstore: KVStoreConfig @classmethod def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]: return { "path": "${env.QDRANT_PATH:=~/.llama/" + __distro_dir__ + "}/" + "qdrant.db", + "kvstore": SqliteKVStoreConfig.sample_run_config( + __distro_dir__=__distro_dir__, + db_name="qdrant_store.db", + ), } diff --git a/llama_stack/providers/remote/vector_io/qdrant/config.py b/llama_stack/providers/remote/vector_io/qdrant/config.py index 314d3f5f1..efb30b036 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/config.py +++ b/llama_stack/providers/remote/vector_io/qdrant/config.py @@ -8,6 +8,10 @@ from typing import Any from pydantic import BaseModel +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) from llama_stack.schema_utils import json_schema_type @@ -23,9 +27,14 @@ class QdrantVectorIOConfig(BaseModel): prefix: str | None = None timeout: int | None = None host: str | None = None + kvstore: KVStoreConfig @classmethod - def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]: + def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: return { "api_key": "${env.QDRANT_API_KEY}", + "kvstore": SqliteKVStoreConfig.sample_run_config( + __distro_dir__=__distro_dir__, + db_name="qdrant_store.db", + ), } diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index e00087d48..1c25de6d3 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -4,6 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import json import logging import uuid from typing import Any @@ -22,6 +23,7 @@ from llama_stack.apis.vector_io import ( ) from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig +from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( EmbeddingIndex, @@ -32,7 +34,13 @@ from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig log = logging.getLogger(__name__) CHUNK_ID_KEY = "_chunk_id" -OPENAI_VECTOR_STORES_METADATA_COLLECTION = "openai_vector_stores_metadata" + +# KV store prefixes for OpenAI vector stores +OPENAI_VECTOR_STORES_PREFIX = "openai_vector_stores:" +OPENAI_VECTOR_STORES_FILES_PREFIX = "openai_vector_stores_files:" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = "openai_vector_stores_files_contents:" + +VECTOR_DBS_PREFIX = "vector_dbs:" def convert_id(_id: str) -> str: @@ -160,11 +168,28 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.inference_api = inference_api self.files_api = files_api self.vector_db_store = None + self.kvstore: KVStore | None = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} async def initialize(self) -> None: self.client = AsyncQdrantClient(**self.config.model_dump(exclude_none=True)) - # Load existing OpenAI vector stores using the mixin method + self.kvstore = await kvstore_impl(self.config.kvstore) + + # Load existing vector DBs from kvstore + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key) + + for vector_db_data in stored_vector_dbs: + vector_db = VectorDB.model_validate_json(vector_db_data) + index = VectorDBWithIndex( + vector_db, + QdrantIndex(self.client, vector_db.identifier), + self.inference_api, + ) + self.cache[vector_db.identifier] = index + + # Load OpenAI vector stores as before self.openai_vector_stores = await self._load_openai_vector_stores() async def shutdown(self) -> None: @@ -172,154 +197,73 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP # OpenAI Vector Store Mixin abstract method implementations async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Save vector store metadata to Qdrant collection metadata.""" - # Store metadata in a special collection for vector store metadata - metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION - - # Create metadata collection if it doesn't exist - if not await self.client.collection_exists(metadata_collection): - # Use default distance metric for metadata collection - distance = models.Distance.COSINE - - await self.client.create_collection( - collection_name=metadata_collection, - vectors_config=models.VectorParams(size=1, distance=distance), - ) - - # Store metadata as a point with dummy vector - await self.client.upsert( - collection_name=metadata_collection, - points=[ - models.PointStruct( - id=convert_id(store_id), - vector=[0.0], # Dummy vector - payload={"metadata": store_info}, - ) - ], - ) + """Save vector store metadata to kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.set(key=key, value=json.dumps(store_info)) async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: - """Load all vector store metadata from Qdrant.""" - metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION - - if not await self.client.collection_exists(metadata_collection): - return {} - - # Get all points from metadata collection - points = await self.client.scroll( - collection_name=metadata_collection, - limit=1000, # Reasonable limit for metadata - with_payload=True, - ) + """Load all vector store metadata from kvstore.""" + assert self.kvstore is not None + start_key = OPENAI_VECTOR_STORES_PREFIX + end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" + stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key) stores = {} - for point in points[0]: # points[0] contains the actual points - if point.payload and "metadata" in point.payload: - store_info = point.payload["metadata"] - stores[store_info["id"]] = store_info - + for store_data in stored_openai_stores: + store_info = json.loads(store_data) + stores[store_info["id"]] = store_info return stores async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Update vector store metadata in Qdrant.""" - await self._save_openai_vector_store(store_id, store_info) + """Update vector store metadata in kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.set(key=key, value=json.dumps(store_info)) async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: - """Delete vector store metadata from Qdrant.""" - metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION - - if await self.client.collection_exists(metadata_collection): - await self.client.delete( - collection_name=metadata_collection, points_selector=models.PointIdsList(points=[convert_id(store_id)]) - ) + """Delete vector store metadata from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.delete(key) async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: - """Save vector store file metadata to Qdrant collection metadata.""" - # Store file metadata in a special collection for vector store file metadata - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - # Create file metadata collection if it doesn't exist - if not await self.client.collection_exists(file_metadata_collection): - distance = models.Distance.COSINE - await self.client.create_collection( - collection_name=file_metadata_collection, - vectors_config=models.VectorParams(size=1, distance=distance), - ) - - # Store file metadata as a point with dummy vector - file_key = f"{store_id}:{file_id}" - await self.client.upsert( - collection_name=file_metadata_collection, - points=[ - models.PointStruct( - id=convert_id(file_key), - vector=[0.0], # Dummy vector - payload={"file_info": file_info, "file_contents": file_contents}, - ) - ], - ) + """Save vector store file metadata to kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=key, value=json.dumps(file_info)) + content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=content_key, value=json.dumps(file_contents)) async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - """Load vector store file metadata from Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if not await self.client.collection_exists(file_metadata_collection): - return {} - - file_key = f"{store_id}:{file_id}" - points = await self.client.retrieve( - collection_name=file_metadata_collection, - ids=[convert_id(file_key)], - with_payload=True, - ) - - if points and points[0].payload and "file_info" in points[0].payload: - return points[0].payload["file_info"] - return {} + """Load vector store file metadata from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + stored_data = await self.kvstore.get(key) + return json.loads(stored_data) if stored_data else {} async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - """Load vector store file contents from Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if not await self.client.collection_exists(file_metadata_collection): - return [] - - file_key = f"{store_id}:{file_id}" - points = await self.client.retrieve( - collection_name=file_metadata_collection, - ids=[convert_id(file_key)], - with_payload=True, - ) - - if points and points[0].payload and "file_contents" in points[0].payload: - return points[0].payload["file_contents"] - return [] + """Load vector store file contents from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" + stored_data = await self.kvstore.get(key) + return json.loads(stored_data) if stored_data else [] async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - """Update vector store file metadata in Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if not await self.client.collection_exists(file_metadata_collection): - return - - # Get existing file contents - existing_contents = await self._load_openai_vector_store_file_contents(store_id, file_id) - - # Update with new file info but keep existing contents - await self._save_openai_vector_store_file(store_id, file_id, file_info, existing_contents) + """Update vector store file metadata in kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=key, value=json.dumps(file_info)) async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - """Delete vector store file metadata from Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if await self.client.collection_exists(file_metadata_collection): - file_key = f"{store_id}:{file_id}" - await self.client.delete( - collection_name=file_metadata_collection, - points_selector=models.PointIdsList(points=[convert_id(file_key)]), - ) + """Delete vector store file metadata from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.delete(key) + content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" + await self.kvstore.delete(content_key) async def register_vector_db( self, diff --git a/tests/unit/providers/vector_io/test_qdrant.py b/tests/unit/providers/vector_io/test_qdrant.py index 87cd18ce3..d3db313c0 100644 --- a/tests/unit/providers/vector_io/test_qdrant.py +++ b/tests/unit/providers/vector_io/test_qdrant.py @@ -24,6 +24,7 @@ from llama_stack.providers.inline.vector_io.qdrant.config import ( from llama_stack.providers.remote.vector_io.qdrant.qdrant import ( QdrantVectorIOAdapter, ) +from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig # This test is a unit test for the QdrantVectorIOAdapter class. This should only contain # tests which are specific to this class. More general (API-level) tests should be placed in @@ -37,7 +38,9 @@ from llama_stack.providers.remote.vector_io.qdrant.qdrant import ( @pytest.fixture def qdrant_config(tmp_path) -> InlineQdrantVectorIOConfig: - return InlineQdrantVectorIOConfig(path=os.path.join(tmp_path, "qdrant.db")) + kvstore_config = SqliteKVStoreConfig(db_name=os.path.join(tmp_path, "test_kvstore.db")) + + return InlineQdrantVectorIOConfig(path=os.path.join(tmp_path, "qdrant.db"), kvstore=kvstore_config) @pytest.fixture(scope="session")