Feat: Adding support for milvus files API

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
This commit is contained in:
Francisco Javier Arceo 2025-06-26 22:07:09 -04:00
parent d165000bbc
commit ffbadc4238
2 changed files with 166 additions and 7 deletions

View file

@ -8,11 +8,11 @@ from typing import Any
from pydantic import BaseModel from pydantic import BaseModel
from llama_stack.schema_utils import json_schema_type
from llama_stack.providers.utils.kvstore.config import ( from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig, KVStoreConfig,
SqliteKVStoreConfig, SqliteKVStoreConfig,
) )
from llama_stack.schema_utils import json_schema_type
@json_schema_type @json_schema_type

View file

@ -196,7 +196,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
index = VectorDBWithIndex( index = VectorDBWithIndex(
vector_db=vector_db, vector_db=vector_db,
index=MilvusIndex(client=self.client, collection_name=vector_db.identifier), index=MilvusIndex(client=self.client, collection_name=vector_db.identifier, kvstore=self.kvstore),
inference_api=self.inference_api, inference_api=self.inference_api,
) )
self.cache[vector_db_id] = index self.cache[vector_db_id] = index
@ -273,20 +273,179 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None: ) -> None:
"""Save vector store file metadata to Milvus database.""" """Save vector store file metadata to Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus") try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
file_schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
description="Metadata for OpenAI vector store files",
)
file_schema.add_field(
field_name="store_file_id", datatype=DataType.VARCHAR, is_primary=True, max_length=512
)
file_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
file_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
file_schema.add_field(field_name="file_info", datatype=DataType.VARCHAR, max_length=65535)
await asyncio.to_thread(
self.client.create_collection,
collection_name="openai_vector_store_files",
schema=file_schema,
)
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
content_schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
description="Contents for OpenAI vector store files",
)
content_schema.add_field(
field_name="chunk_id", datatype=DataType.VARCHAR, is_primary=True, max_length=1024
)
content_schema.add_field(field_name="store_file_id", datatype=DataType.VARCHAR, max_length=1024)
content_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
content_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
content_schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
await asyncio.to_thread(
self.client.create_collection,
collection_name="openai_vector_store_files_contents",
schema=content_schema,
)
# Save file metadata
file_data = [
{
"store_file_id": f"{store_id}_{file_id}",
"store_id": store_id,
"file_id": file_id,
"file_info": json.dumps(file_info),
}
]
await asyncio.to_thread(
self.client.upsert,
collection_name="openai_vector_store_files",
data=file_data,
)
# Save file contents
contents_data = [
{
"chunk_id": generate_chunk_id(file_id, content.get("chunk_id", None)),
"store_file_id": f"{store_id}_{file_id}",
"store_id": store_id,
"file_id": file_id,
"content": json.dumps(content),
}
for content in file_contents
]
await asyncio.to_thread(
self.client.upsert,
collection_name="openai_vector_store_files_contents",
data=contents_data,
)
except Exception as e:
logger.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}")
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
"""Load vector store file metadata from Milvus database.""" """Load vector store file metadata from Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus") try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
return {}
query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
results = await asyncio.to_thread(
self.client.query,
collection_name="openai_vector_store_files",
filter=query_filter,
output_fields=["file_info"],
)
if results:
try:
return json.loads(results[0]["file_info"])
except json.JSONDecodeError as e:
logger.error(f"Failed to decode file_info for store {store_id}, file {file_id}: {e}")
return {}
return {}
except Exception as e:
logger.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}")
return {}
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
"""Load vector store file contents from Milvus database.""" """Load vector store file contents from Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus") try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
return []
query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
results = await asyncio.to_thread(
self.client.query,
collection_name="openai_vector_store_files_contents",
filter=query_filter,
output_fields=["chunk_id", "store_id", "file_id", "content"],
)
print(f"\nresults from milvus = {results}\n")
contents = []
for result in results:
try:
content = json.loads(result["content"])
contents.append(content)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode content for store {store_id}, file {file_id}: {e}")
return contents
except Exception as e:
print(f"failed {e}")
logger.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}")
return []
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
"""Update vector store file metadata in Milvus database.""" """Update vector store file metadata in Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus") try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
return
file_data = [
{
"store_file_id": f"{store_id}_{file_id}",
"store_id": store_id,
"file_id": file_id,
"file_info": json.dumps(file_info),
}
]
await asyncio.to_thread(
self.client.upsert,
collection_name="openai_vector_store_files",
data=file_data,
)
except Exception as e:
logger.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}")
raise
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
"""Delete vector store file metadata from Milvus database.""" """Delete vector store file metadata from Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus") print("milvus is trying to delete stuff")
try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
return
query_filter = f"store_id == '{store_id}' AND file_id == '{file_id}'"
await asyncio.to_thread(
self.client.delete,
collection_name="openai_vector_store_files",
filter=query_filter,
)
if await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
await asyncio.to_thread(
self.client.delete,
collection_name="openai_vector_store_files_contents",
filter=query_filter,
)
except Exception as e:
logger.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}")
raise