From 6f39c8994d8ca368caee9c2ae79656b6b6686255 Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Wed, 29 Oct 2025 17:18:16 +0100 Subject: [PATCH 01/13] Draft for Elasticsearch as vector db --- llama_stack/providers/registry/vector_io.py | 48 +++ .../vector_io/elasticsearch/__init__.py | 17 + .../remote/vector_io/elasticsearch/config.py | 32 ++ .../vector_io/elasticsearch/elasticsearch.py | 380 ++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 478 insertions(+) create mode 100644 llama_stack/providers/remote/vector_io/elasticsearch/__init__.py create mode 100644 llama_stack/providers/remote/vector_io/elasticsearch/config.py create mode 100644 llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py diff --git a/llama_stack/providers/registry/vector_io.py b/llama_stack/providers/registry/vector_io.py index ff3b8486f..d77fb98e5 100644 --- a/llama_stack/providers/registry/vector_io.py +++ b/llama_stack/providers/registry/vector_io.py @@ -825,4 +825,52 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi Please refer to the remote provider documentation. """, ), + InlineProviderSpec( + api=Api.vector_io, + adapter_type="elasticsearch", + provider_type="remote::elasticsearch", + pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS, + module="llama_stack.providers.remote.vector_io.elasticsearch", + config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig", + api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files, Api.models], + description=""" +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. + +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. + +## Installation + +You can test Elasticsearch locally by running this script in the terminal: + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. +For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + +## Documentation +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""", + ) ] diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py b/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py new file mode 100644 index 000000000..6370c6196 --- /dev/null +++ b/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
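+#
+# `get_adapter_impl` below is the entry point the provider registry calls with the
+# resolved dependencies: `Api.inference` is required (the adapter uses it to compute
+# embeddings), while `Api.files` is optional and backs the OpenAI-compatible
+# vector-store file APIs.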
+ +from llama_stack.providers.datatypes import Api, ProviderSpec + +from .config import ElasticsearchVectorIOConfig + + +async def get_adapter_impl(config: ElasticsearchVectorIOConfig, deps: dict[Api, ProviderSpec]): + from .elasticsearch import ElasticsearchVectorIOAdapter + + impl = ElasticsearchVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files)) + await impl.initialize() + return impl diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/llama_stack/providers/remote/vector_io/elasticsearch/config.py new file mode 100644 index 000000000..f8c5ed028 --- /dev/null +++ b/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -0,0 +1,32 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from typing import Any + +from pydantic import BaseModel, Field + +from llama_stack.core.storage.datatypes import KVStoreReference +from llama_stack.schema_utils import json_schema_type + + +@json_schema_type +class ElasticsearchVectorIOConfig(BaseModel): + elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) + elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") + persistence: KVStoreReference | None = Field( + description="Config for KV store backend (SQLite only for now)", default=None + ) + + @classmethod + def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: + return { + "elasticsearch_api_key": None, + "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}", + "persistence": KVStoreReference( + backend="kv_default", + namespace="vector_io::elasticsearch", + ).model_dump(exclude_none=True), + } \ No newline at end of file diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py new file mode 100644 index 000000000..b2a45265b --- /dev/null +++ b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -0,0 +1,380 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
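+#
+# The index mapping created on demand in `ElasticsearchIndex.add_chunks` below has,
+# in outline, this shape (`dims` is inferred from the first embedding in the batch):
+#
+#   {"mappings": {"properties": {
+#       "vector": {"type": "dense_vector", "dims": <embedding_dim>},
+#       "chunk_content": {"type": "object"},
+#   }}}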
+ +import asyncio +from typing import Any + +from numpy.typing import NDArray +from elasticsearch import AsyncElasticsearch +from elasticsearch.helpers import async_bulk + +from llama_stack.apis.common.errors import VectorStoreNotFoundError +from llama_stack.apis.files import Files +from llama_stack.apis.inference import Inference, InterleavedContent +from llama_stack.apis.vector_io import ( + Chunk, + QueryChunksResponse, + VectorIO, + VectorStoreChunkingStrategy, + VectorStoreFileObject, +) +from llama_stack.apis.vector_stores import VectorStore +from llama_stack.log import get_logger +from llama_stack.providers.datatypes import VectorStoresProtocolPrivate +from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin +from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex + +from .config import ElasticsearchVectorIOConfig + +log = get_logger(name=__name__, category="vector_io::elasticsearch") + +# KV store prefixes for vector databases +VERSION = "v3" +VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::" + + +class ElasticsearchIndex(EmbeddingIndex): + def __init__(self, client: AsyncElasticsearch, collection_name: str): + self.client = client + self.collection_name = collection_name + + async def initialize(self) -> None: + # Elasticsearch collections (indexes) are created on-demand in add_chunks + # If the index does not exist, it will be created in add_chunks. + pass + + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + """Adds chunks and their embeddings to the Elasticsearch index.""" + + assert len(chunks) == len(embeddings), ( + f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" + ) + + if not await self.client.indices.exists(self.collection_name): + await self.client.indices.create( + index=self.collection_name, + body={ + "mappings": { + "properties": { + "vector": { + "type": "dense_vector", + "dims": len(embeddings[0]) + }, + "chunk_content": {"type": "object"}, + } + } + } + ) + + actions = [] + for chunk, embedding in zip(chunks, embeddings, strict=False): + actions.append({ + "_op_type": "index", + "_index": self.collection_name, + "_id": chunk.chunk_id, + "_source": { + "vector": embedding, + "chunk_content": chunk.model_dump_json() + } + }) + + try: + successful_count, error_count = await async_bulk( + client=self.client, + actions=actions, + timeout='300s', + refresh=True, + raise_on_error=False, + stats_only=True + ) + if error_count > 0: + log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}") + + log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}") + except Exception as e: + log.error(f"Error adding chunks to Elasticsearch index {self.collection_name}: {e}") + raise + + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Remove a chunk from the Elasticsearch index.""" + + actions = [] + for chunk in chunks_for_deletion: + actions.append({ + "_op_type": "delete", + "_index": self.collection_name, + "_id": chunk.chunk_id + }) + + try: + successful_count, error_count = await async_bulk( + client=self.client, + actions=actions, + timeout='300s', + refresh=True, + raise_on_error=True, + 
stats_only=True + ) + if error_count > 0: + log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}") + + log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}") + except Exception as e: + log.error(f"Error deleting chunks from Elasticsearch index {self.collection_name}: {e}") + raise + + async def _results_to_chunks(self, results: dict) -> QueryChunksResponse: + """Convert search results to QueryChunksResponse.""" + + chunks, scores = [], [] + for result in results['hits']['hits']: + try: + chunk = Chunk( + content=result["_source"]["chunk_content"], + chunk_id=result["_id"], + embedding=result["_source"]["vector"] + ) + except Exception: + log.exception("Failed to parse chunk") + continue + + chunks.append(chunk) + scores.append(result["_score"]) + + return QueryChunksResponse(chunks=chunks, scores=scores) + + async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: + """Vector search using kNN.""" + + try: + results = ( + await self.client.search( + index=self.collection_name, + query={ + "knn": { + "field": "vector", + "query_vector": embedding.tolist(), + "k": k + } + }, + min_score=score_threshold, + limit=k + ) + ) + except Exception as e: + log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def query_keyword(self, query_string: str, k: int, score_threshold: float) -> QueryChunksResponse: + """Keyword search using match query.""" + + try: + results = ( + await self.client.search( + index=self.collection_name, + query={ + "match": { + "chunk_content": { + "query": query_string + } + } + }, + min_score=score_threshold, + limit=k + ) + ) + except Exception as e: + log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def query_hybrid( + self, + embedding: NDArray, + query_string: str, + k: int, + score_threshold: float, + reranker_type: str, + reranker_params: dict[str, Any] | None = None, + ) -> QueryChunksResponse: + + supported_retrievers = ["rrf", "linear"] + if reranker_type not in supported_retrievers: + raise ValueError(f"Unsupported reranker type: {reranker_type}. Supported types are: {supported_retrievers}") + + retriever = { + reranker_type: { + "retrievers": [ + { + "retriever": { + "standard": { + "query": { + "match": { + "chunk_content": query_string + } + } + } + } + }, + { + "knn": { + "field": "vector", + "query_vector": embedding.tolist(), + "k": k + } + } + ] + } + } + + # Add reranker parameters if provided for RRF (e.g. rank_constant) + # see https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/rrf-retriever + if reranker_type == "rrf" and reranker_params is not None: + retriever["rrf"].update(reranker_params) + # Add reranker parameters if provided for Linear (e.g. 
weights) + # see https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/linear-retriever + # Since the weights are per retriever, we need to update them separately, using the following syntax + # reranker_params = { + # "retrievers": { + # "standard": {"weight": 0.7}, + # "knn": {"weight": 0.3} + # } + # } + elif reranker_type == "linear" and reranker_params is not None: + retrievers_params = reranker_params.get("retrievers") + if retrievers_params is not None: + for i in range(0, len(retriever["linear"]["retrievers"])): + retr_type=retriever["linear"]["retrievers"][i]["retriever"].key() + retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type]) + del reranker_params["retrievers"] + retriever["linear"].update(reranker_params) + + try: + results = await self.client.search( + index=self.collection_name, + size=k, + retriever=retriever, + min_score=score_threshold + ) + except Exception as e: + log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def delete(self): + """Delete the entire Elasticsearch index with collection_name.""" + + try: + await self.client.delete(index=self.collection_name) + except Exception as e: + log.error(f"Error deleting Elasticsearch index {self.collection_name}: {e}") + raise + +class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate): + def __init__( + self, + config: ElasticsearchVectorIOConfig, + inference_api: Inference, + files_api: Files | None = None, + ) -> None: + super().__init__(files_api=files_api, kvstore=None) + self.config = config + self.client: AsyncElasticsearch = None + self.cache = {} + self.inference_api = inference_api + self.vector_store_table = None + + async def initialize(self) -> None: + client_config = self.config.model_dump(exclude_none=True) + self.client = AsyncElasticsearch(**client_config) + self.kvstore = await kvstore_impl(self.config.persistence) + + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key) + + for vector_store_data in stored_vector_stores: + vector_store = VectorStore.model_validate_json(vector_store_data) + index = VectorStoreWithIndex( + vector_store, ElasticsearchIndex(self.client, vector_store.identifier), self.inference_api + ) + self.cache[vector_store.identifier] = index + self.openai_vector_stores = await self._load_openai_vector_stores() + + async def shutdown(self) -> None: + await self.client.close() + # Clean up mixin resources (file batch tasks) + await super().shutdown() + + async def register_vector_store(self, vector_store: VectorStore) -> None: + assert self.kvstore is not None + key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" + await self.kvstore.set(key=key, value=vector_store.model_dump_json()) + + index = VectorStoreWithIndex( + vector_store=vector_store, + index=ElasticsearchIndex(self.client, vector_store.identifier), + inference_api=self.inference_api, + ) + + self.cache[vector_store.identifier] = index + + async def unregister_vector_store(self, vector_store_id: str) -> None: + if vector_store_id in self.cache: + await self.cache[vector_store_id].index.delete() + del self.cache[vector_store_id] + + assert self.kvstore is not None + await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}") + + async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex 
| None: + if vector_store_id in self.cache: + return self.cache[vector_store_id] + + if self.vector_store_table is None: + raise ValueError(f"Vector DB not found {vector_store_id}") + + vector_store = await self.vector_store_table.get_vector_store(vector_store_id) + if not vector_store: + raise VectorStoreNotFoundError(vector_store_id) + + index = VectorStoreWithIndex( + vector_store=vector_store, + index=ElasticsearchIndex(client=self.client, collection_name=vector_store.identifier), + inference_api=self.inference_api, + ) + self.cache[vector_store_id] = index + return index + + async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: + index = await self._get_and_cache_vector_store_index(vector_db_id) + if not index: + raise VectorStoreNotFoundError(vector_db_id) + + await index.insert_chunks(chunks) + + async def query_chunks( + self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None + ) -> QueryChunksResponse: + index = await self._get_and_cache_vector_store_index(vector_db_id) + if not index: + raise VectorStoreNotFoundError(vector_db_id) + + return await index.query_chunks(query, params) + + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from an Elasticsearch vector store.""" + index = await self._get_and_cache_vector_store_index(store_id) + if not index: + raise ValueError(f"Vector DB {store_id} not found") + + await index.index.delete_chunks(chunks_for_deletion) diff --git a/pyproject.toml b/pyproject.toml index 741dd17e5..9037e2c42 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -119,6 +119,7 @@ test = [ "pymilvus>=2.6.1", "milvus-lite>=2.5.0", "weaviate-client>=4.16.4", + "elasticsearch>=8.16.0, <9.0.0" ] docs = [ "setuptools", From 22b27e6275350e623892f3ad10bbc3cbba70376c Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Fri, 31 Oct 2025 18:18:27 +0100 Subject: [PATCH 02/13] Added tests + docs + CI for Elasticsearch --- .../workflows/integration-vector-io-tests.yml | 15 ++ .../vector_io/remote_elasticsearch.mdx | 104 ++++++++++++++ llama_stack/providers/registry/vector_io.py | 9 +- .../remote/vector_io/elasticsearch/config.py | 10 +- .../vector_io/elasticsearch/elasticsearch.py | 136 ++++++------------ tests/integration/conftest.py | 10 +- .../vector_io/test_openai_vector_stores.py | 4 + tests/integration/vector_io/test_vector_io.py | 2 + uv.lock | 37 ++++- 9 files changed, 215 insertions(+), 112 deletions(-) create mode 100644 docs/docs/providers/vector_io/remote_elasticsearch.mdx diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index a6a86b15f..aff5b202c 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -142,6 +142,14 @@ jobs: docker logs weaviate exit 1 + - name: Setup Elasticsearch + if: matrix.vector-io-provider == 'remote::elasticsearch' + id: setup-elasticsearch + run: | + curl -fsSL https://elastic.co/start-local | sh -s -- -v 9.2.0 --esonly + source elastic-start-local/.env + echo "elasticsearch-api-key=$ES_LOCAL_API_KEY" >> "$GITHUB_OUTPUT" + - name: Build Llama Stack run: | uv run --no-sync llama stack list-deps ci-tests | xargs -L1 uv pip install @@ -166,6 +174,8 @@ jobs: QDRANT_URL: ${{ matrix.vector-io-provider == 'remote::qdrant' && 'http://localhost:6333' || '' }} ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} 
WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} + ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }} + ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && '${{ steps.setup-elasticsearch.outputs.elasticsearch-api-key }}' || '' }} run: | uv run --no-sync \ pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ @@ -192,6 +202,11 @@ jobs: run: | docker logs qdrant > qdrant.log + - name: Write Elasticsearch logs to file + if: ${{ always() && matrix.vector-io-provider == 'remote::elasticsearch' }} + run: | + docker logs es-local-dev > elasticsearch.log + - name: Upload all logs to artifacts if: ${{ always() }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 diff --git a/docs/docs/providers/vector_io/remote_elasticsearch.mdx b/docs/docs/providers/vector_io/remote_elasticsearch.mdx new file mode 100644 index 000000000..864015a5c --- /dev/null +++ b/docs/docs/providers/vector_io/remote_elasticsearch.mdx @@ -0,0 +1,104 @@ +--- +description: | + [Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. + It allows you to store and query vectors directly within an Elasticsearch database. + That means you're not limited to storing vectors in memory or in a separate service. + + ## Features + Elasticsearch supports: + - Store embeddings and their metadata + - Vector search + - Full-text search + - Fuzzy search + - Hybrid search + - Document storage + - Metadata filtering + - Inference service + - Machine Learning integrations + + ## Usage + + To use Elasticsearch in your Llama Stack project, follow these steps: + + 1. Install the necessary dependencies. + 2. Configure your Llama Stack project to use Elasticsearch. + 3. Start storing and querying vectors. + + ## Installation + + You can test Elasticsearch locally by running this script in the terminal: + + ```bash + curl -fsSL https://elastic.co/start-local | sh + ``` + + Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. + For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + + ## Documentation + See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. +sidebar_label: Remote - Elasticsearch +title: remote::elasticsearch +--- + +# remote::elasticsearch + +## Description + + +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. + +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. 
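+
+As a minimal sketch of step 3 (assuming a running Llama Stack server at
+`http://localhost:8321` with this provider enabled and the `llama-stack-client`
+package installed; the store name and query below are illustrative):
+
+```python
+from llama_stack_client import LlamaStackClient
+
+client = LlamaStackClient(base_url="http://localhost:8321")
+
+# Create a vector store backed by the Elasticsearch provider.
+vector_store = client.vector_stores.create(name="my_documents")
+
+# Search it once documents have been ingested.
+results = client.vector_stores.search(
+    vector_store_id=vector_store.id,
+    query="How do I configure Elasticsearch?",
+)
+```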
+
+## Installation
+
+You can test Elasticsearch locally by running this script in the terminal:
+
+```bash
+curl -fsSL https://elastic.co/start-local | sh
+```
+
+Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud.
+For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
+
+## Documentation
+See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
+
+
+## Configuration
+
+| Field | Type | Required | Default | Description |
+|-------|------|----------|---------|-------------|
+| `api_key` | `str \| None` | No | | The API key for the Elasticsearch instance |
+| `hosts` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance |
+| `persistence` | `llama_stack.core.storage.datatypes.KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) |
+
+## Sample Configuration
+
+```yaml
+hosts: ${env.ELASTICSEARCH_URL:=localhost:9200}
+api_key: ${env.ELASTICSEARCH_API_KEY:=None}
+persistence:
+  namespace: vector_io::elasticsearch
+  backend: kv_default
+```
diff --git a/llama_stack/providers/registry/vector_io.py b/llama_stack/providers/registry/vector_io.py
index d77fb98e5..25ec1066f 100644
--- a/llama_stack/providers/registry/vector_io.py
+++ b/llama_stack/providers/registry/vector_io.py
@@ -825,11 +825,11 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi
 Please refer to the remote provider documentation.
 """,
     ),
-    InlineProviderSpec(
+    RemoteProviderSpec(
         api=Api.vector_io,
         adapter_type="elasticsearch",
         provider_type="remote::elasticsearch",
-        pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
+        pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
         module="llama_stack.providers.remote.vector_io.elasticsearch",
         config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig",
         api_dependencies=[Api.inference],
@@ -871,6 +871,7 @@ Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overvie
 For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).
 
 ## Documentation
-See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""",
-    )
+See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
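+
+To verify connectivity from Python (a minimal sketch, assuming a local instance
+started via `start-local`, whose generated API key is exported from
+`elastic-start-local/.env` as `ES_LOCAL_API_KEY`):
+
+```python
+import os
+
+from elasticsearch import Elasticsearch
+
+client = Elasticsearch("http://localhost:9200", api_key=os.environ["ES_LOCAL_API_KEY"])
+print(client.info())  # prints cluster name and version if the connection works
+```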
+""",
+    ),
 ]
diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/llama_stack/providers/remote/vector_io/elasticsearch/config.py
index f8c5ed028..4f2237d79 100644
--- a/llama_stack/providers/remote/vector_io/elasticsearch/config.py
+++ b/llama_stack/providers/remote/vector_io/elasticsearch/config.py
@@ -14,8 +14,8 @@ from llama_stack.schema_utils import json_schema_type
 
 @json_schema_type
 class ElasticsearchVectorIOConfig(BaseModel):
-    elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
-    elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
+    api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
+    hosts: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
     persistence: KVStoreReference | None = Field(
         description="Config for KV store backend (SQLite only for now)", default=None
     )
@@ -23,10 +23,10 @@ class ElasticsearchVectorIOConfig(BaseModel):
     @classmethod
     def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
         return {
-            "elasticsearch_api_key": None,
-            "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}",
+            "hosts": "${env.ELASTICSEARCH_URL:=localhost:9200}",
+            "api_key": "${env.ELASTICSEARCH_API_KEY:=None}",
             "persistence": KVStoreReference(
                 backend="kv_default",
                 namespace="vector_io::elasticsearch",
             ).model_dump(exclude_none=True),
-    }
\ No newline at end of file
+    }
diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py
index b2a45265b..2bbb167a0 100644
--- a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py
+++ b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py
@@ -4,12 +4,11 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
-import asyncio from typing import Any -from numpy.typing import NDArray from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk +from numpy.typing import NDArray from llama_stack.apis.common.errors import VectorStoreNotFoundError from llama_stack.apis.files import Files @@ -18,13 +17,10 @@ from llama_stack.apis.vector_io import ( Chunk, QueryChunksResponse, VectorIO, - VectorStoreChunkingStrategy, - VectorStoreFileObject, ) from llama_stack.apis.vector_stores import VectorStore from llama_stack.log import get_logger from llama_stack.providers.datatypes import VectorStoresProtocolPrivate -from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex @@ -36,6 +32,10 @@ log = get_logger(name=__name__, category="vector_io::elasticsearch") # KV store prefixes for vector databases VERSION = "v3" VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::" +VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::" class ElasticsearchIndex(EmbeddingIndex): @@ -61,39 +61,32 @@ class ElasticsearchIndex(EmbeddingIndex): body={ "mappings": { "properties": { - "vector": { - "type": "dense_vector", - "dims": len(embeddings[0]) - }, + "vector": {"type": "dense_vector", "dims": len(embeddings[0])}, "chunk_content": {"type": "object"}, } } - } + }, ) actions = [] for chunk, embedding in zip(chunks, embeddings, strict=False): - actions.append({ - "_op_type": "index", - "_index": self.collection_name, - "_id": chunk.chunk_id, - "_source": { - "vector": embedding, - "chunk_content": chunk.model_dump_json() - } - }) + actions.append( + { + "_op_type": "index", + "_index": self.collection_name, + "_id": chunk.chunk_id, + "_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()}, + } + ) try: successful_count, error_count = await async_bulk( - client=self.client, - actions=actions, - timeout='300s', - refresh=True, - raise_on_error=False, - stats_only=True + client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True ) if error_count > 0: - log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}") + log.warning( + f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}" + ) log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}") except Exception as e: @@ -105,23 +98,16 @@ class ElasticsearchIndex(EmbeddingIndex): actions = [] for chunk in chunks_for_deletion: - actions.append({ - "_op_type": "delete", - "_index": self.collection_name, - "_id": chunk.chunk_id - }) + actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id}) try: successful_count, error_count = await async_bulk( - client=self.client, - actions=actions, - timeout='300s', - refresh=True, - raise_on_error=True, - stats_only=True + client=self.client, 
actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True ) if error_count > 0: - log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}") + log.warning( + f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}" + ) log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}") except Exception as e: @@ -132,12 +118,12 @@ class ElasticsearchIndex(EmbeddingIndex): """Convert search results to QueryChunksResponse.""" chunks, scores = [], [] - for result in results['hits']['hits']: + for result in results["hits"]["hits"]: try: chunk = Chunk( content=result["_source"]["chunk_content"], - chunk_id=result["_id"], - embedding=result["_source"]["vector"] + stored_chunk_id=result["_id"], + embedding=result["_source"]["vector"], ) except Exception: log.exception("Failed to parse chunk") @@ -147,24 +133,16 @@ class ElasticsearchIndex(EmbeddingIndex): scores.append(result["_score"]) return QueryChunksResponse(chunks=chunks, scores=scores) - + async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: """Vector search using kNN.""" try: - results = ( - await self.client.search( - index=self.collection_name, - query={ - "knn": { - "field": "vector", - "query_vector": embedding.tolist(), - "k": k - } - }, - min_score=score_threshold, - limit=k - ) + results = await self.client.search( + index=self.collection_name, + query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}}, + min_score=score_threshold, + limit=k, ) except Exception as e: log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}") @@ -176,19 +154,11 @@ class ElasticsearchIndex(EmbeddingIndex): """Keyword search using match query.""" try: - results = ( - await self.client.search( - index=self.collection_name, - query={ - "match": { - "chunk_content": { - "query": query_string - } - } - }, - min_score=score_threshold, - limit=k - ) + results = await self.client.search( + index=self.collection_name, + query={"match": {"chunk_content": {"query": query_string}}}, + min_score=score_threshold, + limit=k, ) except Exception as e: log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}") @@ -205,32 +175,15 @@ class ElasticsearchIndex(EmbeddingIndex): reranker_type: str, reranker_params: dict[str, Any] | None = None, ) -> QueryChunksResponse: - supported_retrievers = ["rrf", "linear"] if reranker_type not in supported_retrievers: raise ValueError(f"Unsupported reranker type: {reranker_type}. 
Supported types are: {supported_retrievers}") - + retriever = { reranker_type: { "retrievers": [ - { - "retriever": { - "standard": { - "query": { - "match": { - "chunk_content": query_string - } - } - } - } - }, - { - "knn": { - "field": "vector", - "query_vector": embedding.tolist(), - "k": k - } - } + {"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}}, + {"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}}, ] } } @@ -252,17 +205,14 @@ class ElasticsearchIndex(EmbeddingIndex): retrievers_params = reranker_params.get("retrievers") if retrievers_params is not None: for i in range(0, len(retriever["linear"]["retrievers"])): - retr_type=retriever["linear"]["retrievers"][i]["retriever"].key() + retr_type = list(retriever["linear"]["retrievers"][i].keys())[0] retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type]) del reranker_params["retrievers"] retriever["linear"].update(reranker_params) try: results = await self.client.search( - index=self.collection_name, - size=k, - retriever=retriever, - min_score=score_threshold + index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold ) except Exception as e: log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d86fafed2..18d83bd71 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -368,15 +368,7 @@ def vector_provider_wrapper(func): all_providers = ["faiss", "sqlite-vec"] else: # For live tests, try all providers (they'll skip if not available) - all_providers = [ - "faiss", - "sqlite-vec", - "milvus", - "chromadb", - "pgvector", - "weaviate", - "qdrant", - ] + all_providers = ["faiss", "sqlite-vec", "milvus", "chromadb", "pgvector", "weaviate", "qdrant", "elasticsearch"] return pytest.mark.parametrize("vector_io_provider_id", all_providers)(wrapper) diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 626faf42d..0e93f60a5 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -34,6 +34,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ]: return @@ -54,6 +55,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ], "keyword": [ "inline::milvus", @@ -61,6 +63,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], "hybrid": [ "inline::milvus", @@ -68,6 +71,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], } supported_providers = search_mode_support.get(search_mode, []) diff --git a/tests/integration/vector_io/test_vector_io.py b/tests/integration/vector_io/test_vector_io.py index 1f67ddb24..1eeb086c4 100644 --- a/tests/integration/vector_io/test_vector_io.py +++ b/tests/integration/vector_io/test_vector_io.py @@ -154,6 +154,7 @@ def test_insert_chunks_with_precomputed_embeddings( "inline::milvus": {"score_threshold": -1.0}, "inline::qdrant": {"score_threshold": 
-1.0}, "remote::qdrant": {"score_threshold": -1.0}, + "remote::elasticsearch": {"score_threshold": -1.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( @@ -203,6 +204,7 @@ def test_query_returns_valid_object_when_identical_to_embedding_in_vdb( "inline::milvus": {"score_threshold": 0.0}, "remote::qdrant": {"score_threshold": 0.0}, "inline::qdrant": {"score_threshold": 0.0}, + "remote::elasticsearch": {"score_threshold": 0.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( diff --git a/uv.lock b/uv.lock index aad77f6a1..70e173db3 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "(python_full_version >= '3.13' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version >= '3.13' and sys_platform != 'darwin' and sys_platform != 'linux')", @@ -874,6 +874,33 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, ] +[[package]] +name = "elastic-transport" +version = "8.17.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6a/54/d498a766ac8fa475f931da85a154666cc81a70f8eb4a780bc8e4e934e9ac/elastic_transport-8.17.1.tar.gz", hash = "sha256:5edef32ac864dca8e2f0a613ef63491ee8d6b8cfb52881fa7313ba9290cac6d2", size = 73425, upload-time = "2025-03-13T07:28:30.776Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cf/cd/b71d5bc74cde7fc6fd9b2ff9389890f45d9762cbbbf81dc5e51fd7588c4a/elastic_transport-8.17.1-py3-none-any.whl", hash = "sha256:192718f498f1d10c5e9aa8b9cf32aed405e469a7f0e9d6a8923431dbb2c59fb8", size = 64969, upload-time = "2025-03-13T07:28:29.031Z" }, +] + +[[package]] +name = "elasticsearch" +version = "8.19.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "elastic-transport" }, + { name = "python-dateutil" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/7b/70b9d16069eda6f91d45fadd9e12faed8e4442f242ca8a81de84bc626f1b/elasticsearch-8.19.2.tar.gz", hash = "sha256:622efa6a3e662db45285f16ab57bf198ea73ac9e137e7ed8b1d1d1e47638959d", size = 797401, upload-time = "2025-10-28T16:36:44.953Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/56/01/6f77d042b83260ef9ed73ea9647dfa0ef8414eba0a3fc57a509a088ad39b/elasticsearch-8.19.2-py3-none-any.whl", hash = "sha256:c16ba20c4c76cf6952e836dae7f4e724e00ba7bf31b94b79472b873683accdd4", size = 949706, upload-time = "2025-10-28T16:36:41.003Z" }, +] + [[package]] name = "eval-type-backport" version = "0.2.2" @@ -1845,6 +1872,7 @@ test = [ { name = "chardet" }, { name = "chromadb" }, { name = "datasets" }, + { name = "elasticsearch" }, { name = "mcp" }, { name = "milvus-lite" }, { name = "psycopg2-binary" }, @@ -1961,6 +1989,7 @@ test = [ { name = "chardet" }, { name = "chromadb", specifier = ">=1.0.15" }, { name = "datasets", specifier = ">=4.0.0" }, + { name = "elasticsearch", specifier = ">=8.16.0,<9.0.0" }, { name = "mcp" }, { name = "milvus-lite", specifier = ">=2.5.0" }, { name = "psycopg2-binary", specifier 
= ">=2.9.0" }, @@ -3190,8 +3219,10 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2d/75/364847b879eb630b3ac8293798e380e441a957c53657995053c5ec39a316/psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ab8905b5dcb05bf3fb22e0cf90e10f469563486ffb6a96569e51f897c750a76a", size = 4411159, upload-time = "2025-10-10T11:12:00.49Z" }, { url = "https://files.pythonhosted.org/packages/6f/a0/567f7ea38b6e1c62aafd58375665a547c00c608a471620c0edc364733e13/psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:bf940cd7e7fec19181fdbc29d76911741153d51cab52e5c21165f3262125685e", size = 4468234, upload-time = "2025-10-10T11:12:04.892Z" }, { url = "https://files.pythonhosted.org/packages/30/da/4e42788fb811bbbfd7b7f045570c062f49e350e1d1f3df056c3fb5763353/psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:fa0f693d3c68ae925966f0b14b8edda71696608039f4ed61b1fe9ffa468d16db", size = 4166236, upload-time = "2025-10-10T11:12:11.674Z" }, + { url = "https://files.pythonhosted.org/packages/3c/94/c1777c355bc560992af848d98216148be5f1be001af06e06fc49cbded578/psycopg2_binary-2.9.11-cp312-cp312-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:a1cf393f1cdaf6a9b57c0a719a1068ba1069f022a59b8b1fe44b006745b59757", size = 3983083, upload-time = "2025-10-30T02:55:15.73Z" }, { url = "https://files.pythonhosted.org/packages/bd/42/c9a21edf0e3daa7825ed04a4a8588686c6c14904344344a039556d78aa58/psycopg2_binary-2.9.11-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ef7a6beb4beaa62f88592ccc65df20328029d721db309cb3250b0aae0fa146c3", size = 3652281, upload-time = "2025-10-10T11:12:17.713Z" }, { url = "https://files.pythonhosted.org/packages/12/22/dedfbcfa97917982301496b6b5e5e6c5531d1f35dd2b488b08d1ebc52482/psycopg2_binary-2.9.11-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:31b32c457a6025e74d233957cc9736742ac5a6cb196c6b68499f6bb51390bd6a", size = 3298010, upload-time = "2025-10-10T11:12:22.671Z" }, + { url = "https://files.pythonhosted.org/packages/66/ea/d3390e6696276078bd01b2ece417deac954dfdd552d2edc3d03204416c0c/psycopg2_binary-2.9.11-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:edcb3aeb11cb4bf13a2af3c53a15b3d612edeb6409047ea0b5d6a21a9d744b34", size = 3044641, upload-time = "2025-10-30T02:55:19.929Z" }, { url = "https://files.pythonhosted.org/packages/12/9a/0402ded6cbd321da0c0ba7d34dc12b29b14f5764c2fc10750daa38e825fc/psycopg2_binary-2.9.11-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:62b6d93d7c0b61a1dd6197d208ab613eb7dcfdcca0a49c42ceb082257991de9d", size = 3347940, upload-time = "2025-10-10T11:12:26.529Z" }, { url = "https://files.pythonhosted.org/packages/b1/d2/99b55e85832ccde77b211738ff3925a5d73ad183c0b37bcbbe5a8ff04978/psycopg2_binary-2.9.11-cp312-cp312-win_amd64.whl", hash = "sha256:b33fabeb1fde21180479b2d4667e994de7bbf0eec22832ba5d9b5e4cf65b6c6d", size = 2714147, upload-time = "2025-10-10T11:12:29.535Z" }, { url = "https://files.pythonhosted.org/packages/ff/a8/a2709681b3ac11b0b1786def10006b8995125ba268c9a54bea6f5ae8bd3e/psycopg2_binary-2.9.11-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:b8fb3db325435d34235b044b199e56cdf9ff41223a4b9752e8576465170bb38c", size = 3756572, upload-time = "2025-10-10T11:12:32.873Z" }, @@ -3199,8 +3230,10 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/11/32/b2ffe8f3853c181e88f0a157c5fb4e383102238d73c52ac6d93a5c8bffe6/psycopg2_binary-2.9.11-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8c55b385daa2f92cb64b12ec4536c66954ac53654c7f15a203578da4e78105c0", size = 4411242, upload-time = "2025-10-10T11:12:42.388Z" }, { url = "https://files.pythonhosted.org/packages/10/04/6ca7477e6160ae258dc96f67c371157776564679aefd247b66f4661501a2/psycopg2_binary-2.9.11-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:c0377174bf1dd416993d16edc15357f6eb17ac998244cca19bc67cdc0e2e5766", size = 4468258, upload-time = "2025-10-10T11:12:48.654Z" }, { url = "https://files.pythonhosted.org/packages/3c/7e/6a1a38f86412df101435809f225d57c1a021307dd0689f7a5e7fe83588b1/psycopg2_binary-2.9.11-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:5c6ff3335ce08c75afaed19e08699e8aacf95d4a260b495a4a8545244fe2ceb3", size = 4166295, upload-time = "2025-10-10T11:12:52.525Z" }, + { url = "https://files.pythonhosted.org/packages/f2/7d/c07374c501b45f3579a9eb761cbf2604ddef3d96ad48679112c2c5aa9c25/psycopg2_binary-2.9.11-cp313-cp313-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:84011ba3109e06ac412f95399b704d3d6950e386b7994475b231cf61eec2fc1f", size = 3983133, upload-time = "2025-10-30T02:55:24.329Z" }, { url = "https://files.pythonhosted.org/packages/82/56/993b7104cb8345ad7d4516538ccf8f0d0ac640b1ebd8c754a7b024e76878/psycopg2_binary-2.9.11-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ba34475ceb08cccbdd98f6b46916917ae6eeb92b5ae111df10b544c3a4621dc4", size = 3652383, upload-time = "2025-10-10T11:12:56.387Z" }, { url = "https://files.pythonhosted.org/packages/2d/ac/eaeb6029362fd8d454a27374d84c6866c82c33bfc24587b4face5a8e43ef/psycopg2_binary-2.9.11-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:b31e90fdd0f968c2de3b26ab014314fe814225b6c324f770952f7d38abf17e3c", size = 3298168, upload-time = "2025-10-10T11:13:00.403Z" }, + { url = "https://files.pythonhosted.org/packages/2b/39/50c3facc66bded9ada5cbc0de867499a703dc6bca6be03070b4e3b65da6c/psycopg2_binary-2.9.11-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:d526864e0f67f74937a8fce859bd56c979f5e2ec57ca7c627f5f1071ef7fee60", size = 3044712, upload-time = "2025-10-30T02:55:27.975Z" }, { url = "https://files.pythonhosted.org/packages/9c/8e/b7de019a1f562f72ada81081a12823d3c1590bedc48d7d2559410a2763fe/psycopg2_binary-2.9.11-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:04195548662fa544626c8ea0f06561eb6203f1984ba5b4562764fbeb4c3d14b1", size = 3347549, upload-time = "2025-10-10T11:13:03.971Z" }, { url = "https://files.pythonhosted.org/packages/80/2d/1bb683f64737bbb1f86c82b7359db1eb2be4e2c0c13b947f80efefa7d3e5/psycopg2_binary-2.9.11-cp313-cp313-win_amd64.whl", hash = "sha256:efff12b432179443f54e230fdf60de1f6cc726b6c832db8701227d089310e8aa", size = 2714215, upload-time = "2025-10-10T11:13:07.14Z" }, { url = "https://files.pythonhosted.org/packages/64/12/93ef0098590cf51d9732b4f139533732565704f45bdc1ffa741b7c95fb54/psycopg2_binary-2.9.11-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:92e3b669236327083a2e33ccfa0d320dd01b9803b3e14dd986a4fc54aa00f4e1", size = 3756567, upload-time = "2025-10-10T11:13:11.885Z" }, @@ -3208,8 +3241,10 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/13/1e/98874ce72fd29cbde93209977b196a2edae03f8490d1bd8158e7f1daf3a0/psycopg2_binary-2.9.11-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = 
"sha256:9b52a3f9bb540a3e4ec0f6ba6d31339727b2950c9772850d6545b7eae0b9d7c5", size = 4411646, upload-time = "2025-10-10T11:13:24.432Z" }, { url = "https://files.pythonhosted.org/packages/5a/bd/a335ce6645334fb8d758cc358810defca14a1d19ffbc8a10bd38a2328565/psycopg2_binary-2.9.11-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:db4fd476874ccfdbb630a54426964959e58da4c61c9feba73e6094d51303d7d8", size = 4468701, upload-time = "2025-10-10T11:13:29.266Z" }, { url = "https://files.pythonhosted.org/packages/44/d6/c8b4f53f34e295e45709b7568bf9b9407a612ea30387d35eb9fa84f269b4/psycopg2_binary-2.9.11-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:47f212c1d3be608a12937cc131bd85502954398aaa1320cb4c14421a0ffccf4c", size = 4166293, upload-time = "2025-10-10T11:13:33.336Z" }, + { url = "https://files.pythonhosted.org/packages/4b/e0/f8cc36eadd1b716ab36bb290618a3292e009867e5c97ce4aba908cb99644/psycopg2_binary-2.9.11-cp314-cp314-manylinux_2_38_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e35b7abae2b0adab776add56111df1735ccc71406e56203515e228a8dc07089f", size = 3983184, upload-time = "2025-10-30T02:55:32.483Z" }, { url = "https://files.pythonhosted.org/packages/53/3e/2a8fe18a4e61cfb3417da67b6318e12691772c0696d79434184a511906dc/psycopg2_binary-2.9.11-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:fcf21be3ce5f5659daefd2b3b3b6e4727b028221ddc94e6c1523425579664747", size = 3652650, upload-time = "2025-10-10T11:13:38.181Z" }, { url = "https://files.pythonhosted.org/packages/76/36/03801461b31b29fe58d228c24388f999fe814dfc302856e0d17f97d7c54d/psycopg2_binary-2.9.11-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:9bd81e64e8de111237737b29d68039b9c813bdf520156af36d26819c9a979e5f", size = 3298663, upload-time = "2025-10-10T11:13:44.878Z" }, + { url = "https://files.pythonhosted.org/packages/97/77/21b0ea2e1a73aa5fa9222b2a6b8ba325c43c3a8d54272839c991f2345656/psycopg2_binary-2.9.11-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:32770a4d666fbdafab017086655bcddab791d7cb260a16679cc5a7338b64343b", size = 3044737, upload-time = "2025-10-30T02:55:35.69Z" }, { url = "https://files.pythonhosted.org/packages/67/69/f36abe5f118c1dca6d3726ceae164b9356985805480731ac6712a63f24f0/psycopg2_binary-2.9.11-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:c3cb3a676873d7506825221045bd70e0427c905b9c8ee8d6acd70cfcbd6e576d", size = 3347643, upload-time = "2025-10-10T11:13:53.499Z" }, { url = "https://files.pythonhosted.org/packages/e1/36/9c0c326fe3a4227953dfb29f5d0c8ae3b8eb8c1cd2967aa569f50cb3c61f/psycopg2_binary-2.9.11-cp314-cp314-win_amd64.whl", hash = "sha256:4012c9c954dfaccd28f94e84ab9f94e12df76b4afb22331b1f0d3154893a6316", size = 2803913, upload-time = "2025-10-10T11:13:57.058Z" }, ] From a6995508e3476cf8cf6a225b613446d33494e71b Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Tue, 4 Nov 2025 16:14:09 +0100 Subject: [PATCH 03/13] Added Elasticsearch in CI + minor fix --- .github/workflows/integration-vector-io-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index 180c1fd96..b38f6dcab 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -31,7 +31,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant"] + 
vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant", "remote::elasticsearch"] python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", "3.13"]') || fromJSON('["3.12"]') }} fail-fast: false # we want to run all tests regardless of failure From ce4fa525f8b7fabf252ce0e7cef3a8648132745d Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Wed, 5 Nov 2025 14:58:34 +0100 Subject: [PATCH 04/13] Fixed issue with steps variable in CI --- .github/workflows/integration-vector-io-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index b38f6dcab..509a1fdd9 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -179,7 +179,7 @@ jobs: ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }} - ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && '${{ steps.setup-elasticsearch.outputs.elasticsearch-api-key }}' || '' }} + ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && steps.setup-elasticsearch.outputs.elasticsearch-api-key || '' }} run: | uv run --no-sync \ pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ From f1c8a200c83141ccde653f7f620107eaf022e22b Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Wed, 29 Oct 2025 17:18:16 +0100 Subject: [PATCH 05/13] Draft for Elasticsearch as vector db --- .../vector_io/elasticsearch/__init__.py | 17 + .../remote/vector_io/elasticsearch/config.py | 32 ++ .../vector_io/elasticsearch/elasticsearch.py | 380 ++++++++++++++++++ pyproject.toml | 1 + .../providers/registry/vector_io.py | 48 +++ 5 files changed, 478 insertions(+) create mode 100644 llama_stack/providers/remote/vector_io/elasticsearch/__init__.py create mode 100644 llama_stack/providers/remote/vector_io/elasticsearch/config.py create mode 100644 llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py b/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py new file mode 100644 index 000000000..6370c6196 --- /dev/null +++ b/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +from llama_stack.providers.datatypes import Api, ProviderSpec + +from .config import ElasticsearchVectorIOConfig + + +async def get_adapter_impl(config: ElasticsearchVectorIOConfig, deps: dict[Api, ProviderSpec]): + from .elasticsearch import ElasticsearchVectorIOAdapter + + impl = ElasticsearchVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files)) + await impl.initialize() + return impl diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/llama_stack/providers/remote/vector_io/elasticsearch/config.py new file mode 100644 index 000000000..f8c5ed028 --- /dev/null +++ b/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -0,0 +1,32 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from typing import Any + +from pydantic import BaseModel, Field + +from llama_stack.core.storage.datatypes import KVStoreReference +from llama_stack.schema_utils import json_schema_type + + +@json_schema_type +class ElasticsearchVectorIOConfig(BaseModel): + elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) + elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") + persistence: KVStoreReference | None = Field( + description="Config for KV store backend (SQLite only for now)", default=None + ) + + @classmethod + def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: + return { + "elasticsearch_api_key": None, + "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}", + "persistence": KVStoreReference( + backend="kv_default", + namespace="vector_io::elasticsearch", + ).model_dump(exclude_none=True), + } \ No newline at end of file diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py new file mode 100644 index 000000000..b2a45265b --- /dev/null +++ b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -0,0 +1,380 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +import asyncio +from typing import Any + +from numpy.typing import NDArray +from elasticsearch import AsyncElasticsearch +from elasticsearch.helpers import async_bulk + +from llama_stack.apis.common.errors import VectorStoreNotFoundError +from llama_stack.apis.files import Files +from llama_stack.apis.inference import Inference, InterleavedContent +from llama_stack.apis.vector_io import ( + Chunk, + QueryChunksResponse, + VectorIO, + VectorStoreChunkingStrategy, + VectorStoreFileObject, +) +from llama_stack.apis.vector_stores import VectorStore +from llama_stack.log import get_logger +from llama_stack.providers.datatypes import VectorStoresProtocolPrivate +from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin +from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex + +from .config import ElasticsearchVectorIOConfig + +log = get_logger(name=__name__, category="vector_io::elasticsearch") + +# KV store prefixes for vector databases +VERSION = "v3" +VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::" + + +class ElasticsearchIndex(EmbeddingIndex): + def __init__(self, client: AsyncElasticsearch, collection_name: str): + self.client = client + self.collection_name = collection_name + + async def initialize(self) -> None: + # Elasticsearch collections (indexes) are created on-demand in add_chunks + # If the index does not exist, it will be created in add_chunks. + pass + + async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray): + """Adds chunks and their embeddings to the Elasticsearch index.""" + + assert len(chunks) == len(embeddings), ( + f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}" + ) + + if not await self.client.indices.exists(self.collection_name): + await self.client.indices.create( + index=self.collection_name, + body={ + "mappings": { + "properties": { + "vector": { + "type": "dense_vector", + "dims": len(embeddings[0]) + }, + "chunk_content": {"type": "object"}, + } + } + } + ) + + actions = [] + for chunk, embedding in zip(chunks, embeddings, strict=False): + actions.append({ + "_op_type": "index", + "_index": self.collection_name, + "_id": chunk.chunk_id, + "_source": { + "vector": embedding, + "chunk_content": chunk.model_dump_json() + } + }) + + try: + successful_count, error_count = await async_bulk( + client=self.client, + actions=actions, + timeout='300s', + refresh=True, + raise_on_error=False, + stats_only=True + ) + if error_count > 0: + log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}") + + log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}") + except Exception as e: + log.error(f"Error adding chunks to Elasticsearch index {self.collection_name}: {e}") + raise + + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Remove a chunk from the Elasticsearch index.""" + + actions = [] + for chunk in chunks_for_deletion: + actions.append({ + "_op_type": "delete", + "_index": self.collection_name, + "_id": chunk.chunk_id + }) + + try: + successful_count, error_count = await async_bulk( + client=self.client, + actions=actions, + timeout='300s', + refresh=True, + raise_on_error=True, + 
stats_only=True + ) + if error_count > 0: + log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}") + + log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}") + except Exception as e: + log.error(f"Error deleting chunks from Elasticsearch index {self.collection_name}: {e}") + raise + + async def _results_to_chunks(self, results: dict) -> QueryChunksResponse: + """Convert search results to QueryChunksResponse.""" + + chunks, scores = [], [] + for result in results['hits']['hits']: + try: + chunk = Chunk( + content=result["_source"]["chunk_content"], + chunk_id=result["_id"], + embedding=result["_source"]["vector"] + ) + except Exception: + log.exception("Failed to parse chunk") + continue + + chunks.append(chunk) + scores.append(result["_score"]) + + return QueryChunksResponse(chunks=chunks, scores=scores) + + async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: + """Vector search using kNN.""" + + try: + results = ( + await self.client.search( + index=self.collection_name, + query={ + "knn": { + "field": "vector", + "query_vector": embedding.tolist(), + "k": k + } + }, + min_score=score_threshold, + limit=k + ) + ) + except Exception as e: + log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def query_keyword(self, query_string: str, k: int, score_threshold: float) -> QueryChunksResponse: + """Keyword search using match query.""" + + try: + results = ( + await self.client.search( + index=self.collection_name, + query={ + "match": { + "chunk_content": { + "query": query_string + } + } + }, + min_score=score_threshold, + limit=k + ) + ) + except Exception as e: + log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def query_hybrid( + self, + embedding: NDArray, + query_string: str, + k: int, + score_threshold: float, + reranker_type: str, + reranker_params: dict[str, Any] | None = None, + ) -> QueryChunksResponse: + + supported_retrievers = ["rrf", "linear"] + if reranker_type not in supported_retrievers: + raise ValueError(f"Unsupported reranker type: {reranker_type}. Supported types are: {supported_retrievers}") + + retriever = { + reranker_type: { + "retrievers": [ + { + "retriever": { + "standard": { + "query": { + "match": { + "chunk_content": query_string + } + } + } + } + }, + { + "knn": { + "field": "vector", + "query_vector": embedding.tolist(), + "k": k + } + } + ] + } + } + + # Add reranker parameters if provided for RRF (e.g. rank_constant) + # see https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/rrf-retriever + if reranker_type == "rrf" and reranker_params is not None: + retriever["rrf"].update(reranker_params) + # Add reranker parameters if provided for Linear (e.g. 
weights) + # see https://www.elastic.co/docs/reference/elasticsearch/rest-apis/retrievers/linear-retriever + # Since the weights are per retriever, we need to update them separately, using the following syntax + # reranker_params = { + # "retrievers": { + # "standard": {"weight": 0.7}, + # "knn": {"weight": 0.3} + # } + # } + elif reranker_type == "linear" and reranker_params is not None: + retrievers_params = reranker_params.get("retrievers") + if retrievers_params is not None: + for i in range(0, len(retriever["linear"]["retrievers"])): + retr_type=retriever["linear"]["retrievers"][i]["retriever"].key() + retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type]) + del reranker_params["retrievers"] + retriever["linear"].update(reranker_params) + + try: + results = await self.client.search( + index=self.collection_name, + size=k, + retriever=retriever, + min_score=score_threshold + ) + except Exception as e: + log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}") + raise + + return await self._results_to_chunks(results) + + async def delete(self): + """Delete the entire Elasticsearch index with collection_name.""" + + try: + await self.client.delete(index=self.collection_name) + except Exception as e: + log.error(f"Error deleting Elasticsearch index {self.collection_name}: {e}") + raise + +class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate): + def __init__( + self, + config: ElasticsearchVectorIOConfig, + inference_api: Inference, + files_api: Files | None = None, + ) -> None: + super().__init__(files_api=files_api, kvstore=None) + self.config = config + self.client: AsyncElasticsearch = None + self.cache = {} + self.inference_api = inference_api + self.vector_store_table = None + + async def initialize(self) -> None: + client_config = self.config.model_dump(exclude_none=True) + self.client = AsyncElasticsearch(**client_config) + self.kvstore = await kvstore_impl(self.config.persistence) + + start_key = VECTOR_DBS_PREFIX + end_key = f"{VECTOR_DBS_PREFIX}\xff" + stored_vector_stores = await self.kvstore.values_in_range(start_key, end_key) + + for vector_store_data in stored_vector_stores: + vector_store = VectorStore.model_validate_json(vector_store_data) + index = VectorStoreWithIndex( + vector_store, ElasticsearchIndex(self.client, vector_store.identifier), self.inference_api + ) + self.cache[vector_store.identifier] = index + self.openai_vector_stores = await self._load_openai_vector_stores() + + async def shutdown(self) -> None: + await self.client.close() + # Clean up mixin resources (file batch tasks) + await super().shutdown() + + async def register_vector_store(self, vector_store: VectorStore) -> None: + assert self.kvstore is not None + key = f"{VECTOR_DBS_PREFIX}{vector_store.identifier}" + await self.kvstore.set(key=key, value=vector_store.model_dump_json()) + + index = VectorStoreWithIndex( + vector_store=vector_store, + index=ElasticsearchIndex(self.client, vector_store.identifier), + inference_api=self.inference_api, + ) + + self.cache[vector_store.identifier] = index + + async def unregister_vector_store(self, vector_store_id: str) -> None: + if vector_store_id in self.cache: + await self.cache[vector_store_id].index.delete() + del self.cache[vector_store_id] + + assert self.kvstore is not None + await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_store_id}") + + async def _get_and_cache_vector_store_index(self, vector_store_id: str) -> VectorStoreWithIndex 
| None: + if vector_store_id in self.cache: + return self.cache[vector_store_id] + + if self.vector_store_table is None: + raise ValueError(f"Vector DB not found {vector_store_id}") + + vector_store = await self.vector_store_table.get_vector_store(vector_store_id) + if not vector_store: + raise VectorStoreNotFoundError(vector_store_id) + + index = VectorStoreWithIndex( + vector_store=vector_store, + index=ElasticsearchIndex(client=self.client, collection_name=vector_store.identifier), + inference_api=self.inference_api, + ) + self.cache[vector_store_id] = index + return index + + async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: + index = await self._get_and_cache_vector_store_index(vector_db_id) + if not index: + raise VectorStoreNotFoundError(vector_db_id) + + await index.insert_chunks(chunks) + + async def query_chunks( + self, vector_db_id: str, query: InterleavedContent, params: dict[str, Any] | None = None + ) -> QueryChunksResponse: + index = await self._get_and_cache_vector_store_index(vector_db_id) + if not index: + raise VectorStoreNotFoundError(vector_db_id) + + return await index.query_chunks(query, params) + + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from an Elasticsearch vector store.""" + index = await self._get_and_cache_vector_store_index(store_id) + if not index: + raise ValueError(f"Vector DB {store_id} not found") + + await index.index.delete_chunks(chunks_for_deletion) diff --git a/pyproject.toml b/pyproject.toml index 8f07f9cbd..3bd448943 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,7 @@ test = [ "pymilvus>=2.6.1", "milvus-lite>=2.5.0", "weaviate-client>=4.16.4", + "elasticsearch>=8.16.0, <9.0.0" ] docs = [ "setuptools", diff --git a/src/llama_stack/providers/registry/vector_io.py b/src/llama_stack/providers/registry/vector_io.py index 55b302751..a561c1a5d 100644 --- a/src/llama_stack/providers/registry/vector_io.py +++ b/src/llama_stack/providers/registry/vector_io.py @@ -825,4 +825,52 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi Please refer to the remote provider documentation. """, ), + InlineProviderSpec( + api=Api.vector_io, + adapter_type="elasticsearch", + provider_type="remote::elasticsearch", + pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS, + module="llama_stack.providers.remote.vector_io.elasticsearch", + config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig", + api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files, Api.models], + description=""" +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. + +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. 
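+
+For step 2, a provider entry in your distribution's `run.yaml` might look like the
+following (a minimal sketch; the `provider_id` and environment variables are
+illustrative and depend on your deployment):
+
+```yaml
+providers:
+  vector_io:
+  - provider_id: elasticsearch
+    provider_type: remote::elasticsearch
+    config:
+      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
+      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
+      persistence:
+        namespace: vector_io::elasticsearch
+        backend: kv_default
+```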
+ +## Installation + +You can test Elasticsearch locally by running this script in the terminal: + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. +For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + +## Documentation +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""", + ) ] From 12f8d96f48e2b35bafd2b1120c57e0f16c24537c Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Fri, 31 Oct 2025 18:18:27 +0100 Subject: [PATCH 06/13] Added tests + docs + CI for Elasticsearch --- .../workflows/integration-vector-io-tests.yml | 15 ++ .../vector_io/remote_elasticsearch.mdx | 104 ++++++++++++++ .../remote/vector_io/elasticsearch/config.py | 10 +- .../vector_io/elasticsearch/elasticsearch.py | 136 ++++++------------ .../providers/registry/vector_io.py | 9 +- tests/integration/conftest.py | 13 +- .../vector_io/test_openai_vector_stores.py | 4 + tests/integration/vector_io/test_vector_io.py | 2 + uv.lock | 31 +++- 9 files changed, 209 insertions(+), 115 deletions(-) create mode 100644 docs/docs/providers/vector_io/remote_elasticsearch.mdx diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index 1962629c2..8165fabbd 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -146,6 +146,14 @@ jobs: docker logs weaviate exit 1 + - name: Setup Elasticsearch + if: matrix.vector-io-provider == 'remote::elasticsearch' + id: setup-elasticsearch + run: | + curl -fsSL https://elastic.co/start-local | sh -s -- -v 9.2.0 --esonly + source elastic-start-local/.env + echo "elasticsearch-api-key=$ES_LOCAL_API_KEY" >> "$GITHUB_OUTPUT" + - name: Build Llama Stack run: | uv run --no-sync llama stack list-deps ci-tests | xargs -L1 uv pip install @@ -170,6 +178,8 @@ jobs: QDRANT_URL: ${{ matrix.vector-io-provider == 'remote::qdrant' && 'http://localhost:6333' || '' }} ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} + ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }} + ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && '${{ steps.setup-elasticsearch.outputs.elasticsearch-api-key }}' || '' }} run: | uv run --no-sync \ pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ @@ -196,6 +206,11 @@ jobs: run: | docker logs qdrant > qdrant.log + - name: Write Elasticsearch logs to file + if: ${{ always() && matrix.vector-io-provider == 'remote::elasticsearch' }} + run: | + docker logs es-local-dev > elasticsearch.log + - name: Upload all logs to artifacts if: ${{ always() }} uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0 diff --git a/docs/docs/providers/vector_io/remote_elasticsearch.mdx b/docs/docs/providers/vector_io/remote_elasticsearch.mdx new file mode 100644 index 000000000..864015a5c --- /dev/null +++ b/docs/docs/providers/vector_io/remote_elasticsearch.mdx @@ -0,0 +1,104 @@ +--- +description: | + [Elasticsearch](https://www.elastic.co/) is 
a vector database provider for Llama Stack. + It allows you to store and query vectors directly within an Elasticsearch database. + That means you're not limited to storing vectors in memory or in a separate service. + + ## Features + Elasticsearch supports: + - Store embeddings and their metadata + - Vector search + - Full-text search + - Fuzzy search + - Hybrid search + - Document storage + - Metadata filtering + - Inference service + - Machine Learning integrations + + ## Usage + + To use Elasticsearch in your Llama Stack project, follow these steps: + + 1. Install the necessary dependencies. + 2. Configure your Llama Stack project to use Elasticsearch. + 3. Start storing and querying vectors. + + ## Installation + + You can test Elasticsearch locally by running this script in the terminal: + + ```bash + curl -fsSL https://elastic.co/start-local | sh + ``` + + Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. + For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + + ## Documentation + See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. +sidebar_label: Remote - Elasticsearch +title: remote::elasticsearch +--- + +# remote::elasticsearch + +## Description + + +[Elasticsearch](https://www.elastic.co/) is a vector database provider for Llama Stack. +It allows you to store and query vectors directly within an Elasticsearch database. +That means you're not limited to storing vectors in memory or in a separate service. + +## Features +Elasticsearch supports: +- Store embeddings and their metadata +- Vector search +- Full-text search +- Fuzzy search +- Hybrid search +- Document storage +- Metadata filtering +- Inference service +- Machine Learning integrations + +## Usage + +To use Elasticsearch in your Llama Stack project, follow these steps: + +1. Install the necessary dependencies. +2. Configure your Llama Stack project to use Elasticsearch. +3. Start storing and querying vectors. + +## Installation + +You can test Elasticsearch locally by running this script in the terminal: + +```bash +curl -fsSL https://elastic.co/start-local | sh +``` + +Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overview?utm_campaign=llama-stack-integration) on Elastic Cloud. +For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy). + +## Documentation +See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general. 
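+
+## Usage example
+
+The snippet below is a minimal sketch of exercising the provider through the
+OpenAI-compatible vector-store API, as used by this integration's tests. It
+assumes a Llama Stack server running locally with `remote::elasticsearch`
+configured; the store name and query text are illustrative.
+
+```python
+from llama_stack_client import LlamaStackClient
+
+client = LlamaStackClient(base_url="http://localhost:8321")
+
+# Create a vector store; with this provider it is backed by an Elasticsearch index.
+store = client.vector_stores.create(name="my_documents")
+
+# Query the store. Besides the default vector search, the provider also serves
+# "keyword" and "hybrid" search modes (see the integration tests).
+results = client.vector_stores.search(
+    vector_store_id=store.id,
+    query="How do I configure Elasticsearch?",
+)
+```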
+ + +## Configuration + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `api_key` | `str \| None` | No | | The API key for the Elasticsearch instance | +| `hosts` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance | +| `persistence` | `llama_stack.core.storage.datatypes.KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) | + +## Sample Configuration + +```yaml +hosts: ${env.ELASTICSEARCH_API_KEY:=None} +api_key: ${env.ELASTICSEARCH_URL:=localhost:9200} +persistence: + namespace: vector_io::elasticsearch + backend: kv_default +``` diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/llama_stack/providers/remote/vector_io/elasticsearch/config.py index f8c5ed028..4f2237d79 100644 --- a/llama_stack/providers/remote/vector_io/elasticsearch/config.py +++ b/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -14,8 +14,8 @@ from llama_stack.schema_utils import json_schema_type @json_schema_type class ElasticsearchVectorIOConfig(BaseModel): - elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) - elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") + api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) + hosts: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") persistence: KVStoreReference | None = Field( description="Config for KV store backend (SQLite only for now)", default=None ) @@ -23,10 +23,10 @@ class ElasticsearchVectorIOConfig(BaseModel): @classmethod def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]: return { - "elasticsearch_api_key": None, - "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}", + "hosts": "${env.ELASTICSEARCH_API_KEY:=None}", + "api_key": "${env.ELASTICSEARCH_URL:=localhost:9200}", "persistence": KVStoreReference( backend="kv_default", namespace="vector_io::elasticsearch", ).model_dump(exclude_none=True), - } \ No newline at end of file + } diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py index b2a45265b..2bbb167a0 100644 --- a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py +++ b/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -4,12 +4,11 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. 
-import asyncio from typing import Any -from numpy.typing import NDArray from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk +from numpy.typing import NDArray from llama_stack.apis.common.errors import VectorStoreNotFoundError from llama_stack.apis.files import Files @@ -18,13 +17,10 @@ from llama_stack.apis.vector_io import ( Chunk, QueryChunksResponse, VectorIO, - VectorStoreChunkingStrategy, - VectorStoreFileObject, ) from llama_stack.apis.vector_stores import VectorStore from llama_stack.log import get_logger from llama_stack.providers.datatypes import VectorStoresProtocolPrivate -from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex @@ -36,6 +32,10 @@ log = get_logger(name=__name__, category="vector_io::elasticsearch") # KV store prefixes for vector databases VERSION = "v3" VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::" +VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::" +OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::" class ElasticsearchIndex(EmbeddingIndex): @@ -61,39 +61,32 @@ class ElasticsearchIndex(EmbeddingIndex): body={ "mappings": { "properties": { - "vector": { - "type": "dense_vector", - "dims": len(embeddings[0]) - }, + "vector": {"type": "dense_vector", "dims": len(embeddings[0])}, "chunk_content": {"type": "object"}, } } - } + }, ) actions = [] for chunk, embedding in zip(chunks, embeddings, strict=False): - actions.append({ - "_op_type": "index", - "_index": self.collection_name, - "_id": chunk.chunk_id, - "_source": { - "vector": embedding, - "chunk_content": chunk.model_dump_json() - } - }) + actions.append( + { + "_op_type": "index", + "_index": self.collection_name, + "_id": chunk.chunk_id, + "_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()}, + } + ) try: successful_count, error_count = await async_bulk( - client=self.client, - actions=actions, - timeout='300s', - refresh=True, - raise_on_error=False, - stats_only=True + client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True ) if error_count > 0: - log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}") + log.warning( + f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}" + ) log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}") except Exception as e: @@ -105,23 +98,16 @@ class ElasticsearchIndex(EmbeddingIndex): actions = [] for chunk in chunks_for_deletion: - actions.append({ - "_op_type": "delete", - "_index": self.collection_name, - "_id": chunk.chunk_id - }) + actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id}) try: successful_count, error_count = await async_bulk( - client=self.client, - actions=actions, - timeout='300s', - refresh=True, - raise_on_error=True, - stats_only=True + client=self.client, 
actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True
             )
             if error_count > 0:
-                log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}")
+                log.warning(
+                    f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}"
+                )

             log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}")
         except Exception as e:
@@ -132,12 +118,12 @@ class ElasticsearchIndex(EmbeddingIndex):
         """Convert search results to QueryChunksResponse."""

         chunks, scores = [], []
-        for result in results['hits']['hits']:
+        for result in results["hits"]["hits"]:
             try:
                 chunk = Chunk(
                     content=result["_source"]["chunk_content"],
-                    chunk_id=result["_id"],
-                    embedding=result["_source"]["vector"]
+                    stored_chunk_id=result["_id"],
+                    embedding=result["_source"]["vector"],
                 )
             except Exception:
                 log.exception("Failed to parse chunk")
@@ -147,24 +133,16 @@ class ElasticsearchIndex(EmbeddingIndex):
             scores.append(result["_score"])

         return QueryChunksResponse(chunks=chunks, scores=scores)
-    
+
     async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
         """Vector search using kNN."""

         try:
-            results = (
-                await self.client.search(
-                    index=self.collection_name,
-                    query={
-                        "knn": {
-                            "field": "vector",
-                            "query_vector": embedding.tolist(),
-                            "k": k
-                        }
-                    },
-                    min_score=score_threshold,
-                    limit=k
-                )
+            results = await self.client.search(
+                index=self.collection_name,
+                query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
+                min_score=score_threshold,
+                size=k,
             )
         except Exception as e:
             log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}")
@@ -176,19 +154,11 @@ class ElasticsearchIndex(EmbeddingIndex):
         """Keyword search using match query."""

         try:
-            results = (
-                await self.client.search(
-                    index=self.collection_name,
-                    query={
-                        "match": {
-                            "chunk_content": {
-                                "query": query_string
-                            }
-                        }
-                    },
-                    min_score=score_threshold,
-                    limit=k
-                )
+            results = await self.client.search(
+                index=self.collection_name,
+                query={"match": {"chunk_content": {"query": query_string}}},
+                min_score=score_threshold,
+                size=k,
             )
         except Exception as e:
             log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}")
@@ -205,32 +175,15 @@ class ElasticsearchIndex(EmbeddingIndex):
         reranker_type: str,
         reranker_params: dict[str, Any] | None = None,
     ) -> QueryChunksResponse:
-
         supported_retrievers = ["rrf", "linear"]
         if reranker_type not in supported_retrievers:
             raise ValueError(f"Unsupported reranker type: {reranker_type}. 
Supported types are: {supported_retrievers}")
-    
+
         retriever = {
             reranker_type: {
                 "retrievers": [
-                    {
-                        "retriever": {
-                            "standard": {
-                                "query": {
-                                    "match": {
-                                        "chunk_content": query_string
-                                    }
-                                }
-                            }
-                        }
-                    },
-                    {
-                        "knn": {
-                            "field": "vector",
-                            "query_vector": embedding.tolist(),
-                            "k": k
-                        }
-                    }
+                    {"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}},
+                    {"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
                 ]
             }
         }
@@ -252,17 +205,16 @@ class ElasticsearchIndex(EmbeddingIndex):
             retrievers_params = reranker_params.get("retrievers")
             if retrievers_params is not None:
                 for i in range(0, len(retriever["linear"]["retrievers"])):
-                    retr_type=retriever["linear"]["retrievers"][i]["retriever"].key()
+                    entry = retriever["linear"]["retrievers"][i]
+                    # An entry may wrap its retriever in a "retriever" key; look inside it when present.
+                    retr_type = next(iter(entry.get("retriever", entry)))
                     retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type])
                 del reranker_params["retrievers"]
             retriever["linear"].update(reranker_params)

         try:
             results = await self.client.search(
-                index=self.collection_name,
-                size=k,
-                retriever=retriever,
-                min_score=score_threshold
+                index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold
             )
         except Exception as e:
             log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}")
diff --git a/src/llama_stack/providers/registry/vector_io.py b/src/llama_stack/providers/registry/vector_io.py
index a561c1a5d..66cc4ed6a 100644
--- a/src/llama_stack/providers/registry/vector_io.py
+++ b/src/llama_stack/providers/registry/vector_io.py
@@ -825,11 +825,11 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi
 Please refer to the remote provider documentation.
 """,
     ),
-    InlineProviderSpec(
+    RemoteProviderSpec(
         api=Api.vector_io,
         adapter_type="elasticsearch",
         provider_type="remote::elasticsearch",
-        pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
+        pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
         module="llama_stack.providers.remote.vector_io.elasticsearch",
         config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig",
         api_dependencies=[Api.inference],
@@ -871,6 +871,7 @@ Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overvie
 For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).

 ## Documentation
-See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""",
-    )
+See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
+""", + ), ] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0d0af687f..0bb38481f 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -371,18 +371,7 @@ def vector_provider_wrapper(func): inference_mode = os.environ.get("LLAMA_STACK_TEST_INFERENCE_MODE") if inference_mode == "live": # For live tests, try all providers (they'll skip if not available) - all_providers = [ - "faiss", - "sqlite-vec", - "milvus", - "chromadb", - "pgvector", - "weaviate", - "qdrant", - ] - else: - # For CI tests (replay/record), only use providers that are available in ci-tests environment - all_providers = ["faiss", "sqlite-vec"] + all_providers = ["faiss", "sqlite-vec", "milvus", "chromadb", "pgvector", "weaviate", "qdrant", "elasticsearch"] return pytest.mark.parametrize("vector_io_provider_id", all_providers)(wrapper) diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 97ce4abe8..7aef24223 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -34,6 +34,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ]: return @@ -54,6 +55,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::pgvector", "remote::qdrant", "remote::weaviate", + "remote::elasticsearch", ], "keyword": [ "inline::milvus", @@ -61,6 +63,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], "hybrid": [ "inline::milvus", @@ -68,6 +71,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode "remote::milvus", "remote::pgvector", "remote::weaviate", + "remote::elasticsearch", ], } supported_providers = search_mode_support.get(search_mode, []) diff --git a/tests/integration/vector_io/test_vector_io.py b/tests/integration/vector_io/test_vector_io.py index 1b2099069..d48d47dcc 100644 --- a/tests/integration/vector_io/test_vector_io.py +++ b/tests/integration/vector_io/test_vector_io.py @@ -164,6 +164,7 @@ def test_insert_chunks_with_precomputed_embeddings( "inline::milvus": {"score_threshold": -1.0}, "inline::qdrant": {"score_threshold": -1.0}, "remote::qdrant": {"score_threshold": -1.0}, + "remote::elasticsearch": {"score_threshold": -1.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( @@ -214,6 +215,7 @@ def test_query_returns_valid_object_when_identical_to_embedding_in_vdb( "inline::milvus": {"score_threshold": 0.0}, "remote::qdrant": {"score_threshold": 0.0}, "inline::qdrant": {"score_threshold": 0.0}, + "remote::elasticsearch": {"score_threshold": 0.0}, } vector_store_name = "test_precomputed_embeddings_db" register_response = client_with_empty_registry.vector_stores.create( diff --git a/uv.lock b/uv.lock index de1c8879c..3201d1942 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "(python_full_version >= '3.13' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version >= '3.13' and sys_platform != 'darwin' and sys_platform != 'linux')", @@ -979,6 +979,33 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, ] +[[package]] +name = "elastic-transport" +version = "8.17.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6a/54/d498a766ac8fa475f931da85a154666cc81a70f8eb4a780bc8e4e934e9ac/elastic_transport-8.17.1.tar.gz", hash = "sha256:5edef32ac864dca8e2f0a613ef63491ee8d6b8cfb52881fa7313ba9290cac6d2", size = 73425, upload-time = "2025-03-13T07:28:30.776Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cf/cd/b71d5bc74cde7fc6fd9b2ff9389890f45d9762cbbbf81dc5e51fd7588c4a/elastic_transport-8.17.1-py3-none-any.whl", hash = "sha256:192718f498f1d10c5e9aa8b9cf32aed405e469a7f0e9d6a8923431dbb2c59fb8", size = 64969, upload-time = "2025-03-13T07:28:29.031Z" }, +] + +[[package]] +name = "elasticsearch" +version = "8.19.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "elastic-transport" }, + { name = "python-dateutil" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/7b/70b9d16069eda6f91d45fadd9e12faed8e4442f242ca8a81de84bc626f1b/elasticsearch-8.19.2.tar.gz", hash = "sha256:622efa6a3e662db45285f16ab57bf198ea73ac9e137e7ed8b1d1d1e47638959d", size = 797401, upload-time = "2025-10-28T16:36:44.953Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/56/01/6f77d042b83260ef9ed73ea9647dfa0ef8414eba0a3fc57a509a088ad39b/elasticsearch-8.19.2-py3-none-any.whl", hash = "sha256:c16ba20c4c76cf6952e836dae7f4e724e00ba7bf31b94b79472b873683accdd4", size = 949706, upload-time = "2025-10-28T16:36:41.003Z" }, +] + [[package]] name = "eval-type-backport" version = "0.2.2" @@ -2021,6 +2048,7 @@ test = [ { name = "chardet" }, { name = "chromadb" }, { name = "datasets" }, + { name = "elasticsearch" }, { name = "mcp" }, { name = "milvus-lite" }, { name = "psycopg2-binary" }, @@ -2167,6 +2195,7 @@ test = [ { name = "chardet" }, { name = "chromadb", specifier = ">=1.0.15" }, { name = "datasets", specifier = ">=4.0.0" }, + { name = "elasticsearch", specifier = ">=8.16.0,<9.0.0" }, { name = "mcp" }, { name = "milvus-lite", specifier = ">=2.5.0" }, { name = "psycopg2-binary", specifier = ">=2.9.0" }, From 16674d6756eeca6ddabbb40a33bc9c01655bc9a3 Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Tue, 4 Nov 2025 16:14:09 +0100 Subject: [PATCH 07/13] Added Elasticsearch in CI + minor fix --- .github/workflows/integration-vector-io-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index 8165fabbd..490fae86f 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -31,7 +31,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant"] + vector-io-provider: ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::chromadb", "remote::pgvector", "remote::weaviate", "remote::qdrant", "remote::elasticsearch"] python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", 
"3.13"]') || fromJSON('["3.12"]') }} fail-fast: false # we want to run all tests regardless of failure From 9bfe5245a0b94b9d08934951a3e0b561494670e7 Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Wed, 5 Nov 2025 14:58:34 +0100 Subject: [PATCH 08/13] Fixed issue with steps variable in CI --- .github/workflows/integration-vector-io-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index 490fae86f..b5f8ba98d 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -179,7 +179,7 @@ jobs: ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} ELASTICSEARCH_URL: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && 'http://localhost:9200' || '' }} - ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && '${{ steps.setup-elasticsearch.outputs.elasticsearch-api-key }}' || '' }} + ELASTICSEARCH_API_KEY: ${{ matrix.vector-io-provider == 'remote::elasticsearch' && steps.setup-elasticsearch.outputs.elasticsearch-api-key || '' }} run: | uv run --no-sync \ pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ From 98a3df09bbe0914f1e78e0b889f0630a723928e2 Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Thu, 6 Nov 2025 15:37:31 +0100 Subject: [PATCH 09/13] Added Elasticsearch in docs + ci-test build --- docs/docs/concepts/apis/api_providers.mdx | 2 +- docs/docs/index.mdx | 2 +- docs/docs/providers/index.mdx | 2 +- docs/sidebars.ts | 3 ++- pyproject.toml | 1 + src/llama_stack/distributions/ci-tests/build.yaml | 1 + .../providers/remote/vector_io/elasticsearch/__init__.py | 0 .../providers/remote/vector_io/elasticsearch/config.py | 0 .../providers/remote/vector_io/elasticsearch/elasticsearch.py | 0 9 files changed, 7 insertions(+), 4 deletions(-) rename {llama_stack => src/llama_stack}/providers/remote/vector_io/elasticsearch/__init__.py (100%) rename {llama_stack => src/llama_stack}/providers/remote/vector_io/elasticsearch/config.py (100%) rename {llama_stack => src/llama_stack}/providers/remote/vector_io/elasticsearch/elasticsearch.py (100%) diff --git a/docs/docs/concepts/apis/api_providers.mdx b/docs/docs/concepts/apis/api_providers.mdx index 5f0fe2ac7..fd2af3854 100644 --- a/docs/docs/concepts/apis/api_providers.mdx +++ b/docs/docs/concepts/apis/api_providers.mdx @@ -9,7 +9,7 @@ sidebar_position: 2 The goal of Llama Stack is to build an ecosystem where users can easily swap out different implementations for the same API. Examples for these include: - LLM inference providers (e.g., Fireworks, Together, AWS Bedrock, Groq, Cerebras, SambaNova, vLLM, etc.), -- Vector databases (e.g., ChromaDB, Weaviate, Qdrant, Milvus, FAISS, PGVector, etc.), +- Vector databases (e.g., ChromaDB, Weaviate, Qdrant, Milvus, FAISS, PGVector, Elasticsearch, etc.), - Safety providers (e.g., Meta's Llama Guard, AWS Bedrock Guardrails, etc.) 
Providers come in two flavors: diff --git a/docs/docs/index.mdx b/docs/docs/index.mdx index 8c17283f9..41cbd79c6 100644 --- a/docs/docs/index.mdx +++ b/docs/docs/index.mdx @@ -54,7 +54,7 @@ Llama Stack consists of a server (with multiple pluggable API providers) and Cli Llama Stack provides adapters for popular providers across all API categories: - **Inference**: Meta Reference, Ollama, Fireworks, Together, NVIDIA, vLLM, AWS Bedrock, OpenAI, Anthropic, and more -- **Vector Databases**: FAISS, Chroma, Milvus, Postgres, Weaviate, Qdrant, and others +- **Vector Databases**: FAISS, Chroma, Milvus, Postgres, Weaviate, Qdrant, Elasticsearch and others - **Safety**: Llama Guard, Prompt Guard, Code Scanner, AWS Bedrock - **Training & Evaluation**: HuggingFace, TorchTune, NVIDIA NEMO diff --git a/docs/docs/providers/index.mdx b/docs/docs/providers/index.mdx index bfc16b29a..05bb3e700 100644 --- a/docs/docs/providers/index.mdx +++ b/docs/docs/providers/index.mdx @@ -9,7 +9,7 @@ sidebar_position: 1 The goal of Llama Stack is to build an ecosystem where users can easily swap out different implementations for the same API. Examples for these include: - LLM inference providers (e.g., Meta Reference, Ollama, Fireworks, Together, AWS Bedrock, Groq, Cerebras, SambaNova, vLLM, OpenAI, Anthropic, Gemini, WatsonX, etc.), -- Vector databases (e.g., FAISS, SQLite-Vec, ChromaDB, Weaviate, Qdrant, Milvus, PGVector, etc.), +- Vector databases (e.g., FAISS, SQLite-Vec, ChromaDB, Weaviate, Qdrant, Milvus, PGVector, Elasticsearch, etc.), - Safety providers (e.g., Meta's Llama Guard, Prompt Guard, Code Scanner, AWS Bedrock Guardrails, etc.), - Tool Runtime providers (e.g., RAG Runtime, Brave Search, etc.) diff --git a/docs/sidebars.ts b/docs/sidebars.ts index 641c2eed3..a0da4c518 100644 --- a/docs/sidebars.ts +++ b/docs/sidebars.ts @@ -159,7 +159,8 @@ const sidebars: SidebarsConfig = { 'providers/vector_io/remote_milvus', 'providers/vector_io/remote_pgvector', 'providers/vector_io/remote_qdrant', - 'providers/vector_io/remote_weaviate' + 'providers/vector_io/remote_weaviate', + 'providers/vector_io/remote_elasticsearch' ], }, { diff --git a/pyproject.toml b/pyproject.toml index 3bd448943..4ffbfd1e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -323,6 +323,7 @@ exclude = [ "^src/llama_stack/providers/remote/vector_io/qdrant/", "^src/llama_stack/providers/remote/vector_io/sample/", "^src/llama_stack/providers/remote/vector_io/weaviate/", + "^src/llama_stack/providers/remote/vector_io/elasticsearch/", "^src/llama_stack/providers/utils/bedrock/client\\.py$", "^src/llama_stack/providers/utils/bedrock/refreshable_boto_session\\.py$", "^src/llama_stack/providers/utils/inference/embedding_mixin\\.py$", diff --git a/src/llama_stack/distributions/ci-tests/build.yaml b/src/llama_stack/distributions/ci-tests/build.yaml index f29ac7712..4305cc9b1 100644 --- a/src/llama_stack/distributions/ci-tests/build.yaml +++ b/src/llama_stack/distributions/ci-tests/build.yaml @@ -27,6 +27,7 @@ distribution_spec: - provider_type: remote::pgvector - provider_type: remote::qdrant - provider_type: remote::weaviate + - provider_type: remote::elasticsearch files: - provider_type: inline::localfs safety: diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py similarity index 100% rename from llama_stack/providers/remote/vector_io/elasticsearch/__init__.py rename to src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py diff --git 
a/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py similarity index 100% rename from llama_stack/providers/remote/vector_io/elasticsearch/config.py rename to src/llama_stack/providers/remote/vector_io/elasticsearch/config.py diff --git a/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py similarity index 100% rename from llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py rename to src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py From 149aaa5b22bb237bbf695de23507db6916cd8bc2 Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Mon, 10 Nov 2025 12:31:03 +0100 Subject: [PATCH 10/13] Updated Elasticsearch configurations --- src/llama_stack/distributions/starter-gpu/build.yaml | 1 + .../starter-gpu/run-with-postgres-store.yaml | 8 ++++++++ src/llama_stack/distributions/starter-gpu/run.yaml | 8 ++++++++ src/llama_stack/distributions/starter/build.yaml | 1 + .../starter/run-with-postgres-store.yaml | 8 ++++++++ src/llama_stack/distributions/starter/starter.py | 11 +++++++++++ .../remote/vector_io/elasticsearch/config.py | 8 ++++---- .../remote/vector_io/elasticsearch/elasticsearch.py | 6 ++++-- 8 files changed, 45 insertions(+), 6 deletions(-) diff --git a/src/llama_stack/distributions/starter-gpu/build.yaml b/src/llama_stack/distributions/starter-gpu/build.yaml index 10cbb1389..7b4465206 100644 --- a/src/llama_stack/distributions/starter-gpu/build.yaml +++ b/src/llama_stack/distributions/starter-gpu/build.yaml @@ -28,6 +28,7 @@ distribution_spec: - provider_type: remote::pgvector - provider_type: remote::qdrant - provider_type: remote::weaviate + - provider_type: remote::elasticsearch files: - provider_type: inline::localfs safety: diff --git a/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml index 1920ebd9d..3f06f6c85 100644 --- a/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml @@ -146,6 +146,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter-gpu/run.yaml b/src/llama_stack/distributions/starter-gpu/run.yaml index 7149b8659..41394e1b8 100644 --- a/src/llama_stack/distributions/starter-gpu/run.yaml +++ b/src/llama_stack/distributions/starter-gpu/run.yaml @@ -146,6 +146,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/build.yaml b/src/llama_stack/distributions/starter/build.yaml index 
acd51f773..35566be15 100644 --- a/src/llama_stack/distributions/starter/build.yaml +++ b/src/llama_stack/distributions/starter/build.yaml @@ -28,6 +28,7 @@ distribution_spec: - provider_type: remote::pgvector - provider_type: remote::qdrant - provider_type: remote::weaviate + - provider_type: remote::elasticsearch files: - provider_type: inline::localfs safety: diff --git a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml index 702f95381..6283061c6 100644 --- a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml @@ -146,6 +146,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/starter.py b/src/llama_stack/distributions/starter/starter.py index 88cd3a4fe..7f47c47c9 100644 --- a/src/llama_stack/distributions/starter/starter.py +++ b/src/llama_stack/distributions/starter/starter.py @@ -41,6 +41,7 @@ from llama_stack.providers.remote.vector_io.pgvector.config import ( ) from llama_stack.providers.remote.vector_io.qdrant.config import QdrantVectorIOConfig from llama_stack.providers.remote.vector_io.weaviate.config import WeaviateVectorIOConfig +from llama_stack.providers.remote.vector_io.elasticsearch.config import ElasticsearchVectorIOConfig from llama_stack.providers.utils.kvstore.config import PostgresKVStoreConfig from llama_stack.providers.utils.sqlstore.sqlstore import PostgresSqlStoreConfig @@ -126,6 +127,7 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: BuildProvider(provider_type="remote::pgvector"), BuildProvider(provider_type="remote::qdrant"), BuildProvider(provider_type="remote::weaviate"), + BuildProvider(provider_type="remote::elasticsearch"), ], "files": [BuildProvider(provider_type="inline::localfs")], "safety": [ @@ -240,6 +242,15 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: cluster_url="${env.WEAVIATE_CLUSTER_URL:=}", ), ), + Provider( + provider_id="${env.ELASTICSEARCH_URL:+elasticsearch}", + provider_type="remote::elasticsearch", + config=ElasticsearchVectorIOConfig.sample_run_config( + f"~/.llama/distributions/{name}", + elasticsearch_url="${env.ELASTICSEARCH_URL:=localhost:9200}", + elasticsearch_api_key="${env.ELASTICSEARCH_API_KEY:=}", + ), + ), ], "files": [files_provider], } diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py index 4f2237d79..87624b265 100644 --- a/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -14,8 +14,8 @@ from llama_stack.schema_utils import json_schema_type @json_schema_type class ElasticsearchVectorIOConfig(BaseModel): - api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None) - hosts: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200") + elasticsearch_api_key: str | None = 
Field(description="The API key for the Elasticsearch instance", default=None)
+    elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
     persistence: KVStoreReference | None = Field(
         description="Config for KV store backend (SQLite only for now)", default=None
     )
@@ -23,8 +23,8 @@ class ElasticsearchVectorIOConfig(BaseModel):
     @classmethod
     def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
         return {
-            "hosts": "${env.ELASTICSEARCH_API_KEY:=None}",
-            "api_key": "${env.ELASTICSEARCH_URL:=localhost:9200}",
+            "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}",
+            "elasticsearch_api_key": "${env.ELASTICSEARCH_API_KEY:=}",
             "persistence": KVStoreReference(
                 backend="kv_default",
                 namespace="vector_io::elasticsearch",
diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py
index 2bbb167a0..8fc876b23 100644
--- a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py
+++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py
@@ -244,8 +244,10 @@ class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStore
         self.vector_store_table = None

     async def initialize(self) -> None:
-        client_config = self.config.model_dump(exclude_none=True)
-        self.client = AsyncElasticsearch(**client_config)
+        self.client = AsyncElasticsearch(
+            hosts=self.config.elasticsearch_url,
+            api_key=self.config.elasticsearch_api_key
+        )
         self.kvstore = await kvstore_impl(self.config.persistence)

         start_key = VECTOR_DBS_PREFIX

From 723589ecd8d36f996b8a4d3b6c02e192641d260d Mon Sep 17 00:00:00 2001
From: Enrico Zimuel
Date: Mon, 10 Nov 2025 15:33:47 +0100
Subject: [PATCH 11/13] Added Elasticsearch in distributions/starter/run

---
 src/llama_stack/distributions/starter/run.yaml | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/src/llama_stack/distributions/starter/run.yaml b/src/llama_stack/distributions/starter/run.yaml
index 0ce392810..81d7e47cb 100644
--- a/src/llama_stack/distributions/starter/run.yaml
+++ b/src/llama_stack/distributions/starter/run.yaml
@@ -146,6 +146,14 @@ providers:
       persistence:
         namespace: vector_io::weaviate
         backend: kv_default
+  - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch}
+    provider_type: remote::elasticsearch
+    config:
+      elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=}
+      elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200}
+      persistence:
+        namespace: vector_io::elasticsearch
+        backend: kv_default
   files:
   - provider_id: meta-reference-files
     provider_type: inline::localfs

From df23e0c32fb8831b7ff07dd6aeb140b3d20bb238 Mon Sep 17 00:00:00 2001
From: Enrico Zimuel
Date: Thu, 20 Nov 2025 14:34:04 +0100
Subject: [PATCH 12/13] Fixed pre-commit

---
 docs/docs/providers/vector_io/remote_elasticsearch.mdx | 10 +++++-----
 .../ci-tests/run-with-postgres-store.yaml | 8 ++++++++
 src/llama_stack/distributions/ci-tests/run.yaml | 8 ++++++++
 .../distributions/starter/run-with-postgres-store.yaml | 4 ++--
 src/llama_stack/distributions/starter/run.yaml | 2 +-
 .../remote/vector_io/elasticsearch/__init__.py | 2 +-
 .../providers/remote/vector_io/elasticsearch/config.py | 2 +-
 .../remote/vector_io/elasticsearch/elasticsearch.py | 6 ++----
 8 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/docs/docs/providers/vector_io/remote_elasticsearch.mdx b/docs/docs/providers/vector_io/remote_elasticsearch.mdx
index 864015a5c..5deed1a27 100644
--- 
a/docs/docs/providers/vector_io/remote_elasticsearch.mdx +++ b/docs/docs/providers/vector_io/remote_elasticsearch.mdx @@ -89,15 +89,15 @@ See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search | Field | Type | Required | Default | Description | |-------|------|----------|---------|-------------| -| `api_key` | `str \| None` | No | | The API key for the Elasticsearch instance | -| `hosts` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance | -| `persistence` | `llama_stack.core.storage.datatypes.KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) | +| `elasticsearch_api_key` | `str \| None` | No | | The API key for the Elasticsearch instance | +| `elasticsearch_url` | `str \| None` | No | localhost:9200 | The URL of the Elasticsearch instance | +| `persistence` | `KVStoreReference \| None` | No | | Config for KV store backend (SQLite only for now) | ## Sample Configuration ```yaml -hosts: ${env.ELASTICSEARCH_API_KEY:=None} -api_key: ${env.ELASTICSEARCH_URL:=localhost:9200} +elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} +elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} persistence: namespace: vector_io::elasticsearch backend: kv_default diff --git a/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml b/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml index 5384b58fe..590e01668 100644 --- a/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml @@ -146,6 +146,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/ci-tests/run.yaml b/src/llama_stack/distributions/ci-tests/run.yaml index 1118d2ad1..b4caf1668 100644 --- a/src/llama_stack/distributions/ci-tests/run.yaml +++ b/src/llama_stack/distributions/ci-tests/run.yaml @@ -146,6 +146,14 @@ providers: persistence: namespace: vector_io::weaviate backend: kv_default + - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} + provider_type: remote::elasticsearch + config: + elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} + persistence: + namespace: vector_io::elasticsearch + backend: kv_default files: - provider_id: meta-reference-files provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml index e2e301d9e..f6e301b9f 100644 --- a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml @@ -149,11 +149,11 @@ providers: - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} provider_type: remote::elasticsearch config: - elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} persistence: namespace: vector_io::elasticsearch - backend: kv_default + backend: kv_default files: - provider_id: meta-reference-files 
provider_type: inline::localfs diff --git a/src/llama_stack/distributions/starter/run.yaml b/src/llama_stack/distributions/starter/run.yaml index 81d7e47cb..e2beeec7c 100644 --- a/src/llama_stack/distributions/starter/run.yaml +++ b/src/llama_stack/distributions/starter/run.yaml @@ -149,8 +149,8 @@ providers: - provider_id: ${env.ELASTICSEARCH_URL:+elasticsearch} provider_type: remote::elasticsearch config: - elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} elasticsearch_url: ${env.ELASTICSEARCH_URL:=localhost:9200} + elasticsearch_api_key: ${env.ELASTICSEARCH_API_KEY:=} persistence: namespace: vector_io::elasticsearch backend: kv_default diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py index 6370c6196..33e3930a2 100644 --- a/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/__init__.py @@ -4,7 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -from llama_stack.providers.datatypes import Api, ProviderSpec +from llama_stack_api import Api, ProviderSpec from .config import ElasticsearchVectorIOConfig diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py index 0559868f9..caa7e7891 100644 --- a/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/config.py @@ -9,7 +9,7 @@ from typing import Any from pydantic import BaseModel, Field from llama_stack.core.storage.datatypes import KVStoreReference -from llama_stack.schema_utils import json_schema_type +from llama_stack_api import json_schema_type @json_schema_type diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py index 8fc876b23..85042413a 100644 --- a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -229,6 +229,7 @@ class ElasticsearchIndex(EmbeddingIndex): log.error(f"Error deleting Elasticsearch index {self.collection_name}: {e}") raise + class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate): def __init__( self, @@ -244,10 +245,7 @@ class ElasticsearchVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStore self.vector_store_table = None async def initialize(self) -> None: - self.client = AsyncElasticsearch( - hosts=self.config.elasticsearch_url, - api_key=self.config.elasticsearch_api_key - ) + self.client = AsyncElasticsearch(hosts=self.config.elasticsearch_url, api_key=self.config.elasticsearch_api_key) self.kvstore = await kvstore_impl(self.config.persistence) start_key = VECTOR_DBS_PREFIX From eb2badddeb63e8c5eaba6fc58f9e91bb3092bb2e Mon Sep 17 00:00:00 2001 From: Enrico Zimuel Date: Mon, 24 Nov 2025 17:15:15 +0100 Subject: [PATCH 13/13] Changed import Elasticsearch vector_io --- .../vector_io/elasticsearch/elasticsearch.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py index 85042413a..aa3a986dc 100644 --- 
a/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py +++ b/src/llama_stack/providers/remote/vector_io/elasticsearch/elasticsearch.py @@ -10,20 +10,21 @@ from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk from numpy.typing import NDArray -from llama_stack.apis.common.errors import VectorStoreNotFoundError -from llama_stack.apis.files import Files -from llama_stack.apis.inference import Inference, InterleavedContent -from llama_stack.apis.vector_io import ( - Chunk, - QueryChunksResponse, - VectorIO, -) -from llama_stack.apis.vector_stores import VectorStore +from llama_stack.core.storage.kvstore import kvstore_impl from llama_stack.log import get_logger -from llama_stack.providers.datatypes import VectorStoresProtocolPrivate -from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex +from llama_stack_api import ( + Chunk, + Files, + Inference, + InterleavedContent, + QueryChunksResponse, + VectorIO, + VectorStore, + VectorStoreNotFoundError, + VectorStoresProtocolPrivate, +) from .config import ElasticsearchVectorIOConfig