diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index 1962629c2..8165fabbd 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -146,6 +146,14 @@ jobs: docker logs weaviate exit 1 + - name: Setup Elasticsearch + if: matrix.vector-io-provider == 'remote::elasticsearch' + id: setup-elasticsearch + run: | + curl -fsSL https://elastic.co/start-local | sh -s -- -v 9.2.0 --esonly + source elastic-start-local/.env + echo "elasticsearch-api-key=$ES_LOCAL_API_KEY" >> "$GITHUB_OUTPUT" + - name: Build Llama Stack run: | uv run --no-sync llama stack list-deps ci-tests | xargs -L1 uv pip install @@ -170,6 +178,8 @@ jobs: QDRANT_URL: ${{ matrix.vector-io-provider == 'remote::qdrant' && 'http://localhost:6333' || '' }} ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} + ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }} + ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && steps.setup-elasticsearch.outputs.elasticsearch-api-key || '' }} run: | uv run --no-sync \ pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ @@ -196,6 +206,11 @@ jobs: run: | docker logs qdrant > qdrant.log + - name: Write Elasticsearch logs to file + if: ${{ always() && matrix.vector-io-provider == 'remote::elasticsearch' }} + run: | + docker logs es-local-dev > elasticsearch.log + - name: Upload all logs to artifacts if: ${{ always() }} uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0 diff --git a/docs/docs/providers/vector_io/remote_elasticsearch.mdx b/docs/docs/providers/vector_io/remote_elasticsearch.mdx new file mode 
100644 index 000000000..864015a5c --- /dev/null +++ b/docs/docs/providers/vector_io/remote_elasticsearch.mdx @@ -0,0 +1,104 @@ +--- +description: | + [Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. + It allows you to store and query vectors directly within an Elasticsearch database. + That means you're not limited to storing vectors in memory or in a separate service. + + ## Features + Elasticsearch supports: + - Store embeddings and their metadata + - Vector search + - Full-text search + - Fuzzy search + - Hybrid search + - Document storage + - Metadata filtering + - Inference service + - Machine Learning integrations + + ## Usage + + To use Elasticsearch in your Llama Stack project, follow these steps: + + 1. Install the necessary dependencies. + 2. Configure your Llama Stack project to use Elasticsearch. + 3. Start storing and querying vectors. + + ## Installation + + You can test Elasticsearch locally by running this script in the terminal: + + ```bash + curl -fsSL https://elastic.co/start-local | sh + ``` + + Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. + For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + + ## Documentation + See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. +sidebar_label: Remote - Elasticsearch +title: remote::elasticsearch +--- + +# remote::elasticsearch + +## Description + + +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. 
+ +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. + +## Installation + +You can test Elasticsearch locally by running this script in the terminal: + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. +For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + +## Documentation +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. 
+ + +## Configuration + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `api_key` | `str \| None` | No | | The API key for the Elasticsearch instance | +| `hosts` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance | +| `persistence` | `llama_stack.core.storage.datatypes.KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) | + +## Sample Configuration + +```yaml +hosts: ${env.ELASTICSEARCH_URL:=localhost:9200} +api_key: ${env.ELASTICSEARCH_API_KEY:=None} +persistence: + namespace: vector_io::elasticsearch + backend: kv_default +``` diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/llama_stack/providers/remote/vector_io/elasticsearch/config.py index f8c5ed028..4f2237d79 100644 --- a/llama_stack/providers/remote/vector_io/elasticsearch/config.py +++ b/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -14,8 +14,8 @@ from llama_stack.schema_utils import json_schema_type @json_schema_type class ElasticsearchVectorIOConfig(BaseModel): - elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) - elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") + api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) + hosts: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") persistence: KVStoreReference | None = Field( description="Config for KV store backend (SQLite only for now)", default=None ) @@ -23,10 +23,10 @@ class ElasticsearchVectorIOConfig(BaseModel): @classmethod def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: return { - "elasticsearch_api_key": None, - "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}", + "hosts": "${env.ELASTICSEARCH_URL:=localhost:9200}", + 
"api_key": "${env.ELASTICSEARCH_URL:=localhost:9200}", "persistence": KVStoreReference( backend="kv_default", namespace="vector_io::elasticsearch", ).model_dump(exclude_none=True), - } \ No newline at end of file + } diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py index b2a45265b..2bbb167a0 100644 --- a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py +++ b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -4,12 +4,11 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -import asyncio from typing import Any -from numpy.typing import NDArray from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk +from numpy.typing import NDArray from llama_stack.apis.common.errors import VectorStoreNotFoundError from llama_stack.apis.files import Files @@ -18,13 +17,10 @@ from llama_stack.apis.vector_io import ( Chunk, QueryChunksResponse, VectorIO, - VectorStoreChunkingStrategy, - VectorStoreFileObject, ) from llama_stack.apis.vector_stores import VectorStore from llama_stack.log import get_logger from llama_stack.providers.datatypes import VectorStoresProtocolPrivate -from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex @@ -36,6 +32,10 @@ log = get_logger(name=__name__, category="vector_io::elasticsearch") # KV store prefixes for vector databases VERSION = "v3" VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::" +VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::" 
+OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::" class ElasticsearchIndex(EmbeddingIndex): @@ -61,39 +61,32 @@ class ElasticsearchIndex(EmbeddingIndex): body={ "mappings": { "properties": { - "vector": { - "type": "dense_vector", - "dims": len(embeddings[0]) - }, + "vector": {"type": "dense_vector", "dims": len(embeddings[0])}, "chunk_content": {"type": "object"}, } } - } + }, ) actions = [] for chunk, embedding in zip(chunks, embeddings, strict=False): - actions.append({ - "_op_type": "index", - "_index": self.collection_name, - "_id": chunk.chunk_id, - "_source": { - "vector": embedding, - "chunk_content": chunk.model_dump_json() - } - }) + actions.append( + { + "_op_type": "index", + "_index": self.collection_name, + "_id": chunk.chunk_id, + "_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()}, + } + ) try: successful_count, error_count = await async_bulk( - client=self.client, - actions=actions, - timeout='300s', - refresh=True, - raise_on_error=False, - stats_only=True + client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True ) if error_count > 0: - log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}") + log.warning( + f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}" + ) log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}") except Exception as e: @@ -105,23 +98,16 @@ class ElasticsearchIndex(EmbeddingIndex): actions = [] for chunk in chunks_for_deletion: - actions.append({ - "_op_type": "delete", - "_index": self.collection_name, - "_id": chunk.chunk_id - }) + 
actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id}) try: successful_count, error_count = await async_bulk( - client=self.client, - actions=actions, - timeout='300s', - refresh=True, - raise_on_error=True, - stats_only=True + client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True ) if error_count > 0: - log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}") + log.warning( + f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}" + ) log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}") except Exception as e: @@ -132,12 +118,12 @@ class ElasticsearchIndex(EmbeddingIndex): """Convert search results to QueryChunksResponse.""" chunks, scores = [], [] - for result in results['hits']['hits']: + for result in results["hits"]["hits"]: try: chunk = Chunk( content=result["_source"]["chunk_content"], - chunk_id=result["_id"], - embedding=result["_source"]["vector"] + stored_chunk_id=result["_id"], + embedding=result["_source"]["vector"], ) except Exception: log.exception("Failed to parse chunk") @@ -147,24 +133,16 @@ class ElasticsearchIndex(EmbeddingIndex): scores.append(result["_score"]) return QueryChunksResponse(chunks=chunks, scores=scores) - + async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: """Vector search using kNN.""" try: - results = ( - await self.client.search( - index=self.collection_name, - query={ - "knn": { - "field": "vector", - "query_vector": embedding.tolist(), - "k": k - } - }, - min_score=score_threshold, - limit=k - ) + results = await self.client.search( + index=self.collection_name, + query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}}, + min_score=score_threshold, + 
limit=k, ) except Exception as e: log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}") @@ -176,19 +154,11 @@ class ElasticsearchIndex(EmbeddingIndex): """Keyword search using match query.""" try: - results = ( - await self.client.search( - index=self.collection_name, - query={ - "match": { - "chunk_content": { - "query": query_string - } - } - }, - min_score=score_threshold, - limit=k - ) + results = await self.client.search( + index=self.collection_name, + query={"match": {"chunk_content": {"query": query_string}}}, + min_score=score_threshold, + limit=k, ) except Exception as e: log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}") @@ -205,32 +175,15 @@ class ElasticsearchIndex(EmbeddingIndex): reranker_type: str, reranker_params: dict[str, Any] | None = None, ) -> QueryChunksResponse: - supported_retrievers = ["rrf", "linear"] if reranker_type not in supported_retrievers: raise ValueError(f"Unsupported reranker type: {reranker_type}. 
Supported types are: {supported_retrievers}") - + retriever = { reranker_type: { "retrievers": [ - { - "retriever": { - "standard": { - "query": { - "match": { - "chunk_content": query_string - } - } - } - } - }, - { - "knn": { - "field": "vector", - "query_vector": embedding.tolist(), - "k": k - } - } + {"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}}, + {"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}}, ] } } @@ -252,17 +205,14 @@ class ElasticsearchIndex(EmbeddingIndex): retrievers_params = reranker_params.get("retrievers") if retrievers_params is not None: for i in range(0, len(retriever["linear"]["retrievers"])): - retr_type=retriever["linear"]["retrievers"][i]["retriever"].key() + retr_type = list(retriever["linear"]["retrievers"][i].keys())[0] retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type]) del reranker_params["retrievers"] retriever["linear"].update(reranker_params) try: results = await self.client.search( - index=self.collection_name, - size=k, - retriever=retriever, - min_score=score_threshold + index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold ) except Exception as e: log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}") diff --git a/src/llama_stack/providers/registry/vector_io.py b/src/llama_stack/providers/registry/vector_io.py index a561c1a5d..66cc4ed6a 100644 --- a/src/llama_stack/providers/registry/vector_io.py +++ b/src/llama_stack/providers/registry/vector_io.py @@ -825,11 +825,11 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi Please refer to the remote provider documentation. 
""", ), - InlineProviderSpec( + RemoteProviderSpec( api=Api.vector_io, adapter_type="elasticsearch", provider_type="remote::elasticsearch", - pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS, + pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS, module="llama_stack.providers.remote.vector_io.elasticsearch", config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig", api_dependencies=[Api.inference], @@ -871,6 +871,7 @@ Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overvie For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). ## Documentation -See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""", - ) +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. 
+""", + ), ] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0d0af687f..0bb38481f 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -371,18 +371,7 @@ def vector_provider_wrapper(func): inference_mode = os.environ.get("LLAMA_STACK_TEST_INFERENCE_MODE") if inference_mode == "live": # For live tests, try all providers (they'll skip if not available) - all_providers = [ - "faiss", - "sqlite-vec", - "milvus", - "chromadb", - "pgvector", - "weaviate", - "qdrant", - ] - else: - # For CI tests (replay/record), only use providers that are available in ci-tests environment - all_providers = ["faiss", "sqlite-vec"] + all_providers = ["faiss", "sqlite-vec", "milvus", "chromadb", "pgvector", "weaviate", "qdrant", "elasticsearch"] return pytest.mark.parametrize("vector_io_provider_id", all_providers)(wrapper) diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 97ce4abe8..7aef24223 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -34,6 +34,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ]: return @@ -54,6 +55,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ], "keyword": [ "inline::milvus", @@ -61,6 +63,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], "hybrid": [ "inline::milvus", @@ -68,6 +71,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], } supported_providers = 
search_mode_support.get(search_mode, []) diff --git a/tests/integration/vector_io/test_vector_io.py b/tests/integration/vector_io/test_vector_io.py index 1b2099069..d48d47dcc 100644 --- a/tests/integration/vector_io/test_vector_io.py +++ b/tests/integration/vector_io/test_vector_io.py @@ -164,6 +164,7 @@ def test_insert_chunks_with_precomputed_embeddings( "inline::milvus": {"score_threshold": -1.0}, "inline::qdrant": {"score_threshold": -1.0}, "remote::qdrant": {"score_threshold": -1.0}, + "remote::elasticsearch": {"score_threshold": -1.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( @@ -214,6 +215,7 @@ def test_query_returns_valid_object_when_identical_to_embedding_in_vdb( "inline::milvus": {"score_threshold": 0.0}, "remote::qdrant": {"score_threshold": 0.0}, "inline::qdrant": {"score_threshold": 0.0}, + "remote::elasticsearch": {"score_threshold": 0.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( diff --git a/uv.lock b/uv.lock index de1c8879c..3201d1942 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "(python_full_version >= '3.13' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version >= '3.13' and sys_platform != 'darwin' and sys_platform != 'linux')", @@ -979,6 +979,33 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, ] +[[package]] +name = "elastic-transport" +version = "8.17.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "urllib3" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/6a/54/d498a766ac8fa475f931da85a154666cc81a70f8eb4a780bc8e4e934e9ac/elastic_transport-8.17.1.tar.gz", hash = "sha256:5edef32ac864dca8e2f0a613ef63491ee8d6b8cfb52881fa7313ba9290cac6d2", size = 73425, upload-time = "2025-03-13T07:28:30.776Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cf/cd/b71d5bc74cde7fc6fd9b2ff9389890f45d9762cbbbf81dc5e51fd7588c4a/elastic_transport-8.17.1-py3-none-any.whl", hash = "sha256:192718f498f1d10c5e9aa8b9cf32aed405e469a7f0e9d6a8923431dbb2c59fb8", size = 64969, upload-time = "2025-03-13T07:28:29.031Z" }, +] + +[[package]] +name = "elasticsearch" +version = "8.19.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "elastic-transport" }, + { name = "python-dateutil" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/7b/70b9d16069eda6f91d45fadd9e12faed8e4442f242ca8a81de84bc626f1b/elasticsearch-8.19.2.tar.gz", hash = "sha256:622efa6a3e662db45285f16ab57bf198ea73ac9e137e7ed8b1d1d1e47638959d", size = 797401, upload-time = "2025-10-28T16:36:44.953Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/56/01/6f77d042b83260ef9ed73ea9647dfa0ef8414eba0a3fc57a509a088ad39b/elasticsearch-8.19.2-py3-none-any.whl", hash = "sha256:c16ba20c4c76cf6952e836dae7f4e724e00ba7bf31b94b79472b873683accdd4", size = 949706, upload-time = "2025-10-28T16:36:41.003Z" }, +] + [[package]] name = "eval-type-backport" version = "0.2.2" @@ -2021,6 +2048,7 @@ test = [ { name = "chardet" }, { name = "chromadb" }, { name = "datasets" }, + { name = "elasticsearch" }, { name = "mcp" }, { name = "milvus-lite" }, { name = "psycopg2-binary" }, @@ -2167,6 +2195,7 @@ test = [ { name = "chardet" }, { name = "chromadb", specifier = ">=1.0.15" }, { name = "datasets", specifier = ">=4.0.0" }, + { name = "elasticsearch", specifier = ">=8.16.0,<9.0.0" }, { name = "mcp" }, { name = "milvus-lite", specifier = ">=2.5.0" }, { name = 
"psycopg2-binary", specifier = ">=2.9.0" },