diff --git a/docs/source/providers/vector_io/remote_milvus.md b/docs/source/providers/vector_io/remote_milvus.md index 6734d8315..53b2cc816 100644 --- a/docs/source/providers/vector_io/remote_milvus.md +++ b/docs/source/providers/vector_io/remote_milvus.md @@ -114,7 +114,7 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi | `uri` | `` | No | PydanticUndefined | The URI of the Milvus server | | `token` | `str \| None` | No | PydanticUndefined | The token of the Milvus server | | `consistency_level` | `` | No | Strong | The consistency level of the Milvus server | -| `kvstore` | `utils.kvstore.config.RedisKVStoreConfig \| utils.kvstore.config.SqliteKVStoreConfig \| utils.kvstore.config.PostgresKVStoreConfig \| utils.kvstore.config.MongoDBKVStoreConfig` | No | sqlite | Config for KV store backend | +| `kvstore` | `utils.kvstore.config.RedisKVStoreConfig \| utils.kvstore.config.SqliteKVStoreConfig \| utils.kvstore.config.PostgresKVStoreConfig \| utils.kvstore.config.MongoDBKVStoreConfig \| None` | No | | Config for KV store backend (SQLite only for now). Optional for remote Milvus connections - only needed for vector database registry persistence across server restarts. | | `config` | `dict` | No | {} | This configuration allows additional fields to be passed through to the underlying Milvus client. See the [Milvus](https://milvus.io/docs/install-overview.md) documentation for more details about Milvus in general. | > **Note**: This configuration class accepts additional fields beyond those listed above. You can pass any additional configuration options that will be forwarded to the underlying provider. 
diff --git a/llama_stack/providers/remote/vector_io/milvus/config.py b/llama_stack/providers/remote/vector_io/milvus/config.py index 899d3678d..8e5bd4d98 100644 --- a/llama_stack/providers/remote/vector_io/milvus/config.py +++ b/llama_stack/providers/remote/vector_io/milvus/config.py @@ -17,7 +17,10 @@ class MilvusVectorIOConfig(BaseModel): uri: str = Field(description="The URI of the Milvus server") token: str | None = Field(description="The token of the Milvus server") consistency_level: str = Field(description="The consistency level of the Milvus server", default="Strong") - kvstore: KVStoreConfig = Field(description="Config for KV store backend") + kvstore: KVStoreConfig | None = Field( + description="Config for KV store backend (SQLite only for now). Optional for remote Milvus connections - only needed for vector database registry persistence across server restarts.", + default=None, + ) # This configuration allows additional fields to be passed through to the underlying Milvus client. # See the [Milvus](https://milvus.io/docs/install-overview.md) documentation for more details about Milvus in general. 
diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index f1652a80e..087427c4c 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -276,10 +276,32 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.metadata_collection_name = "openai_vector_stores_metadata" async def initialize(self) -> None: - self.kvstore = await kvstore_impl(self.config.kvstore) - start_key = VECTOR_DBS_PREFIX - end_key = f"{VECTOR_DBS_PREFIX}\xff" - stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key) + # MilvusVectorIOAdapter is used for both inline and remote connections + if isinstance(self.config, RemoteMilvusVectorIOConfig): + # Remote Milvus: kvstore is optional for registry persistence across server restarts + if self.config.kvstore is not None: + self.kvstore = await kvstore_impl(self.config.kvstore) + logger.info("Remote Milvus: Using kvstore for vector database registry persistence") + else: + self.kvstore = None + logger.info("Remote Milvus: No kvstore configured, registry will not persist across restarts") + if self.kvstore is not None: + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key) + else: + stored_vector_dbs = [] + + elif isinstance(self.config, InlineMilvusVectorIOConfig): + self.kvstore = await kvstore_impl(self.config.kvstore) + logger.info("Inline Milvus: Using kvstore for local vector database registry") + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key) + else: + raise ValueError( + f"Unsupported config type: {type(self.config)}. 
Expected RemoteMilvusVectorIOConfig or InlineMilvusVectorIOConfig" + ) for vector_db_data in stored_vector_dbs: vector_db = VectorDB.model_validate_json(vector_db_data) @@ -295,12 +317,16 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP ) self.cache[vector_db.identifier] = index if isinstance(self.config, RemoteMilvusVectorIOConfig): - logger.info(f"Connecting to Milvus server at {self.config.uri}") + logger.info(f"Connecting to remote Milvus server at {self.config.uri}") self.client = MilvusClient(**self.config.model_dump(exclude_none=True)) - else: - logger.info(f"Connecting to Milvus Lite at: {self.config.db_path}") + elif isinstance(self.config, InlineMilvusVectorIOConfig): + logger.info(f"Connecting to local Milvus Lite at: {self.config.db_path}") uri = os.path.expanduser(self.config.db_path) self.client = MilvusClient(uri=uri) + else: + raise ValueError( + f"Unsupported config type: {type(self.config)}. Expected RemoteMilvusVectorIOConfig or InlineMilvusVectorIOConfig" + ) # Load existing OpenAI vector stores into the in-memory cache await self.initialize_openai_vector_stores() @@ -314,8 +340,12 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP ) -> None: if isinstance(self.config, RemoteMilvusVectorIOConfig): consistency_level = self.config.consistency_level + elif isinstance(self.config, InlineMilvusVectorIOConfig): + consistency_level = self.config.consistency_level else: - consistency_level = "Strong" + raise ValueError( + f"Unsupported config type: {type(self.config)}. 
Expected RemoteMilvusVectorIOConfig or InlineMilvusVectorIOConfig"
+            )
         index = VectorDBWithIndex(
             vector_db=vector_db,
             index=MilvusIndex(self.client, vector_db.identifier, consistency_level=consistency_level),
@@ -389,3 +419,272 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         for chunk_id in chunk_ids:
             # Use the index's delete_chunk method
             await index.index.delete_chunk(chunk_id)
+
+    async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
+        """Save vector store metadata to the kvstore (if configured) and the in-memory cache.
+
+        :param store_id: Vector store identifier, appended to OPENAI_VECTOR_STORES_PREFIX to form the key.
+        :param store_info: JSON-serializable metadata for the store.
+        """
+        # kvstore is None when no kvstore was configured; the store is then only cached in memory.
+        if self.kvstore is not None:
+            key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
+            await self.kvstore.set(key=key, value=json.dumps(store_info))
+        self.openai_vector_stores[store_id] = store_info
+
+    async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
+        """Overwrite vector store metadata in the kvstore (if configured) and the in-memory cache.
+
+        :param store_id: Vector store identifier, appended to OPENAI_VECTOR_STORES_PREFIX to form the key.
+        :param store_info: JSON-serializable metadata that replaces the stored value.
+        """
+        if self.kvstore is not None:
+            key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
+            await self.kvstore.set(key=key, value=json.dumps(store_info))
+        self.openai_vector_stores[store_id] = store_info
+
+    async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
+        """Delete vector store metadata from the kvstore (if configured) and the in-memory cache.
+
+        :param store_id: Identifier of the store to remove; an id missing from the in-memory cache is not an error.
+        """
+        if self.kvstore is not None:
+            key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
+            await self.kvstore.delete(key)
+        self.openai_vector_stores.pop(store_id, None)
+
+    async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
+        """Load all vector store metadata from the kvstore.
+
+        :returns: Mapping of each stored record's "id" to its decoded metadata; empty when no kvstore is configured.
+        """
+        if self.kvstore is None:
+            return {}
+        start_key = OPENAI_VECTOR_STORES_PREFIX
+        end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
+        stored = await self.kvstore.values_in_range(start_key, end_key)
+        # Decode each record once rather than once for the key and again for the value.
+        return {store["id"]: store for store in map(json.loads, stored)}
+
+    async def _save_openai_vector_store_file(
+        self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
+    ) -> None:
+        """Save vector store file metadata and chunk contents to Milvus.
+
+        Creates the "openai_vector_store_files" and "openai_vector_store_files_contents"
+        collections on first use, then upserts one metadata row and one row per chunk.
+
+        :param store_id: Identifier of the vector store that owns the file.
+        :param file_id: Identifier of the file being attached.
+        :param file_info: JSON-serializable file metadata.
+        :param file_contents: Chunk dicts; each must carry chunk_metadata.chunk_id (the row's primary key).
+        :raises ValueError: If the store is neither cached in memory nor present in the kvstore.
+        :raises Exception: Any Milvus failure is logged and re-raised.
+        """
+        if store_id not in self.openai_vector_stores:
+            # _load_openai_vector_stores takes no arguments and returns every store keyed by id.
+            store_info = (await self._load_openai_vector_stores()).get(store_id)
+            if not store_info:
+                logger.error(f"OpenAI vector store {store_id} not found")
+                raise ValueError(f"No vector store found with id {store_id}")
+
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                file_schema = MilvusClient.create_schema(
+                    auto_id=False,
+                    enable_dynamic_field=True,
+                    description="Metadata for OpenAI vector store files",
+                )
+                file_schema.add_field(
+                    field_name="store_file_id", datatype=DataType.VARCHAR, is_primary=True, max_length=512
+                )
+                file_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
+                file_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
+                file_schema.add_field(field_name="file_info", datatype=DataType.VARCHAR, max_length=65535)
+
+                await asyncio.to_thread(
+                    self.client.create_collection,
+                    collection_name="openai_vector_store_files",
+                    schema=file_schema,
+                )
+
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
+                content_schema = MilvusClient.create_schema(
+                    auto_id=False,
+                    enable_dynamic_field=True,
+                    description="Contents for OpenAI vector store files",
+                )
+                content_schema.add_field(
+                    field_name="chunk_id", datatype=DataType.VARCHAR, is_primary=True, max_length=1024
+                )
+                content_schema.add_field(field_name="store_file_id", datatype=DataType.VARCHAR, max_length=1024)
+                content_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
+                content_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
+                content_schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
+
+                await asyncio.to_thread(
+                    self.client.create_collection,
+                    collection_name="openai_vector_store_files_contents",
+                    schema=content_schema,
+                )
+
+            file_data = [
+                {
+                    "store_file_id": f"{store_id}_{file_id}",
+                    "store_id": store_id,
+                    "file_id": file_id,
+                    "file_info": json.dumps(file_info),
+                }
+            ]
+            await asyncio.to_thread(
+                self.client.upsert,
+                collection_name="openai_vector_store_files",
+                data=file_data,
+            )
+
+            # Save file contents
+            contents_data = [
+                {
+                    "chunk_id": content.get("chunk_metadata").get("chunk_id"),
+                    "store_file_id": f"{store_id}_{file_id}",
+                    "store_id": store_id,
+                    "file_id": file_id,
+                    "content": json.dumps(content),
+                }
+                for content in file_contents
+            ]
+            await asyncio.to_thread(
+                self.client.upsert,
+                collection_name="openai_vector_store_files_contents",
+                data=contents_data,
+            )
+
+        except Exception as e:
+            logger.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}")
+            # Re-raise like the update/delete helpers so a failed write is not silently reported as success.
+            raise
+
+    async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
+        """Load vector store file metadata from Milvus.
+
+        :param store_id: Identifier of the vector store that owns the file.
+        :param file_id: Identifier of the file to look up.
+        :returns: The decoded file_info, or {} if the collection or row is missing or any error occurs.
+        """
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                return {}
+
+            query_filter = f"store_file_id == '{store_id}_{file_id}'"
+            results = await asyncio.to_thread(
+                self.client.query,
+                collection_name="openai_vector_store_files",
+                filter=query_filter,
+                output_fields=["file_info"],
+            )
+
+            if results:
+                try:
+                    return json.loads(results[0]["file_info"])
+                except json.JSONDecodeError as e:
+                    logger.error(f"Failed to decode file_info for store {store_id}, file {file_id}: {e}")
+                    return {}
+            return {}
+        except Exception as e:
+            logger.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}")
+            return {}
+
+    async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
+        """Update vector store file metadata in Milvus.
+
+        Does nothing when the "openai_vector_store_files" collection does not exist.
+
+        :param store_id: Identifier of the vector store that owns the file.
+        :param file_id: Identifier of the file to update.
+        :param file_info: JSON-serializable metadata that replaces the stored row.
+        :raises Exception: Any Milvus failure is logged and re-raised.
+        """
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                return
+
+            file_data = [
+                {
+                    "store_file_id": f"{store_id}_{file_id}",
+                    "store_id": store_id,
+                    "file_id": file_id,
+                    "file_info": json.dumps(file_info),
+                }
+            ]
+            await asyncio.to_thread(
+                self.client.upsert,
+                collection_name="openai_vector_store_files",
+                data=file_data,
+            )
+        except Exception as e:
+            logger.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}")
+            raise
+
+    async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
+        """Load the stored chunk contents of a vector store file from Milvus.
+
+        :param store_id: Identifier of the vector store that owns the file.
+        :param file_id: Identifier of the file whose chunks are loaded.
+        :returns: Decoded chunk dicts (rows that fail to decode are skipped); [] if the collection is
+            missing or any other error occurs.
+        """
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
+                return []
+
+            query_filter = (
+                f"store_id == '{store_id}' AND file_id == '{file_id}' AND store_file_id == '{store_id}_{file_id}'"
+            )
+            results = await asyncio.to_thread(
+                self.client.query,
+                collection_name="openai_vector_store_files_contents",
+                filter=query_filter,
+                output_fields=["chunk_id", "store_id", "file_id", "content"],
+            )
+
+            contents = []
+            for result in results:
+                try:
+                    content = json.loads(result["content"])
+                    contents.append(content)
+                except json.JSONDecodeError as e:
+                    logger.error(f"Failed to decode content for store {store_id}, file {file_id}: {e}")
+            return contents
+        except Exception as e:
+            logger.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}")
+            return []
+
+    async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
+        """Delete a file's metadata row and its chunk contents from Milvus.
+
+        Does nothing when the "openai_vector_store_files" collection does not exist.
+
+        :param store_id: Identifier of the vector store that owns the file.
+        :param file_id: Identifier of the file to delete.
+        :raises Exception: Any Milvus failure is logged and re-raised.
+        """
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                return
+
+            query_filter = f"store_file_id in ['{store_id}_{file_id}']"
+            await asyncio.to_thread(
+                self.client.delete,
+                collection_name="openai_vector_store_files",
+                filter=query_filter,
+            )
+            if await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
+                await asyncio.to_thread(
+                    self.client.delete,
+                    collection_name="openai_vector_store_files_contents",
+                    filter=query_filter,
+                )
+
+        except Exception as e:
+            logger.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}")
+            raise