chore: Add support for vector-stores files api for Milvus (#2582)
Some checks failed
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 4s
Integration Tests / test-matrix (library, 3.12, post_training) (push) Failing after 14s
Integration Tests / test-matrix (library, 3.13, datasets) (push) Failing after 11s
Integration Tests / test-matrix (library, 3.12, scoring) (push) Failing after 15s
Integration Tests / test-matrix (library, 3.12, datasets) (push) Failing after 17s
Integration Tests / test-matrix (library, 3.13, providers) (push) Failing after 9s
Integration Tests / test-matrix (library, 3.13, post_training) (push) Failing after 12s
Integration Tests / test-matrix (library, 3.13, inspect) (push) Failing after 14s
Integration Tests / test-matrix (library, 3.13, agents) (push) Failing after 16s
Integration Tests / test-matrix (library, 3.12, vector_io) (push) Failing after 16s
Integration Tests / test-matrix (library, 3.12, inference) (push) Failing after 22s
Integration Tests / test-matrix (library, 3.12, tool_runtime) (push) Failing after 20s
Integration Tests / test-matrix (library, 3.12, agents) (push) Failing after 21s
Integration Tests / test-matrix (library, 3.13, inference) (push) Failing after 18s
Integration Tests / test-matrix (library, 3.12, providers) (push) Failing after 18s
Integration Tests / test-matrix (library, 3.13, scoring) (push) Failing after 11s
Integration Tests / test-matrix (server, 3.12, agents) (push) Failing after 5s
Integration Tests / test-matrix (library, 3.13, vector_io) (push) Failing after 9s
Integration Tests / test-matrix (server, 3.12, datasets) (push) Failing after 6s
Integration Tests / test-matrix (library, 3.12, inspect) (push) Failing after 18s
Integration Tests / test-matrix (library, 3.13, tool_runtime) (push) Failing after 11s
Integration Tests / test-matrix (server, 3.12, post_training) (push) Failing after 8s
Integration Tests / test-matrix (server, 3.12, scoring) (push) Failing after 9s
Integration Tests / test-matrix (server, 3.12, inspect) (push) Failing after 10s
Integration Tests / test-matrix (server, 3.12, inference) (push) Failing after 8s
Integration Tests / test-matrix (server, 3.12, providers) (push) Failing after 10s
Integration Tests / test-matrix (server, 3.12, vector_io) (push) Failing after 9s
Integration Tests / test-matrix (server, 3.12, tool_runtime) (push) Failing after 7s
Integration Tests / test-matrix (server, 3.13, agents) (push) Failing after 10s
Integration Tests / test-matrix (server, 3.13, inference) (push) Failing after 10s
Integration Tests / test-matrix (server, 3.13, post_training) (push) Failing after 9s
Integration Tests / test-matrix (server, 3.13, datasets) (push) Failing after 12s
Integration Tests / test-matrix (server, 3.13, scoring) (push) Failing after 7s
Integration Tests / test-matrix (server, 3.13, inspect) (push) Failing after 13s
Integration Tests / test-matrix (server, 3.13, providers) (push) Failing after 13s
Vector IO Integration Tests / test-matrix (3.12, inline::milvus) (push) Failing after 7s
Integration Tests / test-matrix (server, 3.13, vector_io) (push) Failing after 9s
Vector IO Integration Tests / test-matrix (3.12, remote::chromadb) (push) Failing after 6s
Vector IO Integration Tests / test-matrix (3.12, inline::faiss) (push) Failing after 10s
Integration Tests / test-matrix (server, 3.13, tool_runtime) (push) Failing after 14s
Vector IO Integration Tests / test-matrix (3.12, inline::sqlite-vec) (push) Failing after 8s
Vector IO Integration Tests / test-matrix (3.13, inline::faiss) (push) Failing after 5s
Vector IO Integration Tests / test-matrix (3.12, remote::pgvector) (push) Failing after 8s
Vector IO Integration Tests / test-matrix (3.13, inline::milvus) (push) Failing after 6s
Vector IO Integration Tests / test-matrix (3.13, remote::pgvector) (push) Failing after 22s
Vector IO Integration Tests / test-matrix (3.13, inline::sqlite-vec) (push) Failing after 24s
Test Llama Stack Build / build-custom-container-distribution (push) Failing after 18s
Test Llama Stack Build / generate-matrix (push) Successful in 20s
Python Package Build Test / build (3.13) (push) Failing after 1s
Vector IO Integration Tests / test-matrix (3.13, remote::chromadb) (push) Failing after 28s
Unit Tests / unit-tests (3.12) (push) Failing after 3s
Test Llama Stack Build / build (push) Failing after 4s
Test External Providers / test-external-providers (venv) (push) Failing after 6s
Update ReadTheDocs / update-readthedocs (push) Failing after 5s
Unit Tests / unit-tests (3.13) (push) Failing after 9s
Python Package Build Test / build (3.12) (push) Failing after 51s
Test Llama Stack Build / build-single-provider (push) Failing after 55s
Test Llama Stack Build / build-ubi9-container-distribution (push) Failing after 54s
Pre-commit / pre-commit (push) Successful in 1m44s

# What does this PR do?
### Summary

This pull request implements support for the OpenAI Vector Store Files
API for the Milvus vector store provider in `llama_stack`. It enables
storing, loading, updating, and deleting file metadata and file contents
in Milvus collections, allowing OpenAI vector store files to be managed
directly within Milvus.

### Main Changes

- **Milvus Vector Store Files API Implementation**
- Implements all required methods for storing, loading, updating, and
deleting vector store file metadata and contents
(`_save_openai_vector_store_file`, `_load_openai_vector_store_file`,
`_load_openai_vector_store_file_contents`,
`_update_openai_vector_store_file`,
`_delete_openai_vector_store_file_from_storage`).
- Uses two Milvus collections: `openai_vector_store_files` (for
metadata) and `openai_vector_store_files_contents` (for chunked file
contents).
- Collections are created dynamically if they do not exist, with
appropriate schema definitions.
- **Collection Name Sanitization**
- Adds a `sanitize_collection_name` utility to ensure Milvus collection
names only contain valid characters (letters, numbers, underscores).
- **Testing**
- Updates test skip logic to include `"inline::milvus"` for cases where
the OpenAI Vector Store Files API is not supported, improving
integration test accuracy.
- **Other Improvements**
  - Passes `kvstore` to `MilvusIndex` for consistency.
- Removes obsolete NotImplementedErrors and legacy code for file
storage.

## Test Plan
CI and tested via a test script

## Notes
- `VectorDB` currently uses the `name` as the `identifier` in
`openai_create_vector_store`. We need to add `name` as a field to
`VectorDB` and generate the `identifier` upon creation. OpenAI is not
idempotent with respect to the `name` field that they pass (i.e., you
can pass the same name multiple times and OpenAI will generate a new
identifier). I'll add a follow up PR for this.
- The `Files` api needs to use `files-` as a prefix in the identifier. I
have updated the Vector Store to use the OpenAI prefix `vs_*`.

---------

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
This commit is contained in:
Francisco Arceo 2025-07-03 15:15:33 -04:00 committed by GitHub
parent dae1fcd3c2
commit 4afd619c56
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 179 additions and 20 deletions

View file

@ -8,10 +8,11 @@ import asyncio
import json
import logging
import os
import re
from typing import Any
from numpy.typing import NDArray
from pymilvus import MilvusClient
from pymilvus import DataType, MilvusClient
from llama_stack.apis.files.files import Files
from llama_stack.apis.inference import Inference, InterleavedContent
@ -43,12 +44,20 @@ OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:milvus:{VERSION
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:milvus:{VERSION}::"
def sanitize_collection_name(name: str) -> str:
"""
Sanitize collection name to ensure it only contains numbers, letters, and underscores.
Any other characters are replaced with underscores.
"""
return re.sub(r"[^a-zA-Z0-9_]", "_", name)
class MilvusIndex(EmbeddingIndex):
def __init__(
self, client: MilvusClient, collection_name: str, consistency_level="Strong", kvstore: KVStore | None = None
):
self.client = client
self.collection_name = collection_name.replace("-", "_")
self.collection_name = sanitize_collection_name(collection_name)
self.consistency_level = consistency_level
self.kvstore = kvstore
@ -196,7 +205,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
index = VectorDBWithIndex(
vector_db=vector_db,
index=MilvusIndex(client=self.client, collection_name=vector_db.identifier),
index=MilvusIndex(client=self.client, collection_name=vector_db.identifier, kvstore=self.kvstore),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
@ -251,16 +260,6 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
await self.kvstore.delete(key)
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
"""Save vector store file metadata to Milvus database."""
assert self.kvstore is not None
key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}"
await self.kvstore.set(key=key, value=json.dumps(file_info))
content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}"
await self.kvstore.set(key=content_key, value=json.dumps(file_contents))
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
"""Load all vector store metadata from persistent storage."""
assert self.kvstore is not None
@ -273,20 +272,181 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
"""Save vector store file metadata to Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus")
if store_id not in self.openai_vector_stores:
store_info = await self._load_openai_vector_stores(store_id)
if not store_info:
logger.error(f"OpenAI vector store {store_id} not found")
raise ValueError(f"No vector store found with id {store_id}")
try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
file_schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
description="Metadata for OpenAI vector store files",
)
file_schema.add_field(
field_name="store_file_id", datatype=DataType.VARCHAR, is_primary=True, max_length=512
)
file_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
file_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
file_schema.add_field(field_name="file_info", datatype=DataType.VARCHAR, max_length=65535)
await asyncio.to_thread(
self.client.create_collection,
collection_name="openai_vector_store_files",
schema=file_schema,
)
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
content_schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
description="Contents for OpenAI vector store files",
)
content_schema.add_field(
field_name="chunk_id", datatype=DataType.VARCHAR, is_primary=True, max_length=1024
)
content_schema.add_field(field_name="store_file_id", datatype=DataType.VARCHAR, max_length=1024)
content_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
content_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
content_schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
await asyncio.to_thread(
self.client.create_collection,
collection_name="openai_vector_store_files_contents",
schema=content_schema,
)
file_data = [
{
"store_file_id": f"{store_id}_{file_id}",
"store_id": store_id,
"file_id": file_id,
"file_info": json.dumps(file_info),
}
]
await asyncio.to_thread(
self.client.upsert,
collection_name="openai_vector_store_files",
data=file_data,
)
# Save file contents
contents_data = [
{
"chunk_id": content.get("chunk_metadata").get("chunk_id"),
"store_file_id": f"{store_id}_{file_id}",
"store_id": store_id,
"file_id": file_id,
"content": json.dumps(content),
}
for content in file_contents
]
await asyncio.to_thread(
self.client.upsert,
collection_name="openai_vector_store_files_contents",
data=contents_data,
)
except Exception as e:
logger.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}")
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
"""Load vector store file metadata from Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus")
try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
return {}
query_filter = f"store_file_id == '{store_id}_{file_id}'"
results = await asyncio.to_thread(
self.client.query,
collection_name="openai_vector_store_files",
filter=query_filter,
output_fields=["file_info"],
)
if results:
try:
return json.loads(results[0]["file_info"])
except json.JSONDecodeError as e:
logger.error(f"Failed to decode file_info for store {store_id}, file {file_id}: {e}")
return {}
return {}
except Exception as e:
logger.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}")
return {}
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
"""Load vector store file contents from Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus")
try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
return []
query_filter = (
f"store_id == '{store_id}' AND file_id == '{file_id}' AND store_file_id == '{store_id}_{file_id}'"
)
results = await asyncio.to_thread(
self.client.query,
collection_name="openai_vector_store_files_contents",
filter=query_filter,
output_fields=["chunk_id", "store_id", "file_id", "content"],
)
contents = []
for result in results:
try:
content = json.loads(result["content"])
contents.append(content)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode content for store {store_id}, file {file_id}: {e}")
return contents
except Exception as e:
logger.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}")
return []
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
"""Update vector store file metadata in Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus")
try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
return
file_data = [
{
"store_file_id": f"{store_id}_{file_id}",
"store_id": store_id,
"file_id": file_id,
"file_info": json.dumps(file_info),
}
]
await asyncio.to_thread(
self.client.upsert,
collection_name="openai_vector_store_files",
data=file_data,
)
except Exception as e:
logger.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}")
raise
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
"""Delete vector store file metadata from Milvus database."""
raise NotImplementedError("Files API not yet implemented for Milvus")
try:
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
return
query_filter = f"store_file_id in ['{store_id}_{file_id}']"
await asyncio.to_thread(
self.client.delete,
collection_name="openai_vector_store_files",
filter=query_filter,
)
if await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
await asyncio.to_thread(
self.client.delete,
collection_name="openai_vector_store_files_contents",
filter=query_filter,
)
except Exception as e:
logger.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}")
raise