diff --git a/llama_stack/providers/inline/vector_io/milvus/config.py b/llama_stack/providers/inline/vector_io/milvus/config.py
index 61a8acd78..601fe80b8 100644
--- a/llama_stack/providers/inline/vector_io/milvus/config.py
+++ b/llama_stack/providers/inline/vector_io/milvus/config.py
@@ -8,11 +8,11 @@ from typing import Any
 
 from pydantic import BaseModel
 
+from llama_stack.schema_utils import json_schema_type
 from llama_stack.providers.utils.kvstore.config import (
     KVStoreConfig,
     SqliteKVStoreConfig,
 )
-from llama_stack.schema_utils import json_schema_type
 
 
 @json_schema_type
diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py
index 5e0a449b8..a4ed21e64 100644
--- a/llama_stack/providers/remote/vector_io/milvus/milvus.py
+++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py
@@ -196,7 +196,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
 
         index = VectorDBWithIndex(
             vector_db=vector_db,
-            index=MilvusIndex(client=self.client, collection_name=vector_db.identifier),
+            index=MilvusIndex(client=self.client, collection_name=vector_db.identifier, kvstore=self.kvstore),
            inference_api=self.inference_api,
         )
         self.cache[vector_db_id] = index
@@ -273,20 +273,179 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
     ) -> None:
         """Save vector store file metadata to Milvus database."""
-        raise NotImplementedError("Files API not yet implemented for Milvus")
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                file_schema = MilvusClient.create_schema(
+                    auto_id=False,
+                    enable_dynamic_field=True,
+                    description="Metadata for OpenAI vector store files",
+                )
+                file_schema.add_field(
+                    field_name="store_file_id", datatype=DataType.VARCHAR, is_primary=True, max_length=512
+                )
+                file_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
+                file_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
+                file_schema.add_field(field_name="file_info", datatype=DataType.VARCHAR, max_length=65535)
+
+                await asyncio.to_thread(
+                    self.client.create_collection,
+                    collection_name="openai_vector_store_files",
+                    schema=file_schema,
+                )
+
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
+                content_schema = MilvusClient.create_schema(
+                    auto_id=False,
+                    enable_dynamic_field=True,
+                    description="Contents for OpenAI vector store files",
+                )
+                content_schema.add_field(
+                    field_name="chunk_id", datatype=DataType.VARCHAR, is_primary=True, max_length=1024
+                )
+                content_schema.add_field(field_name="store_file_id", datatype=DataType.VARCHAR, max_length=1024)
+                content_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
+                content_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
+                content_schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
+
+                await asyncio.to_thread(
+                    self.client.create_collection,
+                    collection_name="openai_vector_store_files_contents",
+                    schema=content_schema,
+                )
+
+            # Save file metadata
+            file_data = [
+                {
+                    "store_file_id": f"{store_id}_{file_id}",
+                    "store_id": store_id,
+                    "file_id": file_id,
+                    "file_info": json.dumps(file_info),
+                }
+            ]
+            await asyncio.to_thread(
+                self.client.upsert,
+                collection_name="openai_vector_store_files",
+                data=file_data,
+            )
+
+            # Save file contents
+            contents_data = [
+                {
+                    "chunk_id": generate_chunk_id(file_id, content.get("chunk_id", None)),
+                    "store_file_id": f"{store_id}_{file_id}",
+                    "store_id": store_id,
+                    "file_id": file_id,
+                    "content": json.dumps(content),
+                }
+                for content in file_contents
+            ]
+            await asyncio.to_thread(
+                self.client.upsert,
+                collection_name="openai_vector_store_files_contents",
+                data=contents_data,
+            )
+
+        except Exception as e:
+            logger.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}")
 
     async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
         """Load vector store file metadata from Milvus database."""
-        raise NotImplementedError("Files API not yet implemented for Milvus")
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                return {}
+
+            query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
+            results = await asyncio.to_thread(
+                self.client.query,
+                collection_name="openai_vector_store_files",
+                filter=query_filter,
+                output_fields=["file_info"],
+            )
+
+            if results:
+                try:
+                    return json.loads(results[0]["file_info"])
+                except json.JSONDecodeError as e:
+                    logger.error(f"Failed to decode file_info for store {store_id}, file {file_id}: {e}")
+                    return {}
+            return {}
+        except Exception as e:
+            logger.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}")
+            return {}
 
     async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
         """Load vector store file contents from Milvus database."""
-        raise NotImplementedError("Files API not yet implemented for Milvus")
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
+                return []
+
+            query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
+            results = await asyncio.to_thread(
+                self.client.query,
+                collection_name="openai_vector_store_files_contents",
+                filter=query_filter,
+                output_fields=["chunk_id", "store_id", "file_id", "content"],
+            )
+
+            contents = []
+            for result in results:
+                try:
+                    content = json.loads(result["content"])
+                    contents.append(content)
+                except json.JSONDecodeError as e:
+                    logger.error(f"Failed to decode content for store {store_id}, file {file_id}: {e}")
+            return contents
+        except Exception as e:
+            logger.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}")
+            return []
 
     async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
         """Update vector store file metadata in Milvus database."""
-        raise NotImplementedError("Files API not yet implemented for Milvus")
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                return
+
+            file_data = [
+                {
+                    "store_file_id": f"{store_id}_{file_id}",
+                    "store_id": store_id,
+                    "file_id": file_id,
+                    "file_info": json.dumps(file_info),
+                }
+            ]
+            await asyncio.to_thread(
+                self.client.upsert,
+                collection_name="openai_vector_store_files",
+                data=file_data,
+            )
+        except Exception as e:
+            logger.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}")
+            raise
 
     async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
         """Delete vector store file metadata from Milvus database."""
-        raise NotImplementedError("Files API not yet implemented for Milvus")
+        try:
+            if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
+                return
+
+            query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
+            await asyncio.to_thread(
+                self.client.delete,
+                collection_name="openai_vector_store_files",
+                filter=query_filter,
+            )
+
+            if await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
+                await asyncio.to_thread(
+                    self.client.delete,
+                    collection_name="openai_vector_store_files_contents",
+                    filter=query_filter,
+                )
+
+        except Exception as e:
+            logger.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}")
+            raise
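
Note: the persistence path added above boils down to three ingredients: a composite VARCHAR primary key built as f"{store_id}_{file_id}", metadata and chunk contents serialized with json.dumps into VARCHAR fields, and every blocking pymilvus call pushed onto a worker thread via asyncio.to_thread(callable, **kwargs). The sketch below is only an illustration of that round trip against an in-memory stub; StubMilvusClient, save_and_load, and the sample IDs are hypothetical stand-ins and not part of this patch, and the real adapter talks to Milvus through pymilvus.MilvusClient.

import asyncio
import json


class StubMilvusClient:
    """Illustrative in-memory stand-in for pymilvus.MilvusClient (rows keyed by primary key)."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}

    def upsert(self, collection_name: str, data: list[dict]) -> None:
        # Mirrors client.upsert(collection_name=..., data=[...]) used by the patch.
        for row in data:
            self.rows[row["store_file_id"]] = row

    def query(self, collection_name: str, filter: str, output_fields: list[str]) -> list[dict]:
        # The real client evaluates `filter` server-side; this stub simply returns every row.
        return [{field: row[field] for field in output_fields} for row in self.rows.values()]


async def save_and_load(store_id: str, file_id: str, file_info: dict) -> dict:
    client = StubMilvusClient()

    # Row shape written by _save_openai_vector_store_file: composite primary key plus
    # the JSON-serialized metadata blob (stored in a VARCHAR field in Milvus).
    file_data = [
        {
            "store_file_id": f"{store_id}_{file_id}",
            "store_id": store_id,
            "file_id": file_id,
            "file_info": json.dumps(file_info),
        }
    ]
    # Blocking client calls are wrapped with asyncio.to_thread, as in the adapter methods.
    await asyncio.to_thread(client.upsert, collection_name="openai_vector_store_files", data=file_data)

    # _load_openai_vector_store_file builds a scalar filter and parses file_info back out.
    query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
    results = await asyncio.to_thread(
        client.query,
        collection_name="openai_vector_store_files",
        filter=query_filter,
        output_fields=["file_info"],
    )
    return json.loads(results[0]["file_info"]) if results else {}


if __name__ == "__main__":
    print(asyncio.run(save_and_load("vs_123", "file_abc", {"filename": "report.pdf", "status": "completed"})))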