chore: Enabling Integration tests for Weaviate (#2882)

# What does this PR do?

This PR (1) enables the files API for Weaviate and (2) enables
integration tests for Weaviate, which adds a docker container to the
github action.

This PR also handles a couple of edge cases for in creating the
collection and ensuring the tests all pass.

## Test Plan
CI enabled

---------

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
This commit is contained in:
Francisco Arceo 2025-07-31 20:29:50 -04:00 committed by GitHub
parent 369286f95b
commit 33cca26154
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 2197 additions and 2033 deletions

View file

@ -16,7 +16,7 @@ from pydantic import BaseModel, Field
from llama_stack.apis.inference import InterleavedContent
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.providers.utils.telemetry.trace_protocol import trace_protocol
from llama_stack.providers.utils.vector_io.chunk_utils import generate_chunk_id
from llama_stack.providers.utils.vector_io.vector_utils import generate_chunk_id
from llama_stack.schema_utils import json_schema_type, webmethod
from llama_stack.strong_typing.schema import register_schema

View file

@ -7,7 +7,6 @@
import asyncio
import logging
import os
import re
from typing import Any
from numpy.typing import NDArray
@ -31,6 +30,7 @@ from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
from .config import MilvusVectorIOConfig as RemoteMilvusVectorIOConfig
@ -44,14 +44,6 @@ OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:milvus:{VERSION
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:milvus:{VERSION}::"
def sanitize_collection_name(name: str) -> str:
"""
Sanitize collection name to ensure it only contains numbers, letters, and underscores.
Any other characters are replaced with underscores.
"""
return re.sub(r"[^a-zA-Z0-9_]", "_", name)
class MilvusIndex(EmbeddingIndex):
def __init__(
self, client: MilvusClient, collection_name: str, consistency_level="Strong", kvstore: KVStore | None = None

View file

@ -12,6 +12,6 @@ from .config import WeaviateVectorIOConfig
async def get_adapter_impl(config: WeaviateVectorIOConfig, deps: dict[Api, ProviderSpec]):
from .weaviate import WeaviateVectorIOAdapter
impl = WeaviateVectorIOAdapter(config, deps[Api.inference])
impl = WeaviateVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files, None))
await impl.initialize()
return impl

View file

@ -12,18 +12,24 @@ from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
from llama_stack.schema_utils import json_schema_type
class WeaviateRequestProviderData(BaseModel):
weaviate_api_key: str
weaviate_cluster_url: str
@json_schema_type
class WeaviateVectorIOConfig(BaseModel):
weaviate_api_key: str | None = Field(description="The API key for the Weaviate instance", default=None)
weaviate_cluster_url: str | None = Field(description="The URL of the Weaviate cluster", default="localhost:8080")
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)
class WeaviateVectorIOConfig(BaseModel):
@classmethod
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
def sample_run_config(
cls,
__distro_dir__: str,
**kwargs: Any,
) -> dict[str, Any]:
return {
"weaviate_api_key": None,
"weaviate_cluster_url": "${env.WEAVIATE_CLUSTER_URL:=localhost:8080}",
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="weaviate_registry.db",

View file

@ -22,12 +22,16 @@ from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import (
OpenAIVectorStoreMixin,
)
from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
from .config import WeaviateRequestProviderData, WeaviateVectorIOConfig
from .config import WeaviateVectorIOConfig
log = logging.getLogger(__name__)
@ -40,11 +44,19 @@ OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_conten
class WeaviateIndex(EmbeddingIndex):
def __init__(self, client: weaviate.Client, collection_name: str, kvstore: KVStore | None = None):
def __init__(
self,
client: weaviate.Client,
collection_name: str,
kvstore: KVStore | None = None,
):
self.client = client
self.collection_name = collection_name
self.collection_name = sanitize_collection_name(collection_name, weaviate_format=True)
self.kvstore = kvstore
async def initialize(self):
pass
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
@ -68,10 +80,13 @@ class WeaviateIndex(EmbeddingIndex):
collection.data.insert_many(data_objects)
async def delete_chunk(self, chunk_id: str) -> None:
raise NotImplementedError("delete_chunk is not supported in Chroma")
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
collection = self.client.collections.get(sanitized_collection_name)
collection.data.delete_many(where=Filter.by_property("id").contains_any([chunk_id]))
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
collection = self.client.collections.get(self.collection_name)
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
collection = self.client.collections.get(sanitized_collection_name)
results = collection.query.near_vector(
near_vector=embedding.tolist(),
@ -95,8 +110,17 @@ class WeaviateIndex(EmbeddingIndex):
return QueryChunksResponse(chunks=chunks, scores=scores)
async def delete(self, chunk_ids: list[str]) -> None:
collection = self.client.collections.get(self.collection_name)
async def delete(self, chunk_ids: list[str] | None = None) -> None:
"""
Delete chunks by IDs if provided, otherwise drop the entire collection.
"""
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
if chunk_ids is None:
# Drop entire collection if it exists
if self.client.collections.exists(sanitized_collection_name):
self.client.collections.delete(sanitized_collection_name)
return
collection = self.client.collections.get(sanitized_collection_name)
collection.data.delete_many(where=Filter.by_property("id").contains_any(chunk_ids))
async def query_keyword(
@ -120,6 +144,7 @@ class WeaviateIndex(EmbeddingIndex):
class WeaviateVectorIOAdapter(
OpenAIVectorStoreMixin,
VectorIO,
NeedsRequestProviderData,
VectorDBsProtocolPrivate,
@ -141,42 +166,56 @@ class WeaviateVectorIOAdapter(
self.metadata_collection_name = "openai_vector_stores_metadata"
def _get_client(self) -> weaviate.Client:
provider_data = self.get_request_provider_data()
assert provider_data is not None, "Request provider data must be set"
assert isinstance(provider_data, WeaviateRequestProviderData)
key = f"{provider_data.weaviate_cluster_url}::{provider_data.weaviate_api_key}"
if key in self.client_cache:
return self.client_cache[key]
client = weaviate.connect_to_weaviate_cloud(
cluster_url=provider_data.weaviate_cluster_url,
auth_credentials=Auth.api_key(provider_data.weaviate_api_key),
)
if "localhost" in self.config.weaviate_cluster_url:
log.info("using Weaviate locally in container")
host, port = self.config.weaviate_cluster_url.split(":")
key = "local_test"
client = weaviate.connect_to_local(
host=host,
port=port,
)
else:
log.info("Using Weaviate remote cluster with URL")
key = f"{self.config.weaviate_cluster_url}::{self.config.weaviate_api_key}"
if key in self.client_cache:
return self.client_cache[key]
client = weaviate.connect_to_weaviate_cloud(
cluster_url=self.config.weaviate_cluster_url,
auth_credentials=Auth.api_key(self.config.weaviate_api_key),
)
self.client_cache[key] = client
return client
async def initialize(self) -> None:
"""Set up KV store and load existing vector DBs and OpenAI vector stores."""
# Initialize KV store for metadata
self.kvstore = await kvstore_impl(self.config.kvstore)
# Initialize KV store for metadata if configured
if self.config.kvstore is not None:
self.kvstore = await kvstore_impl(self.config.kvstore)
else:
self.kvstore = None
log.info("No kvstore configured, registry will not persist across restarts")
# Load existing vector DB definitions
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(client=client, collection_name=vector_db.identifier, kvstore=self.kvstore)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=idx,
inference_api=self.inference_api,
)
if self.kvstore is not None:
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(
client=client,
collection_name=vector_db.identifier,
kvstore=self.kvstore,
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=idx,
inference_api=self.inference_api,
)
# Load OpenAI vector stores metadata into cache
await self.initialize_openai_vector_stores()
# Load OpenAI vector stores metadata into cache
await self.initialize_openai_vector_stores()
async def shutdown(self) -> None:
for client in self.client_cache.values():
@ -187,11 +226,11 @@ class WeaviateVectorIOAdapter(
vector_db: VectorDB,
) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
# Create collection if it doesn't exist
if not client.collections.exists(vector_db.identifier):
if not client.collections.exists(sanitized_collection_name):
client.collections.create(
name=vector_db.identifier,
name=sanitized_collection_name,
vectorizer_config=wvc.config.Configure.Vectorizer.none(),
properties=[
wvc.config.Property(
@ -201,30 +240,41 @@ class WeaviateVectorIOAdapter(
],
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
self.cache[sanitized_collection_name] = VectorDBWithIndex(
vector_db,
WeaviateIndex(client=client, collection_name=vector_db.identifier),
WeaviateIndex(client=client, collection_name=sanitized_collection_name),
self.inference_api,
)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def unregister_vector_db(self, vector_db_id: str) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if sanitized_collection_name not in self.cache or client.collections.exists(sanitized_collection_name) is False:
log.warning(f"Vector DB {sanitized_collection_name} not found")
return
client.collections.delete(sanitized_collection_name)
await self.cache[sanitized_collection_name].index.delete()
del self.cache[sanitized_collection_name]
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if sanitized_collection_name in self.cache:
return self.cache[sanitized_collection_name]
vector_db = await self.vector_db_store.get_vector_db(sanitized_collection_name)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
client = self._get_client()
if not client.collections.exists(vector_db.identifier):
raise ValueError(f"Collection with name `{vector_db.identifier}` not found")
raise ValueError(f"Collection with name `{sanitized_collection_name}` not found")
index = VectorDBWithIndex(
vector_db=vector_db,
index=WeaviateIndex(client=client, collection_name=vector_db.identifier),
index=WeaviateIndex(client=client, collection_name=sanitized_collection_name),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[sanitized_collection_name] = index
return index
async def insert_chunks(
@ -233,7 +283,8 @@ class WeaviateVectorIOAdapter(
chunks: list[Chunk],
ttl_seconds: int | None = None,
) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
@ -245,29 +296,17 @@ class WeaviateVectorIOAdapter(
query: InterleavedContent,
params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise VectorStoreNotFoundError(vector_db_id)
return await index.query_chunks(query, params)
# OpenAI Vector Stores File operations are not supported in Weaviate
async def _save_openai_vector_store_file(
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Weaviate")
sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {sanitized_collection_name} not found")
await index.delete(chunk_ids)

View file

@ -30,7 +30,7 @@ from llama_stack.providers.datatypes import Api
from llama_stack.providers.utils.inference.prompt_adapter import (
interleaved_content_as_str,
)
from llama_stack.providers.utils.vector_io.chunk_utils import generate_chunk_id
from llama_stack.providers.utils.vector_io.vector_utils import generate_chunk_id
log = logging.getLogger(__name__)

View file

@ -5,6 +5,7 @@
# the root directory of this source tree.
import hashlib
import re
import uuid
@ -19,3 +20,20 @@ def generate_chunk_id(document_id: str, chunk_text: str, chunk_window: str | Non
if chunk_window:
hash_input += f":{chunk_window}".encode()
return str(uuid.UUID(hashlib.md5(hash_input, usedforsecurity=False).hexdigest()))
def proper_case(s: str) -> str:
"""Convert a string to proper case (first letter uppercase, rest lowercase)."""
return s[0].upper() + s[1:].lower() if s else s
def sanitize_collection_name(name: str, weaviate_format=False) -> str:
"""
Sanitize collection name to ensure it only contains numbers, letters, and underscores.
Any other characters are replaced with underscores.
"""
if not weaviate_format:
s = re.sub(r"[^a-zA-Z0-9_]", "_", name)
else:
s = proper_case(re.sub(r"[^a-zA-Z0-9]", "", name))
return s