commit ae3e825640
Author: Enrico Zimuel (committed by GitHub)
Date: 2025-12-03 01:04:14 +00:00
Signature: no known key found in database for GPG key ID B5690EEEBB952194
25 changed files with 654 additions and 5 deletions


@@ -31,7 +31,7 @@ jobs:
    runs-on: ubuntu-latest
    strategy:
      matrix:
-       vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant"]
+       vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant", "remote::elasticsearch"]
        python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", "3.13"]') || fromJSON('["3.12"]') }}
        fail-fast: false # we want to run all tests regardless of failure
@@ -146,6 +146,14 @@ jobs:
          docker logs weaviate
          exit 1
      - name: Setup Elasticsearch
        if: matrix.vector-io-provider == 'remote::elasticsearch'
        id: setup-elasticsearch
        run: |
          curl -fsSL https://elastic.co/start-local | sh -s -- -v 9.2.0 --esonly
          source elastic-start-local/.env
          echo "elasticsearch-api-key=$ES_LOCAL_API_KEY" >> "$GITHUB_OUTPUT"
      - name: Build Llama Stack
        run: |
          uv run --no-sync llama stack list-deps ci-tests | xargs -L1 uv pip install
@@ -170,6 +178,8 @@ jobs:
          QDRANT_URL: ${{ matrix.vector-io-provider == 'remote::qdrant' && 'http://localhost:6333' || '' }}
          ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }}
          WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }}
          ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }}
          ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && steps.setup-elasticsearch.outputs.elasticsearch-api-key || '' }}
        run: |
          uv run --no-sync \
            pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \
@@ -196,6 +206,11 @@ jobs:
        run: |
          docker logs qdrant > qdrant.log
      - name: Write Elasticsearch logs to file
        if: ${{ always() && matrix.vector-io-provider == 'remote::elasticsearch' }}
        run: |
          docker logs es-local-dev > elasticsearch.log
      - name: Upload all logs to artifacts
        if: ${{ always() }}
        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0


@@ -9,7 +9,7 @@ sidebar_position: 2
The goal of Llama Stack is to build an ecosystem where users can easily swap out different implementations for the same API. Examples for these include:
- LLM inference providers (e.g., Fireworks, Together, AWS Bedrock, Groq, Cerebras, SambaNova, vLLM, etc.),
-- Vector databases (e.g., ChromaDB, Weaviate, Qdrant, Milvus, FAISS, PGVector, etc.),
+- Vector databases (e.g., ChromaDB, Weaviate, Qdrant, Milvus, FAISS, PGVector, Elasticsearch, etc.),
- Safety providers (e.g., Meta's Llama Guard, AWS Bedrock Guardrails, etc.)
Providers come in two flavors:


@@ -54,7 +54,7 @@ Llama Stack consists of a server (with multiple pluggable API providers) and Cli
Llama Stack provides adapters for popular providers across all API categories:
- **Inference**: Meta Reference, Ollama, Fireworks, Together, NVIDIA, vLLM, AWS Bedrock, OpenAI, Anthropic, and more
-- **Vector Databases**: FAISS, Chroma, Milvus, Postgres, Weaviate, Qdrant, and others
+- **Vector Databases**: FAISS, Chroma, Milvus, Postgres, Weaviate, Qdrant, Elasticsearch, and others
- **Safety**: Llama Guard, Prompt Guard, Code Scanner, AWS Bedrock
- **Training & Evaluation**: HuggingFace, TorchTune, NVIDIA NEMO


@@ -9,7 +9,7 @@ sidebar_position: 1
The goal of Llama Stack is to build an ecosystem where users can easily swap out different implementations for the same API. Examples for these include:
- LLM inference providers (e.g., Meta Reference, Ollama, Fireworks, Together, AWS Bedrock, Groq, Cerebras, SambaNova, vLLM, OpenAI, Anthropic, Gemini, WatsonX, etc.),
-- Vector databases (e.g., FAISS, SQLite-Vec, ChromaDB, Weaviate, Qdrant, Milvus, PGVector, etc.),
+- Vector databases (e.g., FAISS, SQLite-Vec, ChromaDB, Weaviate, Qdrant, Milvus, PGVector, Elasticsearch, etc.),
- Safety providers (e.g., Meta's Llama Guard, Prompt Guard, Code Scanner, AWS Bedrock Guardrails, etc.),
- Tool Runtime providers (e.g., RAG Runtime, Brave Search, etc.)


@@ -0,0 +1,104 @@
---
description: |
[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack.
It allows you to store and query vectors directly within an Elasticsearch database.
That means you're not limited to storing vectors in memory or in a separate service.
## Features
Elasticsearch supports:
- Storing embeddings and their metadata
- Vector search
- Full-text search
- Fuzzy search
- Hybrid search
- Document storage
- Metadata filtering
- Inference service
- Machine Learning integrations
## Usage
To use Elasticsearch in your Llama Stack project, follow these steps:
1. Install the necessary dependencies.
2. Configure your Llama Stack project to use Elasticsearch.
3. Start storing and querying vectors.
## Installation
You can test Elasticsearch locally by running this script in the terminal:
```bash
curl -fsSL https://elastic.co/start-local | sh
```
Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud.
For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
## Documentation
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
sidebar_label: Remote - Elasticsearch
title: remote::elasticsearch
---
# remote::elasticsearch
## Description
[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack.
It allows you to store and query vectors directly within an Elasticsearch database.
That means you're not limited to storing vectors in memory or in a separate service.
## Features
Elasticsearch supports:
- Storing embeddings and their metadata
- Vector search
- Full-text search
- Fuzzy search
- Hybrid search
- Document storage
- Metadata filtering
- Inference service
- Machine Learning integrations
## Usage
To use Elasticsearch in your Llama Stack project, follow these steps:
1. Install the necessary dependencies.
2. Configure your Llama Stack project to use Elasticsearch.
3. Start storing and querying vectors.
## Installation
You can test Elasticsearch locally by running this script in the terminal:
```bash
curl -fsSL https://elastic.co/start-local | sh
```
Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud.
For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
## Documentation
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
## Configuration
| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `elasticsearch_api_key` | `str \| None` | No | | The API key for the Elasticsearch instance |
| `elasticsearch_url` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance |
| `persistence` | `KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) |
## Sample Configuration
```yaml
elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
persistence:
  namespace: vector_io::elasticsearch
  backend: kv_default
```


@@ -160,7 +160,8 @@ const sidebars: SidebarsConfig = {
      'providers/vector_io/remote_milvus',
      'providers/vector_io/remote_pgvector',
      'providers/vector_io/remote_qdrant',
-     'providers/vector_io/remote_weaviate'
+     'providers/vector_io/remote_weaviate',
+     'providers/vector_io/remote_elasticsearch'
    ],
  },
  {


@@ -150,6 +150,7 @@ test = [
    "pymilvus>=2.6.1",
    "milvus-lite>=2.5.0",
    "weaviate-client>=4.16.4",
    "elasticsearch>=8.16.0, <9.0.0"
]
docs = [
    "setuptools",
@@ -332,6 +333,7 @@ exclude = [
    "^src/llama_stack/providers/remote/vector_io/qdrant/",
    "^src/llama_stack/providers/remote/vector_io/sample/",
    "^src/llama_stack/providers/remote/vector_io/weaviate/",
    "^src/llama_stack/providers/remote/vector_io/elasticsearch/",
    "^src/llama_stack/providers/utils/bedrock/client\\.py$",
    "^src/llama_stack/providers/utils/bedrock/refreshable_boto_session\\.py$",
    "^src/llama_stack/providers/utils/inference/embedding_mixin\\.py$",


@@ -27,6 +27,7 @@ distribution_spec:
    - provider_type: remote::pgvector
    - provider_type: remote::qdrant
    - provider_type: remote::weaviate
    - provider_type: remote::elasticsearch
    files:
    - provider_type: inline::localfs
    safety:


@@ -145,6 +145,14 @@ providers:
      persistence:
        namespace: vector_io::weaviate
        backend: kv_default
  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
    provider_type: remote::elasticsearch
    config:
      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
      persistence:
        namespace: vector_io::elasticsearch
        backend: kv_default
  files:
  - provider_id: meta-reference-files
    provider_type: inline::localfs
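The `${env.VAR:+value}` and `${env.VAR:=default}` forms above follow shell parameter-expansion conventions: `:+` substitutes `value` only when the variable is set (so the `provider_id` resolves to `elasticsearch` only when `ELASTICSEARCH_URL` is exported, effectively gating the provider), while `:=` falls back to a default when the variable is unset. A rough Python sketch of these semantics, with a hypothetical `expand` helper:

```python
import os


def expand(var: str, op: str, arg: str) -> str:
    """Hypothetical helper mirroring ${env.VAR:+arg} and ${env.VAR:=arg}."""
    value = os.environ.get(var)
    if op == ":+":  # yield arg only when the variable is set
        return arg if value else ""
    if op == ":=":  # use the variable's value, else the default
        return value if value else arg
    raise ValueError(f"unsupported operator: {op}")


# Unset ELASTICSEARCH_URL: provider_id expands to "" (provider disabled).
# Set it: provider_id becomes "elasticsearch" and the URL default is bypassed.
print(expand("ELASTICSEARCH_URL", ":+", "elasticsearch"))
print(expand("ELASTICSEARCH_URL", ":=", "localhost:9200"))
```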


@@ -145,6 +145,14 @@ providers:
      persistence:
        namespace: vector_io::weaviate
        backend: kv_default
  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
    provider_type: remote::elasticsearch
    config:
      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
      persistence:
        namespace: vector_io::elasticsearch
        backend: kv_default
  files:
  - provider_id: meta-reference-files
    provider_type: inline::localfs


@@ -28,6 +28,7 @@ distribution_spec:
    - provider_type: remote::pgvector
    - provider_type: remote::qdrant
    - provider_type: remote::weaviate
    - provider_type: remote::elasticsearch
    files:
    - provider_type: inline::localfs
    safety:


@@ -145,6 +145,14 @@ providers:
      persistence:
        namespace: vector_io::weaviate
        backend: kv_default
  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
    provider_type: remote::elasticsearch
    config:
      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
      persistence:
        namespace: vector_io::elasticsearch
        backend: kv_default
  files:
  - provider_id: meta-reference-files
    provider_type: inline::localfs


@@ -145,6 +145,14 @@ providers:
      persistence:
        namespace: vector_io::weaviate
        backend: kv_default
  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
    provider_type: remote::elasticsearch
    config:
      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
      persistence:
        namespace: vector_io::elasticsearch
        backend: kv_default
  files:
  - provider_id: meta-reference-files
    provider_type: inline::localfs


@@ -28,6 +28,7 @@ distribution_spec:
    - provider_type: remote::pgvector
    - provider_type: remote::qdrant
    - provider_type: remote::weaviate
    - provider_type: remote::elasticsearch
    files:
    - provider_type: inline::localfs
    safety:


@@ -145,6 +145,14 @@ providers:
      persistence:
        namespace: vector_io::weaviate
        backend: kv_default
  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
    provider_type: remote::elasticsearch
    config:
      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
      persistence:
        namespace: vector_io::elasticsearch
        backend: kv_default
  files:
  - provider_id: meta-reference-files
    provider_type: inline::localfs


@@ -145,6 +145,14 @@ providers:
      persistence:
        namespace: vector_io::weaviate
        backend: kv_default
  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
    provider_type: remote::elasticsearch
    config:
      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
      persistence:
        namespace: vector_io::elasticsearch
        backend: kv_default
  files:
  - provider_id: meta-reference-files
    provider_type: inline::localfs


@@ -32,6 +32,7 @@ from llama_stack.providers.inline.vector_io.sqlite_vec.config import (
)
from llama_stack.providers.registry.inference import available_providers
from llama_stack.providers.remote.vector_io.chroma.config import ChromaVectorIOConfig
from llama_stack.providers.remote.vector_io.elasticsearch.config import ElasticsearchVectorIOConfig
from llama_stack.providers.remote.vector_io.pgvector.config import (
    PGVectorVectorIOConfig,
)
@@ -121,6 +122,7 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate:
            BuildProvider(provider_type="remote::pgvector"),
            BuildProvider(provider_type="remote::qdrant"),
            BuildProvider(provider_type="remote::weaviate"),
            BuildProvider(provider_type="remote::elasticsearch"),
        ],
        "files": [BuildProvider(provider_type="inline::localfs")],
        "safety": [
@@ -237,6 +239,15 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate:
                    cluster_url="${env.WEAVIATE_CLUSTER_URL:=}",
                ),
            ),
            Provider(
                provider_id="${env.ELASTICSEARCH_URL:+elasticsearch}",
                provider_type="remote::elasticsearch",
                config=ElasticsearchVectorIOConfig.sample_run_config(
                    f"~/.llama/distributions/{name}",
                    elasticsearch_url="${env.ELASTICSEARCH_URL:=localhost:9200}",
                    elasticsearch_api_key="${env.ELASTICSEARCH_API_KEY:=}",
                ),
            ),
        ],
        "files": [files_provider],
    }


@@ -823,6 +823,55 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi
        optional_api_dependencies=[Api.files, Api.models],
        description="""
Please refer to the remote provider documentation.
""",
    ),
    RemoteProviderSpec(
        api=Api.vector_io,
        adapter_type="elasticsearch",
        provider_type="remote::elasticsearch",
        pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
        module="llama_stack.providers.remote.vector_io.elasticsearch",
        config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig",
        api_dependencies=[Api.inference],
        optional_api_dependencies=[Api.files, Api.models],
        description="""
[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack.
It allows you to store and query vectors directly within an Elasticsearch database.
That means you're not limited to storing vectors in memory or in a separate service.
## Features
Elasticsearch supports:
- Storing embeddings and their metadata
- Vector search
- Full-text search
- Fuzzy search
- Hybrid search
- Document storage
- Metadata filtering
- Inference service
- Machine Learning integrations
## Usage
To use Elasticsearch in your Llama Stack project, follow these steps:
1. Install the necessary dependencies.
2. Configure your Llama Stack project to use Elasticsearch.
3. Start storing and querying vectors.
## Installation
You can test Elasticsearch locally by running this script in the terminal:
```bash
curl -fsSL https://elastic.co/start-local | sh
```
Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud.
For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
## Documentation
See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
""", """,
), ),
] ]


@@ -0,0 +1,17 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack_api import Api, ProviderSpec

from .config import ElasticsearchVectorIOConfig


async def get_adapter_impl(config: ElasticsearchVectorIOConfig, deps: dict[Api, ProviderSpec]):
    from .elasticsearch import ElasticsearchVectorIOAdapter

    impl = ElasticsearchVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files))
    await impl.initialize()
    return impl


@@ -0,0 +1,32 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any

from pydantic import BaseModel, Field

from llama_stack.core.storage.datatypes import KVStoreReference
from llama_stack_api import json_schema_type


@json_schema_type
class ElasticsearchVectorIOConfig(BaseModel):
    elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
    elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
    persistence: KVStoreReference | None = Field(
        description="Config for KV store backend (SQLite only for now)", default=None
    )

    @classmethod
    def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
        return {
            "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}",
            "elasticsearch_api_key": "${env.ELASTICSEARCH_API_KEY:=}",
            "persistence": KVStoreReference(
                backend="kv_default",
                namespace="vector_io::elasticsearch",
            ).model_dump(exclude_none=True),
        }
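To illustrate how this model behaves, a small sketch using only the class above (plain Pydantic, no running stack required); note the default URL omits the scheme, which is why the CI workflow exports a full http://localhost:9200:

```python
# Minimal sketch: instantiate the config directly and inspect the defaults.
config = ElasticsearchVectorIOConfig(
    elasticsearch_url="http://localhost:9200",
    elasticsearch_api_key=None,  # a local start-local instance may not need one
)
print(config.model_dump())
# {'elasticsearch_api_key': None,
#  'elasticsearch_url': 'http://localhost:9200',
#  'persistence': None}
```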


@@ -0,0 +1,331 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from numpy.typing import NDArray

from llama_stack.core.storage.kvstore import kvstore_impl
from llama_stack.log import get_logger
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex
from llama_stack_api import (
    Chunk,
    Files,
    Inference,
    InterleavedContent,
    QueryChunksResponse,
    VectorIO,
    VectorStore,
    VectorStoreNotFoundError,
    VectorStoresProtocolPrivate,
)

from .config import ElasticsearchVectorIOConfig

log = get_logger(name=__name__, category="vector_io::elasticsearch")

# KV store prefixes for vector databases
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::"
class ElasticsearchIndex(EmbeddingIndex):
    def __init__(self, client: AsyncElasticsearch, collection_name: str):
        self.client = client
        self.collection_name = collection_name

    async def initialize(self) -> None:
        # Elasticsearch collections (indexes) are created on demand:
        # if the index does not exist, it will be created in add_chunks.
        pass

    async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
        """Adds chunks and their embeddings to the Elasticsearch index."""
        assert len(chunks) == len(embeddings), (
            f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
        )

        if not await self.client.indices.exists(index=self.collection_name):
            await self.client.indices.create(
                index=self.collection_name,
                body={
                    "mappings": {
                        "properties": {
                            "vector": {"type": "dense_vector", "dims": len(embeddings[0])},
                            "chunk_content": {"type": "object"},
                        }
                    }
                },
            )

        actions = []
        for chunk, embedding in zip(chunks, embeddings, strict=False):
            actions.append(
                {
                    "_op_type": "index",
                    "_index": self.collection_name,
                    "_id": chunk.chunk_id,
                    "_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()},
                }
            )

        try:
            successful_count, error_count = await async_bulk(
                client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True
            )
            if error_count > 0:
                log.warning(
                    f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}"
                )
            log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}")
        except Exception as e:
            log.error(f"Error adding chunks to Elasticsearch index {self.collection_name}: {e}")
            raise
    async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None:
        """Remove chunks from the Elasticsearch index."""
        actions = []
        for chunk in chunks_for_deletion:
            actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id})

        try:
            successful_count, error_count = await async_bulk(
                client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True
            )
            if error_count > 0:
                log.warning(
                    f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}"
                )
            log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}")
        except Exception as e:
            log.error(f"Error deleting chunks from Elasticsearch index {self.collection_name}: {e}")
            raise

    async def _results_to_chunks(self, results: dict) -> QueryChunksResponse:
        """Convert search results to QueryChunksResponse."""
        chunks, scores = [], []
        for result in results["hits"]["hits"]:
            try:
                chunk = Chunk(
                    content=result["_source"]["chunk_content"],
                    stored_chunk_id=result["_id"],
                    embedding=result["_source"]["vector"],
                )
            except Exception:
                log.exception("Failed to parse chunk")
                continue
            chunks.append(chunk)
            scores.append(result["_score"])
        return QueryChunksResponse(chunks=chunks, scores=scores)
    async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
        """Vector search using kNN."""
        try:
            # Note: the number of hits is controlled by "size" in the search API.
            results = await self.client.search(
                index=self.collection_name,
                query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
                min_score=score_threshold,
                size=k,
            )
        except Exception as e:
            log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}")
            raise
        return await self._results_to_chunks(results)

    async def query_keyword(self, query_string: str, k: int, score_threshold: float) -> QueryChunksResponse:
        """Keyword search using a match query."""
        try:
            results = await self.client.search(
                index=self.collection_name,
                query={"match": {"chunk_content": {"query": query_string}}},
                min_score=score_threshold,
                size=k,
            )
        except Exception as e:
            log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}")
            raise
        return await self._results_to_chunks(results)
    async def query_hybrid(
        self,
        embedding: NDArray,
        query_string: str,
        k: int,
        score_threshold: float,
        reranker_type: str,
        reranker_params: dict[str, Any] | None = None,
    ) -> QueryChunksResponse:
        supported_retrievers = ["rrf", "linear"]
        if reranker_type not in supported_retrievers:
            raise ValueError(f"Unsupported reranker type: {reranker_type}. Supported types are: {supported_retrievers}")

        retriever = {
            reranker_type: {
                "retrievers": [
                    {"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}},
                    {"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
                ]
            }
        }

        # Add reranker parameters if provided for RRF (e.g. rank_constant), see
        # https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/rrf-retriever
        if reranker_type == "rrf" and reranker_params is not None:
            retriever["rrf"].update(reranker_params)
        # Add reranker parameters if provided for Linear (e.g. weights), see
        # https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/linear-retriever
        # Since the weights are per retriever, we need to update them separately, using the following syntax:
        # reranker_params = {
        #     "retrievers": {
        #         "standard": {"weight": 0.7},
        #         "knn": {"weight": 0.3}
        #     }
        # }
        elif reranker_type == "linear" and reranker_params is not None:
            retrievers_params = reranker_params.get("retrievers")
            if retrievers_params is not None:
                for i in range(len(retriever["linear"]["retrievers"])):
                    retr_type = list(retriever["linear"]["retrievers"][i].keys())[0]
                    retriever["linear"]["retrievers"][i].update(retrievers_params[retr_type])
                del reranker_params["retrievers"]
            retriever["linear"].update(reranker_params)

        try:
            results = await self.client.search(
                index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold
            )
        except Exception as e:
            log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}")
            raise
        return await self._results_to_chunks(results)

    async def delete(self):
        """Delete the entire Elasticsearch index with collection_name."""
        try:
            # Deleting an index goes through the indices API, not the document API.
            await self.client.indices.delete(index=self.collection_name)
        except Exception as e:
            log.error(f"Error deleting Elasticsearch index {self.collection_name}: {e}")
            raise
class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
    def __init__(
        self,
        config: ElasticsearchVectorIOConfig,
        inference_api: Inference,
        files_api: Files | None = None,
    ) -> None:
        super().__init__(files_api=files_api, kvstore=None)
        self.config = config
        self.client: AsyncElasticsearch | None = None
        self.cache = {}
        self.inference_api = inference_api
        self.vector_store_table = None

    async def initialize(self) -> None:
        self.client = AsyncElasticsearch(hosts=self.config.elasticsearch_url, api_key=self.config.elasticsearch_api_key)
        self.kvstore = await kvstore_impl(self.config.persistence)

        start_key = VECTOR_DBS_PREFIX
        end_key = f"{VECTOR_DBS_PREFIX}\xff"
        stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key)
        for vector_store_data in stored_vector_stores:
            vector_store = VectorStore.model_validate_json(vector_store_data)
            index = VectorStoreWithIndex(
                vector_store, ElasticsearchIndex(self.client, vector_store.identifier), self.inference_api
            )
            self.cache[vector_store.identifier] = index
        self.openai_vector_stores = await self._load_openai_vector_stores()

    async def shutdown(self) -> None:
        await self.client.close()
        # Clean up mixin resources (file batch tasks)
        await super().shutdown()

    async def register_vector_store(self, vector_store: VectorStore) -> None:
        assert self.kvstore is not None
        key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}"
        await self.kvstore.set(key=key, value=vector_store.model_dump_json())
        index = VectorStoreWithIndex(
            vector_store=vector_store,
            index=ElasticsearchIndex(self.client, vector_store.identifier),
            inference_api=self.inference_api,
        )
        self.cache[vector_store.identifier] = index

    async def unregister_vector_store(self, vector_store_id: str) -> None:
        if vector_store_id in self.cache:
            await self.cache[vector_store_id].index.delete()
            del self.cache[vector_store_id]
        assert self.kvstore is not None
        await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}")

    async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | None:
        if vector_store_id in self.cache:
            return self.cache[vector_store_id]
        if self.vector_store_table is None:
            raise ValueError(f"Vector DB not found {vector_store_id}")
        vector_store = await self.vector_store_table.get_vector_store(vector_store_id)
        if not vector_store:
            raise VectorStoreNotFoundError(vector_store_id)
        index = VectorStoreWithIndex(
            vector_store=vector_store,
            index=ElasticsearchIndex(client=self.client, collection_name=vector_store.identifier),
            inference_api=self.inference_api,
        )
        self.cache[vector_store_id] = index
        return index

    async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
        index = await self._get_and_cache_vector_store_index(vector_db_id)
        if not index:
            raise VectorStoreNotFoundError(vector_db_id)
        await index.insert_chunks(chunks)

    async def query_chunks(
        self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None
    ) -> QueryChunksResponse:
        index = await self._get_and_cache_vector_store_index(vector_db_id)
        if not index:
            raise VectorStoreNotFoundError(vector_db_id)
        return await index.query_chunks(query, params)

    async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
        """Delete chunks from an Elasticsearch vector store."""
        index = await self._get_and_cache_vector_store_index(store_id)
        if not index:
            raise ValueError(f"Vector DB {store_id} not found")
        await index.index.delete_chunks(chunks_for_deletion)
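To see the index class in action outside the stack, a hedged sketch (assumes a reachable local Elasticsearch without authentication; the `Chunk` constructor is simplified and its exact required fields may differ):

```python
import asyncio

import numpy as np
from elasticsearch import AsyncElasticsearch


async def demo() -> None:
    client = AsyncElasticsearch(hosts="http://localhost:9200")
    index = ElasticsearchIndex(client=client, collection_name="demo-chunks")

    embeddings = np.random.rand(2, 384).astype(np.float32)  # toy 384-dim vectors
    # Chunk construction is sketched; real chunks come from document ingestion.
    chunks = [Chunk(content=f"toy document {i}") for i in range(2)]

    await index.add_chunks(chunks, embeddings)
    response = await index.query_vector(embeddings[0], k=1, score_threshold=0.0)
    print(response.scores)

    await index.delete()  # drops the whole Elasticsearch index
    await client.close()


asyncio.run(demo())
```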


@@ -379,6 +379,7 @@ def vector_provider_wrapper(func):
            "pgvector",
            "weaviate",
            "qdrant",
            "elasticsearch",
        ]
    else:
        # For CI tests (replay/record), only use providers that are available in ci-tests environment


@@ -34,6 +34,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models):
        "remote::pgvector",
        "remote::qdrant",
        "remote::weaviate",
        "remote::elasticsearch",
    ]:
        return
@@ -54,6 +55,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
            "remote::pgvector",
            "remote::qdrant",
            "remote::weaviate",
            "remote::elasticsearch",
        ],
        "keyword": [
            "inline::milvus",
@@ -61,6 +63,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
            "remote::milvus",
            "remote::pgvector",
            "remote::weaviate",
            "remote::elasticsearch",
        ],
        "hybrid": [
            "inline::milvus",
@@ -68,6 +71,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
            "remote::milvus",
            "remote::pgvector",
            "remote::weaviate",
            "remote::elasticsearch",
        ],
    }
    supported_providers = search_mode_support.get(search_mode, [])


@@ -164,6 +164,7 @@ def test_insert_chunks_with_precomputed_embeddings(
        "inline::milvus": {"score_threshold": -1.0},
        "inline::qdrant": {"score_threshold": -1.0},
        "remote::qdrant": {"score_threshold": -1.0},
        "remote::elasticsearch": {"score_threshold": -1.0},
    }
    vector_store_name = "test_precomputed_embeddings_db"
    register_response = client_with_empty_registry.vector_stores.create(
@@ -214,6 +215,7 @@ def test_query_returns_valid_object_when_identical_to_embedding_in_vdb(
        "inline::milvus": {"score_threshold": 0.0},
        "remote::qdrant": {"score_threshold": 0.0},
        "inline::qdrant": {"score_threshold": 0.0},
        "remote::elasticsearch": {"score_threshold": 0.0},
    }
    vector_store_name = "test_precomputed_embeddings_db"
    register_response = client_with_empty_registry.vector_stores.create(
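The per-provider `score_threshold` overrides in these tests exist because score scales differ across backends; Elasticsearch kNN similarity scores are non-negative, so a threshold of 0.0 (or -1.0) effectively keeps every hit. A hedged sketch of how such a threshold reaches the adapter, assuming an initialized `adapter` instance and an async context; the `mode` key is illustrative:

```python
# Hypothetical query against the adapter defined in this commit.
params = {"mode": "vector", "score_threshold": 0.0}  # keep all hits
response = await adapter.query_chunks(
    vector_db_id="test_precomputed_embeddings_db",
    query="What is Elasticsearch?",
    params=params,
)
for chunk, score in zip(response.chunks, response.scores, strict=True):
    print(score, chunk.content)
```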

uv.lock (generated file)

@@ -988,6 +988,33 @@ wheels = [
    { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" },
]
[[package]]
name = "elastic-transport"
version = "8.17.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6a/54/d498a766ac8fa475f931da85a154666cc81a70f8eb4a780bc8e4e934e9ac/elastic_transport-8.17.1.tar.gz", hash = "sha256:5edef32ac864dca8e2f0a613ef63491ee8d6b8cfb52881fa7313ba9290cac6d2", size = 73425, upload-time = "2025-03-13T07:28:30.776Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cf/cd/b71d5bc74cde7fc6fd9b2ff9389890f45d9762cbbbf81dc5e51fd7588c4a/elastic_transport-8.17.1-py3-none-any.whl", hash = "sha256:192718f498f1d10c5e9aa8b9cf32aed405e469a7f0e9d6a8923431dbb2c59fb8", size = 64969, upload-time = "2025-03-13T07:28:29.031Z" },
]
[[package]]
name = "elasticsearch"
version = "8.19.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "elastic-transport" },
{ name = "python-dateutil" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/42/7b/70b9d16069eda6f91d45fadd9e12faed8e4442f242ca8a81de84bc626f1b/elasticsearch-8.19.2.tar.gz", hash = "sha256:622efa6a3e662db45285f16ab57bf198ea73ac9e137e7ed8b1d1d1e47638959d", size = 797401, upload-time = "2025-10-28T16:36:44.953Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/56/01/6f77d042b83260ef9ed73ea9647dfa0ef8414eba0a3fc57a509a088ad39b/elasticsearch-8.19.2-py3-none-any.whl", hash = "sha256:c16ba20c4c76cf6952e836dae7f4e724e00ba7bf31b94b79472b873683accdd4", size = 949706, upload-time = "2025-10-28T16:36:41.003Z" },
]
[[package]]
name = "eval-type-backport"
version = "0.2.2"
@@ -2077,6 +2104,7 @@ test = [
    { name = "chardet" },
    { name = "chromadb" },
    { name = "datasets" },
    { name = "elasticsearch" },
    { name = "mcp" },
    { name = "milvus-lite" },
    { name = "psycopg2-binary" },
@@ -2223,6 +2251,7 @@ test = [
    { name = "chardet" },
    { name = "chromadb", specifier = ">=1.0.15" },
    { name = "datasets", specifier = ">=4.0.0" },
    { name = "elasticsearch", specifier = ">=8.16.0,<9.0.0" },
    { name = "mcp" },
    { name = "milvus-lite", specifier = ">=2.5.0" },
    { name = "psycopg2-binary", specifier = ">=2.9.0" },