Added tests + docs + CI for Elasticsearch
This commit is contained in: parent 6f39c8994d, commit 22b27e6275
9 changed files with 215 additions and 112 deletions
@@ -825,11 +825,11 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi
     Please refer to the remote provider documentation.
     """,
     ),
-    InlineProviderSpec(
+    RemoteProviderSpec(
         api=Api.vector_io,
         adapter_type="elasticsearch",
         provider_type="remote::elasticsearch",
-        pip_packages=["elasticsearch>=8.16.0, <9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
+        pip_packages=["elasticsearch>=8.16.0,<9.0.0"] + DEFAULT_VECTOR_IO_DEPS,
         module="llama_stack.providers.remote.vector_io.elasticsearch",
         config_class="llama_stack.providers.remote.vector_io.elasticsearch.ElasticsearchVectorIOConfig",
         api_dependencies=[Api.inference],

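The dependency pin `elasticsearch>=8.16.0,<9.0.0` keeps the provider on the 8.x client line. A minimal sketch, not part of the commit, of asserting that pin at startup (pre-release version strings would need extra handling):

```python
# Sketch: verify the installed elasticsearch client against the 8.16+/8.x pin.
from importlib.metadata import version

es_client_version = version("elasticsearch")  # e.g. "8.16.0"
major, minor = (int(p) for p in es_client_version.split(".")[:2])
if major != 8 or minor < 16:
    raise RuntimeError(f"elasticsearch>=8.16.0,<9.0.0 required, found {es_client_version}")
```
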
@@ -871,6 +871,7 @@ Or you can [start a free trial](https://www.elastic.co/cloud/cloud-trial-overvie
 For more information on how to deploy Elasticsearch, see the [official documentation](https://www.elastic.co/docs/deploy-manage/deploy).

 ## Documentation
-See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.""",
-    )
+See [Elasticsearch's documentation](https://www.elastic.co/docs/solutions/search) for more details about Elasticsearch in general.
+""",
+    ),
 ]

@@ -14,8 +14,8 @@ from llama_stack.schema_utils import json_schema_type

 @json_schema_type
 class ElasticsearchVectorIOConfig(BaseModel):
-    elasticsearch_api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
-    elasticsearch_url: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
+    api_key: str | None = Field(description="The API key for the Elasticsearch instance", default=None)
+    hosts: str | None = Field(description="The URL of the Elasticsearch instance", default="localhost:9200")
     persistence: KVStoreReference | None = Field(
         description="Config for KV store backend (SQLite only for now)", default=None
     )

@@ -23,10 +23,10 @@ class ElasticsearchVectorIOConfig(BaseModel):
     @classmethod
     def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
         return {
-            "elasticsearch_api_key": None,
-            "elasticsearch_url": "${env.ELASTICSEARCH_URL:=localhost:9200}",
+            "hosts": "${env.ELASTICSEARCH_URL:=localhost:9200}",
+            "api_key": "${env.ELASTICSEARCH_API_KEY:=None}",
             "persistence": KVStoreReference(
                 backend="kv_default",
                 namespace="vector_io::elasticsearch",
             ).model_dump(exclude_none=True),
         }

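The renamed `hosts`/`api_key` fields map directly onto the client constructor. A minimal sketch (the helper name is hypothetical, and it assumes the adapter prefixes a scheme onto `hosts`, which this diff does not show):

```python
from elasticsearch import AsyncElasticsearch


def client_from_config(hosts: str = "localhost:9200", api_key: str | None = None) -> AsyncElasticsearch:
    # The 8.x client requires a scheme-qualified URL; api_key may stay None
    # for unsecured local clusters.
    url = hosts if hosts.startswith("http") else f"http://{hosts}"
    return AsyncElasticsearch(url, api_key=api_key)
```
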
@@ -4,12 +4,11 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.

-import asyncio
 from typing import Any

-from numpy.typing import NDArray
 from elasticsearch import AsyncElasticsearch
 from elasticsearch.helpers import async_bulk
+from numpy.typing import NDArray

 from llama_stack.apis.common.errors import VectorStoreNotFoundError
 from llama_stack.apis.files import Files

@@ -18,13 +17,10 @@ from llama_stack.apis.vector_io import (
     Chunk,
     QueryChunksResponse,
     VectorIO,
-    VectorStoreChunkingStrategy,
-    VectorStoreFileObject,
 )
 from llama_stack.apis.vector_stores import VectorStore
 from llama_stack.log import get_logger
 from llama_stack.providers.datatypes import VectorStoresProtocolPrivate
-from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
 from llama_stack.providers.utils.kvstore import kvstore_impl
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
 from llama_stack.providers.utils.memory.vector_store import ChunkForDeletion, EmbeddingIndex, VectorStoreWithIndex

@@ -36,6 +32,10 @@ log = get_logger(name=__name__, category="vector_io::elasticsearch")
 # KV store prefixes for vector databases
 VERSION = "v3"
 VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::"
+VECTOR_INDEX_PREFIX = f"vector_index:elasticsearch:{VERSION}::"
+OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:elasticsearch:{VERSION}::"
+OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:elasticsearch:{VERSION}::"
+OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:elasticsearch:{VERSION}::"


 class ElasticsearchIndex(EmbeddingIndex):

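Each constant namespaces keys in the shared KV store; a stored object's key is simply the prefix plus its identifier. A tiny sketch with an invented store id:

```python
# Sketch: how the KV prefixes above compose into concrete keys.
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_stores:elasticsearch:{VERSION}::"

store_id = "vs_123"  # hypothetical identifier
print(VECTOR_DBS_PREFIX + store_id)  # vector_stores:elasticsearch:v3::vs_123
```
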
@@ -61,39 +61,32 @@ class ElasticsearchIndex(EmbeddingIndex):
             body={
                 "mappings": {
                     "properties": {
-                        "vector": {
-                            "type": "dense_vector",
-                            "dims": len(embeddings[0])
-                        },
+                        "vector": {"type": "dense_vector", "dims": len(embeddings[0])},
                         "chunk_content": {"type": "object"},
                     }
                 }
-            }
+            },
         )

         actions = []
         for chunk, embedding in zip(chunks, embeddings, strict=False):
-            actions.append({
-                "_op_type": "index",
-                "_index": self.collection_name,
-                "_id": chunk.chunk_id,
-                "_source": {
-                    "vector": embedding,
-                    "chunk_content": chunk.model_dump_json()
-                }
-            })
+            actions.append(
+                {
+                    "_op_type": "index",
+                    "_index": self.collection_name,
+                    "_id": chunk.chunk_id,
+                    "_source": {"vector": embedding, "chunk_content": chunk.model_dump_json()},
+                }
+            )

         try:
             successful_count, error_count = await async_bulk(
-                client=self.client,
-                actions=actions,
-                timeout='300s',
-                refresh=True,
-                raise_on_error=False,
-                stats_only=True
+                client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=False, stats_only=True
             )
             if error_count > 0:
-                log.warning(f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}")
+                log.warning(
+                    f"{error_count} out of {len(chunks)} documents failed to upload in Elasticsearch index {self.collection_name}"
+                )

             log.info(f"Successfully added {successful_count} chunks to Elasticsearch index {self.collection_name}")
         except Exception as e:

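The ingestion path above boils down to `elasticsearch.helpers.async_bulk` with `stats_only=True`, which returns plain success/failure counts instead of a per-item error list. A self-contained sketch of the same pattern; the URL, index name, and documents are invented for illustration:

```python
import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk


async def index_vectors() -> None:
    client = AsyncElasticsearch("http://localhost:9200")  # assumed local cluster
    actions = [
        {
            "_op_type": "index",
            "_index": "demo",  # hypothetical index
            "_id": str(i),
            "_source": {"vector": [0.1 * i, 0.2 * i], "chunk_content": f"chunk {i}"},
        }
        for i in range(3)
    ]
    # stats_only=True makes async_bulk return (successes, failures) as counts;
    # refresh=True makes the documents searchable immediately.
    ok, failed = await async_bulk(client, actions, refresh=True, raise_on_error=False, stats_only=True)
    print(f"indexed={ok} failed={failed}")
    await client.close()


asyncio.run(index_vectors())
```
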
@@ -105,23 +98,16 @@ class ElasticsearchIndex(EmbeddingIndex):

         actions = []
         for chunk in chunks_for_deletion:
-            actions.append({
-                "_op_type": "delete",
-                "_index": self.collection_name,
-                "_id": chunk.chunk_id
-            })
+            actions.append({"_op_type": "delete", "_index": self.collection_name, "_id": chunk.chunk_id})

         try:
             successful_count, error_count = await async_bulk(
-                client=self.client,
-                actions=actions,
-                timeout='300s',
-                refresh=True,
-                raise_on_error=True,
-                stats_only=True
+                client=self.client, actions=actions, timeout="300s", refresh=True, raise_on_error=True, stats_only=True
             )
             if error_count > 0:
-                log.warning(f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}")
+                log.warning(
+                    f"{error_count} out of {len(chunks_for_deletion)} documents failed to be deleted in Elasticsearch index {self.collection_name}"
+                )

             log.info(f"Successfully deleted {successful_count} chunks from Elasticsearch index {self.collection_name}")
         except Exception as e:

@@ -132,12 +118,12 @@ class ElasticsearchIndex(EmbeddingIndex):
         """Convert search results to QueryChunksResponse."""

         chunks, scores = [], []
-        for result in results['hits']['hits']:
+        for result in results["hits"]["hits"]:
             try:
                 chunk = Chunk(
                     content=result["_source"]["chunk_content"],
-                    chunk_id=result["_id"],
-                    embedding=result["_source"]["vector"]
+                    stored_chunk_id=result["_id"],
+                    embedding=result["_source"]["vector"],
                 )
             except Exception:
                 log.exception("Failed to parse chunk")

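For orientation, the loop above walks the standard Elasticsearch search-response layout; a sketch of the shape it expects, with invented values:

```python
# Minimal stand-in for an Elasticsearch search response (values invented).
results = {
    "hits": {
        "hits": [
            {
                "_id": "chunk-1",
                "_score": 1.42,
                "_source": {"vector": [0.1, 0.2, 0.3], "chunk_content": '{"content": "hello"}'},
            }
        ]
    }
}
for hit in results["hits"]["hits"]:
    print(hit["_id"], hit["_score"], len(hit["_source"]["vector"]))
```
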
@@ -147,24 +133,16 @@ class ElasticsearchIndex(EmbeddingIndex):
             scores.append(result["_score"])

         return QueryChunksResponse(chunks=chunks, scores=scores)


     async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
         """Vector search using kNN."""

         try:
-            results = (
-                await self.client.search(
-                    index=self.collection_name,
-                    query={
-                        "knn": {
-                            "field": "vector",
-                            "query_vector": embedding.tolist(),
-                            "k": k
-                        }
-                    },
-                    min_score=score_threshold,
-                    limit=k
-                )
+            results = await self.client.search(
+                index=self.collection_name,
+                query={"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
+                min_score=score_threshold,
+                limit=k,
+            )
         except Exception as e:
             log.error(f"Error performing vector query on Elasticsearch index {self.collection_name}: {e}")

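A self-contained sketch of the same kNN search against a local cluster; the URL, index name, and vector are invented. Note that `knn` as a query-DSL clause needs a reasonably recent 8.x server, and the sketch uses `size` (the client's standard result-count parameter) where the code above passes `limit`:

```python
import asyncio

from elasticsearch import AsyncElasticsearch


async def knn_demo() -> None:
    client = AsyncElasticsearch("http://localhost:9200")  # assumed local cluster
    resp = await client.search(
        index="demo",  # hypothetical index with a dense_vector field named "vector"
        query={"knn": {"field": "vector", "query_vector": [0.1, 0.2], "k": 5}},
        size=5,
    )
    for hit in resp["hits"]["hits"]:
        print(hit["_id"], hit["_score"])
    await client.close()


asyncio.run(knn_demo())
```
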
@@ -176,19 +154,11 @@ class ElasticsearchIndex(EmbeddingIndex):
         """Keyword search using match query."""

         try:
-            results = (
-                await self.client.search(
-                    index=self.collection_name,
-                    query={
-                        "match": {
-                            "chunk_content": {
-                                "query": query_string
-                            }
-                        }
-                    },
-                    min_score=score_threshold,
-                    limit=k
-                )
+            results = await self.client.search(
+                index=self.collection_name,
+                query={"match": {"chunk_content": {"query": query_string}}},
+                min_score=score_threshold,
+                limit=k,
+            )
         except Exception as e:
             log.error(f"Error performing keyword query on Elasticsearch index {self.collection_name}: {e}")

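The keyword branch is a plain full-text `match` query; written out as a dict, the request it builds reduces to the following (index name and query text invented):

```python
# The search request the keyword path sends, spelled out as plain parameters.
search_kwargs = {
    "index": "demo",  # hypothetical index
    "query": {"match": {"chunk_content": {"query": "user question"}}},
    "min_score": 0.0,
    "size": 5,
}
```
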
@@ -205,32 +175,15 @@ class ElasticsearchIndex(EmbeddingIndex):
         reranker_type: str,
         reranker_params: dict[str, Any] | None = None,
     ) -> QueryChunksResponse:

         supported_retrievers = ["rrf", "linear"]
         if reranker_type not in supported_retrievers:
             raise ValueError(f"Unsupported reranker type: {reranker_type}. Supported types are: {supported_retrievers}")
-
         retriever = {
             reranker_type: {
                 "retrievers": [
-                    {
-                        "retriever": {
-                            "standard": {
-                                "query": {
-                                    "match": {
-                                        "chunk_content": query_string
-                                    }
-                                }
-                            }
-                        }
-                    },
-                    {
-                        "knn": {
-                            "field": "vector",
-                            "query_vector": embedding.tolist(),
-                            "k": k
-                        }
-                    }
+                    {"retriever": {"standard": {"query": {"match": {"chunk_content": query_string}}}}},
+                    {"knn": {"field": "vector", "query_vector": embedding.tolist(), "k": k}},
                 ]
             }
         }

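Assembled, the structure the method builds for `reranker_type="rrf"` looks like this (query text and vector invented; this mirrors the dict constructed above, not a claim about canonical Elasticsearch retriever syntax):

```python
# The retriever dict as constructed above, for reranker_type="rrf".
retriever = {
    "rrf": {
        "retrievers": [
            {"retriever": {"standard": {"query": {"match": {"chunk_content": "user question"}}}}},
            {"knn": {"field": "vector", "query_vector": [0.1, 0.2], "k": 10}},
        ]
    }
}
```
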
@@ -252,17 +205,14 @@ class ElasticsearchIndex(EmbeddingIndex):
             retrievers_params = reranker_params.get("retrievers")
             if retrievers_params is not None:
                 for i in range(0, len(retriever["linear"]["retrievers"])):
-                    retr_type=retriever["linear"]["retrievers"][i]["retriever"].key()
+                    retr_type = list(retriever["linear"]["retrievers"][i].keys())[0]
                     retriever["linear"]["retrievers"][i].update(retrievers_params["retrievers"][retr_type])
             del reranker_params["retrievers"]
             retriever["linear"].update(reranker_params)

         try:
             results = await self.client.search(
-                index=self.collection_name,
-                size=k,
-                retriever=retriever,
-                min_score=score_threshold
+                index=self.collection_name, size=k, retriever=retriever, min_score=score_threshold
             )
         except Exception as e:
             log.error(f"Error performing hybrid query on Elasticsearch index {self.collection_name}: {e}")

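The corrected line works because each retriever entry is a single-key dict; `dict.key()` does not exist. `next(iter(...))` is an equivalent way to fetch that first key without building an intermediate list:

```python
entry = {"knn": {"field": "vector", "query_vector": [0.1, 0.2], "k": 10}}  # one entry, as above
retr_type = next(iter(entry))  # "knn"; same result as list(entry.keys())[0]
print(retr_type)
```
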