diff --git a/llama_stack/providers/inline/vector_io/qdrant/config.py b/llama_stack/providers/inline/vector_io/qdrant/config.py index 7cc91d918..7b46cc3d9 100644 --- a/llama_stack/providers/inline/vector_io/qdrant/config.py +++ b/llama_stack/providers/inline/vector_io/qdrant/config.py @@ -9,15 +9,24 @@ from typing import Any from pydantic import BaseModel +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) from llama_stack.schema_utils import json_schema_type @json_schema_type class QdrantVectorIOConfig(BaseModel): path: str + kvstore: KVStoreConfig @classmethod def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]: return { "path": "${env.QDRANT_PATH:=~/.llama/" + __distro_dir__ + "}/" + "qdrant.db", + "kvstore": SqliteKVStoreConfig.sample_run_config( + __distro_dir__=__distro_dir__, + db_name="qdrant_store.db", + ), } diff --git a/llama_stack/providers/remote/vector_io/qdrant/config.py b/llama_stack/providers/remote/vector_io/qdrant/config.py index 314d3f5f1..efb30b036 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/config.py +++ b/llama_stack/providers/remote/vector_io/qdrant/config.py @@ -8,6 +8,10 @@ from typing import Any from pydantic import BaseModel +from llama_stack.providers.utils.kvstore.config import ( + KVStoreConfig, + SqliteKVStoreConfig, +) from llama_stack.schema_utils import json_schema_type @@ -23,9 +27,14 @@ class QdrantVectorIOConfig(BaseModel): prefix: str | None = None timeout: int | None = None host: str | None = None + kvstore: KVStoreConfig @classmethod - def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]: + def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: return { "api_key": "${env.QDRANT_API_KEY}", + "kvstore": SqliteKVStoreConfig.sample_run_config( + __distro_dir__=__distro_dir__, + db_name="qdrant_store.db", + ), } diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index e00087d48..1c25de6d3 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -4,6 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import json import logging import uuid from typing import Any @@ -22,6 +23,7 @@ from llama_stack.apis.vector_io import ( ) from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig +from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( EmbeddingIndex, @@ -32,7 +34,13 @@ from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig log = logging.getLogger(__name__) CHUNK_ID_KEY = "_chunk_id" -OPENAI_VECTOR_STORES_METADATA_COLLECTION = "openai_vector_stores_metadata" + +# KV store prefixes for OpenAI vector stores +OPENAI_VECTOR_STORES_PREFIX = "openai_vector_stores:" +OPENAI_VECTOR_STORES_FILES_PREFIX = "openai_vector_stores_files:" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = "openai_vector_stores_files_contents:" + +VECTOR_DBS_PREFIX = "vector_dbs:" def convert_id(_id: str) -> str: @@ -160,11 +168,28 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.inference_api = inference_api self.files_api = files_api self.vector_db_store = None + self.kvstore: KVStore | None = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} async def initialize(self) -> None: self.client = AsyncQdrantClient(**self.config.model_dump(exclude_none=True)) - # Load existing OpenAI vector stores using the mixin method + self.kvstore = await kvstore_impl(self.config.kvstore) + + # Load existing vector DBs from kvstore + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key) + + for vector_db_data in stored_vector_dbs: + vector_db = VectorDB.model_validate_json(vector_db_data) + index = VectorDBWithIndex( + vector_db, + QdrantIndex(self.client, vector_db.identifier), + self.inference_api, + ) + self.cache[vector_db.identifier] = index + + # Load OpenAI vector stores as before self.openai_vector_stores = await self._load_openai_vector_stores() async def shutdown(self) -> None: @@ -172,154 +197,73 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP # OpenAI Vector Store Mixin abstract method implementations async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Save vector store metadata to Qdrant collection metadata.""" - # Store metadata in a special collection for vector store metadata - metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION - - # Create metadata collection if it doesn't exist - if not await self.client.collection_exists(metadata_collection): - # Use default distance metric for metadata collection - distance = models.Distance.COSINE - - await self.client.create_collection( - collection_name=metadata_collection, - vectors_config=models.VectorParams(size=1, distance=distance), - ) - - # Store metadata as a point with dummy vector - await self.client.upsert( - collection_name=metadata_collection, - points=[ - models.PointStruct( - id=convert_id(store_id), - vector=[0.0], # Dummy vector - payload={"metadata": store_info}, - ) - ], - ) + """Save vector store metadata to kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.set(key=key, value=json.dumps(store_info)) async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: - """Load all vector store metadata from Qdrant.""" - metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION - - if not await self.client.collection_exists(metadata_collection): - return {} - - # Get all points from metadata collection - points = await self.client.scroll( - collection_name=metadata_collection, - limit=1000, # Reasonable limit for metadata - with_payload=True, - ) + """Load all vector store metadata from kvstore.""" + assert self.kvstore is not None + start_key = OPENAI_VECTOR_STORES_PREFIX + end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" + stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key) stores = {} - for point in points[0]: # points[0] contains the actual points - if point.payload and "metadata" in point.payload: - store_info = point.payload["metadata"] - stores[store_info["id"]] = store_info - + for store_data in stored_openai_stores: + store_info = json.loads(store_data) + stores[store_info["id"]] = store_info return stores async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: - """Update vector store metadata in Qdrant.""" - await self._save_openai_vector_store(store_id, store_info) + """Update vector store metadata in kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.set(key=key, value=json.dumps(store_info)) async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: - """Delete vector store metadata from Qdrant.""" - metadata_collection = OPENAI_VECTOR_STORES_METADATA_COLLECTION - - if await self.client.collection_exists(metadata_collection): - await self.client.delete( - collection_name=metadata_collection, points_selector=models.PointIdsList(points=[convert_id(store_id)]) - ) + """Delete vector store metadata from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" + await self.kvstore.delete(key) async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: - """Save vector store file metadata to Qdrant collection metadata.""" - # Store file metadata in a special collection for vector store file metadata - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - # Create file metadata collection if it doesn't exist - if not await self.client.collection_exists(file_metadata_collection): - distance = models.Distance.COSINE - await self.client.create_collection( - collection_name=file_metadata_collection, - vectors_config=models.VectorParams(size=1, distance=distance), - ) - - # Store file metadata as a point with dummy vector - file_key = f"{store_id}:{file_id}" - await self.client.upsert( - collection_name=file_metadata_collection, - points=[ - models.PointStruct( - id=convert_id(file_key), - vector=[0.0], # Dummy vector - payload={"file_info": file_info, "file_contents": file_contents}, - ) - ], - ) + """Save vector store file metadata to kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=key, value=json.dumps(file_info)) + content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=content_key, value=json.dumps(file_contents)) async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - """Load vector store file metadata from Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if not await self.client.collection_exists(file_metadata_collection): - return {} - - file_key = f"{store_id}:{file_id}" - points = await self.client.retrieve( - collection_name=file_metadata_collection, - ids=[convert_id(file_key)], - with_payload=True, - ) - - if points and points[0].payload and "file_info" in points[0].payload: - return points[0].payload["file_info"] - return {} + """Load vector store file metadata from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + stored_data = await self.kvstore.get(key) + return json.loads(stored_data) if stored_data else {} async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - """Load vector store file contents from Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if not await self.client.collection_exists(file_metadata_collection): - return [] - - file_key = f"{store_id}:{file_id}" - points = await self.client.retrieve( - collection_name=file_metadata_collection, - ids=[convert_id(file_key)], - with_payload=True, - ) - - if points and points[0].payload and "file_contents" in points[0].payload: - return points[0].payload["file_contents"] - return [] + """Load vector store file contents from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" + stored_data = await self.kvstore.get(key) + return json.loads(stored_data) if stored_data else [] async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - """Update vector store file metadata in Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if not await self.client.collection_exists(file_metadata_collection): - return - - # Get existing file contents - existing_contents = await self._load_openai_vector_store_file_contents(store_id, file_id) - - # Update with new file info but keep existing contents - await self._save_openai_vector_store_file(store_id, file_id, file_info, existing_contents) + """Update vector store file metadata in kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=key, value=json.dumps(file_info)) async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - """Delete vector store file metadata from Qdrant.""" - file_metadata_collection = f"{OPENAI_VECTOR_STORES_METADATA_COLLECTION}_files" - - if await self.client.collection_exists(file_metadata_collection): - file_key = f"{store_id}:{file_id}" - await self.client.delete( - collection_name=file_metadata_collection, - points_selector=models.PointIdsList(points=[convert_id(file_key)]), - ) + """Delete vector store file metadata from kvstore.""" + assert self.kvstore is not None + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.delete(key) + content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" + await self.kvstore.delete(content_key) async def register_vector_db( self, diff --git a/tests/unit/providers/vector_io/test_qdrant.py b/tests/unit/providers/vector_io/test_qdrant.py index 87cd18ce3..d3db313c0 100644 --- a/tests/unit/providers/vector_io/test_qdrant.py +++ b/tests/unit/providers/vector_io/test_qdrant.py @@ -24,6 +24,7 @@ from llama_stack.providers.inline.vector_io.qdrant.config import ( from llama_stack.providers.remote.vector_io.qdrant.qdrant import ( QdrantVectorIOAdapter, ) +from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig # This test is a unit test for the QdrantVectorIOAdapter class. This should only contain # tests which are specific to this class. More general (API-level) tests should be placed in @@ -37,7 +38,9 @@ from llama_stack.providers.remote.vector_io.qdrant.qdrant import ( @pytest.fixture def qdrant_config(tmp_path) -> InlineQdrantVectorIOConfig: - return InlineQdrantVectorIOConfig(path=os.path.join(tmp_path, "qdrant.db")) + kvstore_config = SqliteKVStoreConfig(db_name=os.path.join(tmp_path, "test_kvstore.db")) + + return InlineQdrantVectorIOConfig(path=os.path.join(tmp_path, "qdrant.db"), kvstore=kvstore_config) @pytest.fixture(scope="session")