diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index fc6ac0600..febffb21b 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -31,7 +31,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant"] + vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant", "remote::elasticsearch"] python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", "3.13"]') || fromJSON('["3.12"]') }} fail-fast: false # we want to run all tests regardless of failure @@ -146,6 +146,14 @@ jobs: docker logs weaviate exit 1 + - name: Setup Elasticsearch + if: matrix.vector-io-provider == 'remote::elasticsearch' + id: setup-elasticsearch + run: | + curl -fsSL https://elastic.co/start-local | sh -s -- -v 9.2.0 --esonly + source elastic-start-local/.env + echo "elasticsearch-api-key=$ES_LOCAL_API_KEY" >> "$GITHUB_OUTPUT" + - name: Build Llama Stack run: | uv run --no-sync llama stack list-deps ci-tests | xargs -L1 uv pip install @@ -170,6 +178,8 @@ jobs: QDRANT_URL: ${{ matrix.vector-io-provider == 'remote::qdrant' && 'http://localhost:6333' || '' }} ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} + ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }} + ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && steps.setup-elasticsearch.outputs.elasticsearch-api-key || '' }} run: | uv run --no-sync \ pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ @@ -196,6 +206,11 @@ jobs: run: | docker logs qdrant > qdrant.log + - name: Write Elasticsearch logs to file + if: ${{ always() && matrix.vector-io-provider == 'remote::elasticsearch' }} + run: | + docker logs es-local-dev > elasticsearch.log + - name: Upload all logs to artifacts if: ${{ always() }} uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0 diff --git a/docs/docs/concepts/apis/api_providers.mdx b/docs/docs/concepts/apis/api_providers.mdx index 5f0fe2ac7..fd2af3854 100644 --- a/docs/docs/concepts/apis/api_providers.mdx +++ b/docs/docs/concepts/apis/api_providers.mdx @@ -9,7 +9,7 @@ sidebar_position: 2 The goal of Llama Stack is to build an ecosystem where users can easily swap out different implementations for the same API. Examples for these include: - LLM inference providers (e.g., Fireworks, Together, AWS Bedrock, Groq, Cerebras, SambaNova, vLLM, etc.), -- Vector databases (e.g., ChromaDB, Weaviate, Qdrant, Milvus, FAISS, PGVector, etc.), +- Vector databases (e.g., ChromaDB, Weaviate, Qdrant, Milvus, FAISS, PGVector, Elasticsearch, etc.), - Safety providers (e.g., Meta's Llama Guard, AWS Bedrock Guardrails, etc.) 
Providers come in two flavors: diff --git a/docs/docs/index.mdx b/docs/docs/index.mdx index 8c17283f9..41cbd79c6 100644 --- a/docs/docs/index.mdx +++ b/docs/docs/index.mdx @@ -54,7 +54,7 @@ Llama Stack consists of a server (with multiple pluggable API providers) and Cli Llama Stack provides adapters for popular providers across all API categories: - **Inference**: Meta Reference, Ollama, Fireworks, Together, NVIDIA, vLLM, AWS Bedrock, OpenAI, Anthropic, and more -- **Vector Databases**: FAISS, Chroma, Milvus, Postgres, Weaviate, Qdrant, and others +- **Vector Databases**: FAISS, Chroma, Milvus, Postgres, Weaviate, Qdrant, Elasticsearch and others - **Safety**: Llama Guard, Prompt Guard, Code Scanner, AWS Bedrock - **Training & Evaluation**: HuggingFace, TorchTune, NVIDIA NEMO diff --git a/docs/docs/providers/index.mdx b/docs/docs/providers/index.mdx index 5c81a57ed..b8484d655 100644 --- a/docs/docs/providers/index.mdx +++ b/docs/docs/providers/index.mdx @@ -9,7 +9,7 @@ sidebar_position: 1 The goal of Llama Stack is to build an ecosystem where users can easily swap out different implementations for the same API. Examples for these include: - LLM inference providers (e.g., Meta Reference, Ollama, Fireworks, Together, AWS Bedrock, Groq, Cerebras, SambaNova, vLLM, OpenAI, Anthropic, Gemini, WatsonX, etc.), -- Vector databases (e.g., FAISS, SQLite-Vec, ChromaDB, Weaviate, Qdrant, Milvus, PGVector, etc.), +- Vector databases (e.g., FAISS, SQLite-Vec, ChromaDB, Weaviate, Qdrant, Milvus, PGVector, Elasticsearch, etc.), - Safety providers (e.g., Meta's Llama Guard, Prompt Guard, Code Scanner, AWS Bedrock Guardrails, etc.), - Tool Runtime providers (e.g., RAG Runtime, Brave Search, etc.) diff --git a/docs/docs/providers/vector_io/remote_elasticsearch.mdx b/docs/docs/providers/vector_io/remote_elasticsearch.mdx new file mode 100644 index 000000000..5deed1a27 --- /dev/null +++ b/docs/docs/providers/vector_io/remote_elasticsearch.mdx @@ -0,0 +1,104 @@ +--- +description: | + [Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. + It allows you to store and query vectors directly within an Elasticsearch database. + That means you're not limited to storing vectors in memory or in a separate service. + + ## Features + Elasticsearch supports: + - Store embeddings and their metadata + - Vector search + - Full-text search + - Fuzzy search + - Hybrid search + - Document storage + - Metadata filtering + - Inference service + - Machine Learning integrations + + ## Usage + + To use Elasticsearch in your Llama Stack project, follow these steps: + + 1. Install the necessary dependencies. + 2. Configure your Llama Stack project to use Elasticsearch. + 3. Start storing and querying vectors. + + ## Installation + + You can test Elasticsearch locally by running this script in the terminal: + + ```bash + curl -fsSL https://elastic.co/start-local | sh + ``` + + Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. + For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + + ## Documentation + See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. 
+sidebar_label: Remote - Elasticsearch +title: remote::elasticsearch +--- + +# remote::elasticsearch + +## Description + + +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. + +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. + +## Installation + +You can test Elasticsearch locally by running this script in the terminal: + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. +For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + +## Documentation +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. + + +## Configuration + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `elasticsearch_api_key` | `str \| None` | No | | The API key for the Elasticsearch instance | +| `elasticsearch_url` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance | +| `persistence` | `KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) | + +## Sample Configuration + +```yaml +elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} +elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} +persistence: + namespace: vector_io::elasticsearch + backend: kv_default +``` diff --git a/docs/sidebars.ts b/docs/sidebars.ts index 7b4ac5ac8..0ab2c2698 100644 --- a/docs/sidebars.ts +++ b/docs/sidebars.ts @@ -160,7 +160,8 @@ const sidebars: SidebarsConfig = { 'providers/vector_io/remote_milvus', 'providers/vector_io/remote_pgvector', 'providers/vector_io/remote_qdrant', - 'providers/vector_io/remote_weaviate' + 'providers/vector_io/remote_weaviate', + 'providers/vector_io/remote_elasticsearch' ], }, { diff --git a/pyproject.toml b/pyproject.toml index 3e16dc08f..0d3b0c5ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,7 @@ test = [ "pymilvus>=2.6.1", "milvus-lite>=2.5.0", "weaviate-client>=4.16.4", + "elasticsearch>=8.16.0, <9.0.0" ] docs = [ "setuptools", @@ -332,6 +333,7 @@ exclude = [ "^src/llama_stack/providers/remote/vector_io/qdrant/", "^src/llama_stack/providers/remote/vector_io/sample/", "^src/llama_stack/providers/remote/vector_io/weaviate/", + "^src/llama_stack/providers/remote/vector_io/elasticsearch/", "^src/llama_stack/providers/utils/bedrock/client\\.py$", "^src/llama_stack/providers/utils/bedrock/refreshable_boto_session\\.py$", "^src/llama_stack/providers/utils/inference/embedding_mixin\\.py$", diff --git a/src/llama_stack/distributions/ci-tests/build.yaml b/src/llama_stack/distributions/ci-tests/build.yaml index f29ac7712..4305cc9b1 100644 --- a/src/llama_stack/distributions/ci-tests/build.yaml +++ b/src/llama_stack/distributions/ci-tests/build.yaml @@ 
-27,6 +27,7 @@ distribution_spec: - provider_type: remote::pgvector - provider_type: remote::qdrant - provider_type: remote::weaviate + - provider_type: remote::elasticsearch files: - provider_type: inline::localfs safety: diff --git a/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml b/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml index 8414dcae5..fda5448e3 100644 --- a/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml @@ -145,6 +145,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/ci-tests/run.yaml b/src/llama_stack/distributions/ci-tests/run.yaml index e83fc7fb5..2412710a7 100644 --- a/src/llama_stack/distributions/ci-tests/run.yaml +++ b/src/llama_stack/distributions/ci-tests/run.yaml @@ -145,6 +145,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter-gpu/build.yaml b/src/llama_stack/distributions/starter-gpu/build.yaml index 10cbb1389..7b4465206 100644 --- a/src/llama_stack/distributions/starter-gpu/build.yaml +++ b/src/llama_stack/distributions/starter-gpu/build.yaml @@ -28,6 +28,7 @@ distribution_spec: - provider_type: remote::pgvector - provider_type: remote::qdrant - provider_type: remote::weaviate + - provider_type: remote::elasticsearch files: - provider_type: inline::localfs safety: diff --git a/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml index 0662986f1..fca077efa 100644 --- a/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml @@ -145,6 +145,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter-gpu/run.yaml b/src/llama_stack/distributions/starter-gpu/run.yaml index 9ef5b3f6d..75046887d 100644 --- a/src/llama_stack/distributions/starter-gpu/run.yaml +++ b/src/llama_stack/distributions/starter-gpu/run.yaml @@ -145,6 +145,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: 
${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/build.yaml b/src/llama_stack/distributions/starter/build.yaml index acd51f773..35566be15 100644 --- a/src/llama_stack/distributions/starter/build.yaml +++ b/src/llama_stack/distributions/starter/build.yaml @@ -28,6 +28,7 @@ distribution_spec: - provider_type: remote::pgvector - provider_type: remote::qdrant - provider_type: remote::weaviate + - provider_type: remote::elasticsearch files: - provider_type: inline::localfs safety: diff --git a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml index 1da4f0da7..dcf2cc909 100644 --- a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml @@ -145,6 +145,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/run.yaml b/src/llama_stack/distributions/starter/run.yaml index 3e6cde13a..bb95efe70 100644 --- a/src/llama_stack/distributions/starter/run.yaml +++ b/src/llama_stack/distributions/starter/run.yaml @@ -145,6 +145,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/starter.py b/src/llama_stack/distributions/starter/starter.py index 32264eebb..ed38f4d4b 100644 --- a/src/llama_stack/distributions/starter/starter.py +++ b/src/llama_stack/distributions/starter/starter.py @@ -32,6 +32,7 @@ from llama_stack.providers.inline.vector_io.sqlite_vec.config import ( ) from llama_stack.providers.registry.inference import available_providers from llama_stack.providers.remote.vector_io.chroma.config import ChromaVectorIOConfig +from llama_stack.providers.remote.vector_io.elasticsearch.config import ElasticsearchVectorIOConfig from llama_stack.providers.remote.vector_io.pgvector.config import ( PGVectorVectorIOConfig, ) @@ -121,6 +122,7 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: BuildProvider(provider_type="remote::pgvector"), BuildProvider(provider_type="remote::qdrant"), BuildProvider(provider_type="remote::weaviate"), + BuildProvider(provider_type="remote::elasticsearch"), ], "files": [BuildProvider(provider_type="inline::localfs")], "safety": [ @@ -237,6 +239,15 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: cluster_url="${env.WEAVIATE_CLUSTER_URL:=}", ), ), + Provider( + provider_id="${env.ELASTICSEARCH_URL:+elasticsearch}", + 
provider_type="remote::elasticsearch", + config=ElasticsearchVectorIOConfig.sample_run_config( + f"~/.llama/distributions/{name}", + elasticsearch_url="${env.ELASTICSEARCH_URL:=localhost:9200}", + elasticsearch_api_key="${env.ELASTICSEARCH_API_KEY:=}", + ), + ), ], "files": [files_provider], } diff --git a/src/llama_stack/providers/registry/vector_io.py b/src/llama_stack/providers/registry/vector_io.py index a00941586..df0d18add 100644 --- a/src/llama_stack/providers/registry/vector_io.py +++ b/src/llama_stack/providers/registry/vector_io.py @@ -823,6 +823,55 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi optional_api_dependencies=[Api.files, Api.models], description=""" Please refer to the remote provider documentation. +""", + ), + RemoteProviderSpec( + api=Api.vector_io, + adapter_type="elasticsearch", + provider_type="remote::elasticsearch", + pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS, + module="llama_stack.providers.remote.vector_io.elasticsearch", + config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig", + api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files, Api.models], + description=""" +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. + +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. + +## Installation + +You can test Elasticsearch locally by running this script in the terminal: + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. +For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + +## Documentation +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. """, ), ] diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py new file mode 100644 index 000000000..33e3930a2 --- /dev/null +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +from llama_stack_api import Api, ProviderSpec + +from .config import ElasticsearchVectorIOConfig + + +async def get_adapter_impl(config: ElasticsearchVectorIOConfig, deps: dict[Api, ProviderSpec]): + from .elasticsearch import ElasticsearchVectorIOAdapter + + impl = ElasticsearchVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files)) + await impl.initialize() + return impl diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py new file mode 100644 index 000000000..caa7e7891 --- /dev/null +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -0,0 +1,32 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from typing import Any + +from pydantic import BaseModel, Field + +from llama_stack.core.storage.datatypes import KVStoreReference +from llama_stack_api import json_schema_type + + +@json_schema_type +class ElasticsearchVectorIOConfig(BaseModel): + elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) + elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") + persistence: KVStoreReference | None = Field( + description="Config for KV store backend (SQLite only for now)", default=None + ) + + @classmethod + def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: + return { + "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}", + "elasticsearch_api_key": "${env.ELASTICSEARCH_API_KEY:=}", + "persistence": KVStoreReference( + backend="kv_default", + namespace="vector_io::elasticsearch", + ).model_dump(exclude_none=True), + } diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py new file mode 100644 index 000000000..aa3a986dc --- /dev/null +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -0,0 +1,331 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
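As a quick sanity check of a config before handing it to the adapter, a sketch that builds the same `AsyncElasticsearch` client the adapter's `initialize()` constructs below and pings the cluster — the URL value here is illustrative, and the check itself is not part of this patch:

```python
# Sketch: verify an ElasticsearchVectorIOConfig can reach the cluster before the
# adapter uses it. Mirrors the client construction in initialize() below.
import asyncio

from elasticsearch import AsyncElasticsearch

from llama_stack.providers.remote.vector_io.elasticsearch.config import (
    ElasticsearchVectorIOConfig,
)


async def check(config: ElasticsearchVectorIOConfig) -> None:
    client = AsyncElasticsearch(hosts=config.elasticsearch_url, api_key=config.elasticsearch_api_key)
    try:
        info = await client.info()  # raises if the URL or API key is wrong
        print(info["version"]["number"])
    finally:
        await client.close()


asyncio.run(check(ElasticsearchVectorIOConfig(elasticsearch_url="http://localhost:9200")))
```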
+ +from typing import Any + +from elasticsearch import AsyncElasticsearch +from elasticsearch.helpers import async_bulk +from numpy.typing import NDArray + +from llama_stack.core.storage.kvstore import kvstore_impl +from llama_stack.log import get_logger +from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin +from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex +from llama_stack_api import ( + Chunk, + Files, + Inference, + InterleavedContent, + QueryChunksResponse, + VectorIO, + VectorStore, + VectorStoreNotFoundError, + VectorStoresProtocolPrivate, +) + +from .config import ElasticsearchVectorIOConfig + +log = get_logger(name=__name__, category="vector_io::elasticsearch") + +# KV store prefixes for vector databases +VERSION = "v3" +VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::" +VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::" + + +class ElasticsearchIndex(EmbeddingIndex): + def __init__(self, client: AsyncElasticsearch, collection_name: str): + self.client = client + self.collection_name = collection_name + + async def initialize(self) -> None: + # Elasticsearch collections (indexes) are created on-demand in add_chunks + # If the index does not exist, it will be created in add_chunks. + pass + + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + """Adds chunks and their embeddings to the Elasticsearch index.""" + + assert len(chunks) == len(embeddings), ( + f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" + ) + + if not await self.client.indices.exists(self.collection_name): + await self.client.indices.create( + index=self.collection_name, + body={ + "mappings": { + "properties": { + "vector": {"type": "dense_vector", "dims": len(embeddings[0])}, + "chunk_content": {"type": "object"}, + } + } + }, + ) + + actions = [] + for chunk, embedding in zip(chunks, embeddings, strict=False): + actions.append( + { + "_op_type": "index", + "_index": self.collection_name, + "_id": chunk.chunk_id, + "_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()}, + } + ) + + try: + successful_count, error_count = await async_bulk( + client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True + ) + if error_count > 0: + log.warning( + f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}" + ) + + log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}") + except Exception as e: + log.error(f"Error adding chunks to Elasticsearch index {self.collection_name}: {e}") + raise + + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Remove a chunk from the Elasticsearch index.""" + + actions = [] + for chunk in chunks_for_deletion: + actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id}) + + try: + successful_count, error_count = await async_bulk( + client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True + ) + if error_count > 0: + log.warning( + f"{error_count} out of 
{len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}" + ) + + log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}") + except Exception as e: + log.error(f"Error deleting chunks from Elasticsearch index {self.collection_name}: {e}") + raise + + async def _results_to_chunks(self, results: dict) -> QueryChunksResponse: + """Convert search results to QueryChunksResponse.""" + + chunks, scores = [], [] + for result in results["hits"]["hits"]: + try: + chunk = Chunk( + content=result["_source"]["chunk_content"], + stored_chunk_id=result["_id"], + embedding=result["_source"]["vector"], + ) + except Exception: + log.exception("Failed to parse chunk") + continue + + chunks.append(chunk) + scores.append(result["_score"]) + + return QueryChunksResponse(chunks=chunks, scores=scores) + + async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: + """Vector search using kNN.""" + + try: + results = await self.client.search( + index=self.collection_name, + query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}}, + min_score=score_threshold, + limit=k, + ) + except Exception as e: + log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def query_keyword(self, query_string: str, k: int, score_threshold: float) -> QueryChunksResponse: + """Keyword search using match query.""" + + try: + results = await self.client.search( + index=self.collection_name, + query={"match": {"chunk_content": {"query": query_string}}}, + min_score=score_threshold, + limit=k, + ) + except Exception as e: + log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def query_hybrid( + self, + embedding: NDArray, + query_string: str, + k: int, + score_threshold: float, + reranker_type: str, + reranker_params: dict[str, Any] | None = None, + ) -> QueryChunksResponse: + supported_retrievers = ["rrf", "linear"] + if reranker_type not in supported_retrievers: + raise ValueError(f"Unsupported reranker type: {reranker_type}. Supported types are: {supported_retrievers}") + + retriever = { + reranker_type: { + "retrievers": [ + {"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}}, + {"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}}, + ] + } + } + + # Add reranker parameters if provided for RRF (e.g. rank_constant) + # see https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/rrf-retriever + if reranker_type == "rrf" and reranker_params is not None: + retriever["rrf"].update(reranker_params) + # Add reranker parameters if provided for Linear (e.g. 
weights) + # see https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/linear-retriever + # Since the weights are per retriever, we need to update them separately, using the following syntax + # reranker_params = { + # "retrievers": { + # "standard": {"weight": 0.7}, + # "knn": {"weight": 0.3} + # } + # } + elif reranker_type == "linear" and reranker_params is not None: + retrievers_params = reranker_params.get("retrievers") + if retrievers_params is not None: + for i in range(0, len(retriever["linear"]["retrievers"])): + retr_type = list(retriever["linear"]["retrievers"][i].keys())[0] + retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type]) + del reranker_params["retrievers"] + retriever["linear"].update(reranker_params) + + try: + results = await self.client.search( + index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold + ) + except Exception as e: + log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def delete(self): + """Delete the entire Elasticsearch index with collection_name.""" + + try: + await self.client.delete(index=self.collection_name) + except Exception as e: + log.error(f"Error deleting Elasticsearch index {self.collection_name}: {e}") + raise + + +class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate): + def __init__( + self, + config: ElasticsearchVectorIOConfig, + inference_api: Inference, + files_api: Files | None = None, + ) -> None: + super().__init__(files_api=files_api, kvstore=None) + self.config = config + self.client: AsyncElasticsearch = None + self.cache = {} + self.inference_api = inference_api + self.vector_store_table = None + + async def initialize(self) -> None: + self.client = AsyncElasticsearch(hosts=self.config.elasticsearch_url, api_key=self.config.elasticsearch_api_key) + self.kvstore = await kvstore_impl(self.config.persistence) + + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key) + + for vector_store_data in stored_vector_stores: + vector_store = VectorStore.model_validate_json(vector_store_data) + index = VectorStoreWithIndex( + vector_store, ElasticsearchIndex(self.client, vector_store.identifier), self.inference_api + ) + self.cache[vector_store.identifier] = index + self.openai_vector_stores = await self._load_openai_vector_stores() + + async def shutdown(self) -> None: + await self.client.close() + # Clean up mixin resources (file batch tasks) + await super().shutdown() + + async def register_vector_store(self, vector_store: VectorStore) -> None: + assert self.kvstore is not None + key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" + await self.kvstore.set(key=key, value=vector_store.model_dump_json()) + + index = VectorStoreWithIndex( + vector_store=vector_store, + index=ElasticsearchIndex(self.client, vector_store.identifier), + inference_api=self.inference_api, + ) + + self.cache[vector_store.identifier] = index + + async def unregister_vector_store(self, vector_store_id: str) -> None: + if vector_store_id in self.cache: + await self.cache[vector_store_id].index.delete() + del self.cache[vector_store_id] + + assert self.kvstore is not None + await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}") + + async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex | 
None: + if vector_store_id in self.cache: + return self.cache[vector_store_id] + + if self.vector_store_table is None: + raise ValueError(f"Vector DB not found {vector_store_id}") + + vector_store = await self.vector_store_table.get_vector_store(vector_store_id) + if not vector_store: + raise VectorStoreNotFoundError(vector_store_id) + + index = VectorStoreWithIndex( + vector_store=vector_store, + index=ElasticsearchIndex(client=self.client, collection_name=vector_store.identifier), + inference_api=self.inference_api, + ) + self.cache[vector_store_id] = index + return index + + async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: + index = await self._get_and_cache_vector_store_index(vector_db_id) + if not index: + raise VectorStoreNotFoundError(vector_db_id) + + await index.insert_chunks(chunks) + + async def query_chunks( + self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None + ) -> QueryChunksResponse: + index = await self._get_and_cache_vector_store_index(vector_db_id) + if not index: + raise VectorStoreNotFoundError(vector_db_id) + + return await index.query_chunks(query, params) + + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from an Elasticsearch vector store.""" + index = await self._get_and_cache_vector_store_index(store_id) + if not index: + raise ValueError(f"Vector DB {store_id} not found") + + await index.index.delete_chunks(chunks_for_deletion) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0d0af687f..683ca7fe9 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -379,6 +379,7 @@ def vector_provider_wrapper(func): "pgvector", "weaviate", "qdrant", + "elasticsearch", ] else: # For CI tests (replay/record), only use providers that are available in ci-tests environment diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 102f3f00c..b080d95a0 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -34,6 +34,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ]: return @@ -54,6 +55,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ], "keyword": [ "inline::milvus", @@ -61,6 +63,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], "hybrid": [ "inline::milvus", @@ -68,6 +71,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], } supported_providers = search_mode_support.get(search_mode, []) diff --git a/tests/integration/vector_io/test_vector_io.py b/tests/integration/vector_io/test_vector_io.py index 29dbd3e56..50efd4c16 100644 --- a/tests/integration/vector_io/test_vector_io.py +++ b/tests/integration/vector_io/test_vector_io.py @@ -164,6 +164,7 @@ def test_insert_chunks_with_precomputed_embeddings( "inline::milvus": {"score_threshold": -1.0}, "inline::qdrant": {"score_threshold": -1.0}, "remote::qdrant": {"score_threshold": 
-1.0}, + "remote::elasticsearch": {"score_threshold": -1.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( @@ -214,6 +215,7 @@ def test_query_returns_valid_object_when_identical_to_embedding_in_vdb( "inline::milvus": {"score_threshold": 0.0}, "remote::qdrant": {"score_threshold": 0.0}, "inline::qdrant": {"score_threshold": 0.0}, + "remote::elasticsearch": {"score_threshold": 0.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( diff --git a/uv.lock b/uv.lock index 8c648c362..1b6df8474 100644 --- a/uv.lock +++ b/uv.lock @@ -988,6 +988,33 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, ] +[[package]] +name = "elastic-transport" +version = "8.17.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6a/54/d498a766ac8fa475f931da85a154666cc81a70f8eb4a780bc8e4e934e9ac/elastic_transport-8.17.1.tar.gz", hash = "sha256:5edef32ac864dca8e2f0a613ef63491ee8d6b8cfb52881fa7313ba9290cac6d2", size = 73425, upload-time = "2025-03-13T07:28:30.776Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cf/cd/b71d5bc74cde7fc6fd9b2ff9389890f45d9762cbbbf81dc5e51fd7588c4a/elastic_transport-8.17.1-py3-none-any.whl", hash = "sha256:192718f498f1d10c5e9aa8b9cf32aed405e469a7f0e9d6a8923431dbb2c59fb8", size = 64969, upload-time = "2025-03-13T07:28:29.031Z" }, +] + +[[package]] +name = "elasticsearch" +version = "8.19.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "elastic-transport" }, + { name = "python-dateutil" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/7b/70b9d16069eda6f91d45fadd9e12faed8e4442f242ca8a81de84bc626f1b/elasticsearch-8.19.2.tar.gz", hash = "sha256:622efa6a3e662db45285f16ab57bf198ea73ac9e137e7ed8b1d1d1e47638959d", size = 797401, upload-time = "2025-10-28T16:36:44.953Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/56/01/6f77d042b83260ef9ed73ea9647dfa0ef8414eba0a3fc57a509a088ad39b/elasticsearch-8.19.2-py3-none-any.whl", hash = "sha256:c16ba20c4c76cf6952e836dae7f4e724e00ba7bf31b94b79472b873683accdd4", size = 949706, upload-time = "2025-10-28T16:36:41.003Z" }, +] + [[package]] name = "eval-type-backport" version = "0.2.2" @@ -2077,6 +2104,7 @@ test = [ { name = "chardet" }, { name = "chromadb" }, { name = "datasets" }, + { name = "elasticsearch" }, { name = "mcp" }, { name = "milvus-lite" }, { name = "psycopg2-binary" }, @@ -2223,6 +2251,7 @@ test = [ { name = "chardet" }, { name = "chromadb", specifier = ">=1.0.15" }, { name = "datasets", specifier = ">=4.0.0" }, + { name = "elasticsearch", specifier = ">=8.16.0,<9.0.0" }, { name = "mcp" }, { name = "milvus-lite", specifier = ">=2.5.0" }, { name = "psycopg2-binary", specifier = ">=2.9.0" },
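For reference when exercising the hybrid path, a sketch of the `reranker_params` shapes that `ElasticsearchIndex.query_hybrid` above accepts, following the example in the implementation's comments — the numeric values are illustrative, only the shapes come from the code:

```python
# Sketch of reranker_params for ElasticsearchIndex.query_hybrid above.
# Shapes follow the comments in the implementation; values are illustrative.

# "rrf": extra keys are merged into the rrf retriever body, e.g. rank_constant
# (see the rrf-retriever docs linked in the implementation).
rrf_params = {"rank_constant": 60}

# "linear": per-retriever options go under "retrievers", keyed by retriever type
# ("standard" for the keyword match, "knn" for the vector branch); any remaining
# keys are merged into the linear retriever body.
linear_params = {
    "retrievers": {
        "standard": {"weight": 0.7},
        "knn": {"weight": 0.3},
    }
}
```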