Added tests + docs + CI for Elasticsearch

This commit is contained in:
Enrico Zimuel 2025-10-31 18:18:27 +01:00
parent f1c8a200c8
commit 12f8d96f48
No known key found for this signature in database
GPG key ID: 6CB203F6934A69F1
9 changed files with 209 additions and 115 deletions

View file

@ -146,6 +146,14 @@ jobs:
docker logs weaviate
exit 1
- name: Setup Elasticsearch
if: matrix.vector-io-provider == 'remote::elasticsearch'
id: setup-elasticsearch
run: |
curl -fsSL https://elastic.co/start-local | sh -s -- -v 9.2.0 --esonly
source elastic-start-local/.env
echo "elasticsearch-api-key=$ES_LOCAL_API_KEY" >> "$GITHUB_OUTPUT"
- name: Build Llama Stack
run: |
uv run --no-sync llama stack list-deps ci-tests | xargs -L1 uv pip install
@ -170,6 +178,8 @@ jobs:
QDRANT_URL: ${{ matrix.vector-io-provider == 'remote::qdrant' && 'http://localhost:6333' || '' }}
ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }}
WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }}
ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }}
ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && steps.setup-elasticsearch.outputs.elasticsearch-api-key || '' }}
run: |
uv run --no-sync \
pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \
@ -196,6 +206,11 @@ jobs:
run: |
docker logs qdrant > qdrant.log
- name: Write Elasticsearch logs to file
if: ${{ always() && matrix.vector-io-provider == 'remote::elasticsearch' }}
run: |
docker logs es-local-dev > elasticsearch.log
- name: Upload all logs to artifacts
if: ${{ always() }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0

View file

@ -0,0 +1,104 @@
---
description: |
[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack.
It allows you to store and query vectors directly within an Elasticsearch database.
That means you're not limited to storing vectors in memory or in a separate service.
## Features
Elasticsearch supports:
- Store embeddings and their metadata
- Vector search
- Full-text search
- Fuzzy search
- Hybrid search
- Document storage
- Metadata filtering
- Inference service
- Machine Learning integrations
## Usage
To use Elasticsearch in your Llama Stack project, follow these steps:
1. Install the necessary dependencies.
2. Configure your Llama Stack project to use Elasticsearch.
3. Start storing and querying vectors.
## Installation
You can test Elasticsearch locally by running this script in the terminal:
```bash
curl -fsSL https://elastic.co/start-local | sh
```
Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud.
For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
## Documentation
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
sidebar_label: Remote - Elasticsearch
title: remote::elasticsearch
---
# remote::elasticsearch
## Description
[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack.
It allows you to store and query vectors directly within an Elasticsearch database.
That means you're not limited to storing vectors in memory or in a separate service.
## Features
Elasticsearch supports:
- Store embeddings and their metadata
- Vector search
- Full-text search
- Fuzzy search
- Hybrid search
- Document storage
- Metadata filtering
- Inference service
- Machine Learning integrations
## Usage
To use Elasticsearch in your Llama Stack project, follow these steps:
1. Install the necessary dependencies.
2. Configure your Llama Stack project to use Elasticsearch.
3. Start storing and querying vectors.
## Installation
You can test Elasticsearch locally by running this script in the terminal:
```bash
curl -fsSL https://elastic.co/start-local | sh
```
Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud.
For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
## Documentation
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
## Configuration
| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `api_key` | `str \| None` | No | | The API key for the Elasticsearch instance |
| `hosts` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance |
| `persistence` | `llama_stack.core.storage.datatypes.KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) |
## Sample Configuration
```yaml
hosts: ${env.ELASTICSEARCH_URL:=localhost:9200}
api_key: ${env.ELASTICSEARCH_API_KEY:=}
persistence:
namespace: vector_io::elasticsearch
backend: kv_default
```

View file

@ -14,8 +14,8 @@ from llama_stack.schema_utils import json_schema_type
@json_schema_type
class ElasticsearchVectorIOConfig(BaseModel):
elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
hosts: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
persistence: KVStoreReference | None = Field(
description="Config for KV store backend (SQLite only for now)", default=None
)
@ -23,8 +23,8 @@ class ElasticsearchVectorIOConfig(BaseModel):
@classmethod
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
return {
"elasticsearch_api_key": None,
"elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}",
"hosts": "${env.ELASTICSEARCH_API_KEY:=None}",
"api_key": "${env.ELASTICSEARCH_URL:=localhost:9200}",
"persistence": KVStoreReference(
backend="kv_default",
namespace="vector_io::elasticsearch",

View file

@ -4,12 +4,11 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
from typing import Any
from numpy.typing import NDArray
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from numpy.typing import NDArray
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files
@ -18,13 +17,10 @@ from llama_stack.apis.vector_io import (
Chunk,
QueryChunksResponse,
VectorIO,
VectorStoreChunkingStrategy,
VectorStoreFileObject,
)
from llama_stack.apis.vector_stores import VectorStore
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
@ -36,6 +32,10 @@ log = get_logger(name=__name__, category="vector_io::elasticsearch")
# KV store prefixes for vector databases
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::"
class ElasticsearchIndex(EmbeddingIndex):
@ -61,39 +61,32 @@ class ElasticsearchIndex(EmbeddingIndex):
body={
"mappings": {
"properties": {
"vector": {
"type": "dense_vector",
"dims": len(embeddings[0])
},
"vector": {"type": "dense_vector", "dims": len(embeddings[0])},
"chunk_content": {"type": "object"},
}
}
}
},
)
actions = []
for chunk, embedding in zip(chunks, embeddings, strict=False):
actions.append({
actions.append(
{
"_op_type": "index",
"_index": self.collection_name,
"_id": chunk.chunk_id,
"_source": {
"vector": embedding,
"chunk_content": chunk.model_dump_json()
"_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()},
}
})
)
try:
successful_count, error_count = await async_bulk(
client=self.client,
actions=actions,
timeout='300s',
refresh=True,
raise_on_error=False,
stats_only=True
client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True
)
if error_count > 0:
log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}")
log.warning(
f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}"
)
log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}")
except Exception as e:
@ -105,23 +98,16 @@ class ElasticsearchIndex(EmbeddingIndex):
actions = []
for chunk in chunks_for_deletion:
actions.append({
"_op_type": "delete",
"_index": self.collection_name,
"_id": chunk.chunk_id
})
actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id})
try:
successful_count, error_count = await async_bulk(
client=self.client,
actions=actions,
timeout='300s',
refresh=True,
raise_on_error=True,
stats_only=True
client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True
)
if error_count > 0:
log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}")
log.warning(
f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}"
)
log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}")
except Exception as e:
@ -132,12 +118,12 @@ class ElasticsearchIndex(EmbeddingIndex):
"""Convert search results to QueryChunksResponse."""
chunks, scores = [], []
for result in results['hits']['hits']:
for result in results["hits"]["hits"]:
try:
chunk = Chunk(
content=result["_source"]["chunk_content"],
chunk_id=result["_id"],
embedding=result["_source"]["vector"]
stored_chunk_id=result["_id"],
embedding=result["_source"]["vector"],
)
except Exception:
log.exception("Failed to parse chunk")
@ -152,19 +138,11 @@ class ElasticsearchIndex(EmbeddingIndex):
"""Vector search using kNN."""
try:
results = (
await self.client.search(
results = await self.client.search(
index=self.collection_name,
query={
"knn": {
"field": "vector",
"query_vector": embedding.tolist(),
"k": k
}
},
query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
min_score=score_threshold,
limit=k
)
limit=k,
)
except Exception as e:
log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}")
@ -176,19 +154,11 @@ class ElasticsearchIndex(EmbeddingIndex):
"""Keyword search using match query."""
try:
results = (
await self.client.search(
results = await self.client.search(
index=self.collection_name,
query={
"match": {
"chunk_content": {
"query": query_string
}
}
},
query={"match": {"chunk_content": {"query": query_string}}},
min_score=score_threshold,
limit=k
)
limit=k,
)
except Exception as e:
log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}")
@ -205,7 +175,6 @@ class ElasticsearchIndex(EmbeddingIndex):
reranker_type: str,
reranker_params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
supported_retrievers = ["rrf", "linear"]
if reranker_type not in supported_retrievers:
raise ValueError(f"Unsupported reranker type: {reranker_type}. Supported types are: {supported_retrievers}")
@ -213,24 +182,8 @@ class ElasticsearchIndex(EmbeddingIndex):
retriever = {
reranker_type: {
"retrievers": [
{
"retriever": {
"standard": {
"query": {
"match": {
"chunk_content": query_string
}
}
}
}
},
{
"knn": {
"field": "vector",
"query_vector": embedding.tolist(),
"k": k
}
}
{"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}},
{"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
]
}
}
@ -252,17 +205,14 @@ class ElasticsearchIndex(EmbeddingIndex):
retrievers_params = reranker_params.get("retrievers")
if retrievers_params is not None:
for i in range(0, len(retriever["linear"]["retrievers"])):
retr_type=retriever["linear"]["retrievers"][i]["retriever"].key()
retr_type = list(retriever["linear"]["retrievers"][i].keys())[0]
retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type])
del reranker_params["retrievers"]
retriever["linear"].update(reranker_params)
try:
results = await self.client.search(
index=self.collection_name,
size=k,
retriever=retriever,
min_score=score_threshold
index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold
)
except Exception as e:
log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}")

View file

@ -825,11 +825,11 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi
Please refer to the remote provider documentation.
""",
),
InlineProviderSpec(
RemoteProviderSpec(
api=Api.vector_io,
adapter_type="elasticsearch",
provider_type="remote::elasticsearch",
pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
module="llama_stack.providers.remote.vector_io.elasticsearch",
config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig",
api_dependencies=[Api.inference],
@ -871,6 +871,7 @@ Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overvie
For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
## Documentation
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""",
)
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
""",
),
]

View file

@ -371,18 +371,7 @@ def vector_provider_wrapper(func):
inference_mode = os.environ.get("LLAMA_STACK_TEST_INFERENCE_MODE")
if inference_mode == "live":
# For live tests, try all providers (they'll skip if not available)
all_providers = [
"faiss",
"sqlite-vec",
"milvus",
"chromadb",
"pgvector",
"weaviate",
"qdrant",
]
else:
# For CI tests (replay/record), only use providers that are available in ci-tests environment
all_providers = ["faiss", "sqlite-vec"]
all_providers = ["faiss", "sqlite-vec", "milvus", "chromadb", "pgvector", "weaviate", "qdrant", "elasticsearch"]
return pytest.mark.parametrize("vector_io_provider_id", all_providers)(wrapper)

View file

@ -34,6 +34,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models):
"remote::pgvector",
"remote::qdrant",
"remote::weaviate",
"remote::elasticsearch",
]:
return
@ -54,6 +55,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
"remote::pgvector",
"remote::qdrant",
"remote::weaviate",
"remote::elasticsearch",
],
"keyword": [
"inline::milvus",
@ -61,6 +63,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
"remote::milvus",
"remote::pgvector",
"remote::weaviate",
"remote::elasticsearch",
],
"hybrid": [
"inline::milvus",
@ -68,6 +71,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
"remote::milvus",
"remote::pgvector",
"remote::weaviate",
"remote::elasticsearch",
],
}
supported_providers = search_mode_support.get(search_mode, [])

View file

@ -164,6 +164,7 @@ def test_insert_chunks_with_precomputed_embeddings(
"inline::milvus": {"score_threshold": -1.0},
"inline::qdrant": {"score_threshold": -1.0},
"remote::qdrant": {"score_threshold": -1.0},
"remote::elasticsearch": {"score_threshold": -1.0},
}
vector_store_name = "test_precomputed_embeddings_db"
register_response = client_with_empty_registry.vector_stores.create(
@ -214,6 +215,7 @@ def test_query_returns_valid_object_when_identical_to_embedding_in_vdb(
"inline::milvus": {"score_threshold": 0.0},
"remote::qdrant": {"score_threshold": 0.0},
"inline::qdrant": {"score_threshold": 0.0},
"remote::elasticsearch": {"score_threshold": 0.0},
}
vector_store_name = "test_precomputed_embeddings_db"
register_response = client_with_empty_registry.vector_stores.create(

31
uv.lock generated
View file

@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.12"
resolution-markers = [
"(python_full_version >= '3.13' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version >= '3.13' and sys_platform != 'darwin' and sys_platform != 'linux')",
@ -979,6 +979,33 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" },
]
[[package]]
name = "elastic-transport"
version = "8.17.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6a/54/d498a766ac8fa475f931da85a154666cc81a70f8eb4a780bc8e4e934e9ac/elastic_transport-8.17.1.tar.gz", hash = "sha256:5edef32ac864dca8e2f0a613ef63491ee8d6b8cfb52881fa7313ba9290cac6d2", size = 73425, upload-time = "2025-03-13T07:28:30.776Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cf/cd/b71d5bc74cde7fc6fd9b2ff9389890f45d9762cbbbf81dc5e51fd7588c4a/elastic_transport-8.17.1-py3-none-any.whl", hash = "sha256:192718f498f1d10c5e9aa8b9cf32aed405e469a7f0e9d6a8923431dbb2c59fb8", size = 64969, upload-time = "2025-03-13T07:28:29.031Z" },
]
[[package]]
name = "elasticsearch"
version = "8.19.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "elastic-transport" },
{ name = "python-dateutil" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/42/7b/70b9d16069eda6f91d45fadd9e12faed8e4442f242ca8a81de84bc626f1b/elasticsearch-8.19.2.tar.gz", hash = "sha256:622efa6a3e662db45285f16ab57bf198ea73ac9e137e7ed8b1d1d1e47638959d", size = 797401, upload-time = "2025-10-28T16:36:44.953Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/56/01/6f77d042b83260ef9ed73ea9647dfa0ef8414eba0a3fc57a509a088ad39b/elasticsearch-8.19.2-py3-none-any.whl", hash = "sha256:c16ba20c4c76cf6952e836dae7f4e724e00ba7bf31b94b79472b873683accdd4", size = 949706, upload-time = "2025-10-28T16:36:41.003Z" },
]
[[package]]
name = "eval-type-backport"
version = "0.2.2"
@ -2021,6 +2048,7 @@ test = [
{ name = "chardet" },
{ name = "chromadb" },
{ name = "datasets" },
{ name = "elasticsearch" },
{ name = "mcp" },
{ name = "milvus-lite" },
{ name = "psycopg2-binary" },
@ -2167,6 +2195,7 @@ test = [
{ name = "chardet" },
{ name = "chromadb", specifier = ">=1.0.15" },
{ name = "datasets", specifier = ">=4.0.0" },
{ name = "elasticsearch", specifier = ">=8.16.0,<9.0.0" },
{ name = "mcp" },
{ name = "milvus-lite", specifier = ">=2.5.0" },
{ name = "psycopg2-binary", specifier = ">=2.9.0" },