refactor: use generic WeightedInMemoryAggregator for hybrid search in SQLiteVecIndex (#3303)
Some checks failed
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 1s
Integration Tests (Replay) / Integration Tests (, , , client=, vision=) (push) Failing after 0s
Pre-commit / pre-commit (push) Failing after 1s
Python Package Build Test / build (3.12) (push) Failing after 1s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
Vector IO Integration Tests / test-matrix (push) Failing after 2s
Test External API and Providers / test-external (venv) (push) Failing after 1s
Python Package Build Test / build (3.13) (push) Failing after 2s
UI Tests / ui-tests (22) (push) Failing after 1s
Unit Tests / unit-tests (3.12) (push) Failing after 1s
Unit Tests / unit-tests (3.13) (push) Failing after 0s
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 6s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 7s

# What does this PR do?
<!-- Provide a short summary of what this PR does and why. Link to
relevant issues if applicable. -->
The purpose of this PR is to refactor `SQLiteVecIndex` to eliminate
redundant code and simplify the code using generic
`WeightedInMemoryAggregator` that can be used for any vector db
provider. This pattern is already implemented for `PGVectorIndex` in
#3064

CC: @franciscojavierarceo 

<!-- If resolving an issue, uncomment and update the line below -->
<!-- Closes #[issue-number] -->

## Test Plan
<!-- Describe the tests you ran to verify your changes with result
summaries. *Provide clear instructions so the plan can be easily
re-executed.* -->
1. `./scripts/unit-tests.sh`
2. Integration tests in CI Workflow
This commit is contained in:
IAN MILLER 2025-09-02 18:38:35 +01:00 committed by GitHub
parent 5c873d53db
commit faf891b40c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -30,11 +30,11 @@ from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
from llama_stack.providers.utils.memory.vector_store import (
RERANKER_TYPE_RRF,
RERANKER_TYPE_WEIGHTED,
ChunkForDeletion,
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator
logger = get_logger(name=__name__, category="vector_io")
@ -66,59 +66,6 @@ def _create_sqlite_connection(db_path):
return connection
def _normalize_scores(scores: dict[str, float]) -> dict[str, float]:
"""Normalize scores to [0,1] range using min-max normalization."""
if not scores:
return {}
min_score = min(scores.values())
max_score = max(scores.values())
score_range = max_score - min_score
if score_range > 0:
return {doc_id: (score - min_score) / score_range for doc_id, score in scores.items()}
return dict.fromkeys(scores, 1.0)
def _weighted_rerank(
vector_scores: dict[str, float],
keyword_scores: dict[str, float],
alpha: float = 0.5,
) -> dict[str, float]:
"""ReRanker that uses weighted average of scores."""
all_ids = set(vector_scores.keys()) | set(keyword_scores.keys())
normalized_vector_scores = _normalize_scores(vector_scores)
normalized_keyword_scores = _normalize_scores(keyword_scores)
return {
doc_id: (alpha * normalized_keyword_scores.get(doc_id, 0.0))
+ ((1 - alpha) * normalized_vector_scores.get(doc_id, 0.0))
for doc_id in all_ids
}
def _rrf_rerank(
vector_scores: dict[str, float],
keyword_scores: dict[str, float],
impact_factor: float = 60.0,
) -> dict[str, float]:
"""ReRanker that uses Reciprocal Rank Fusion."""
# Convert scores to ranks
vector_ranks = {
doc_id: i + 1 for i, (doc_id, _) in enumerate(sorted(vector_scores.items(), key=lambda x: x[1], reverse=True))
}
keyword_ranks = {
doc_id: i + 1 for i, (doc_id, _) in enumerate(sorted(keyword_scores.items(), key=lambda x: x[1], reverse=True))
}
all_ids = set(vector_scores.keys()) | set(keyword_scores.keys())
rrf_scores = {}
for doc_id in all_ids:
vector_rank = vector_ranks.get(doc_id, float("inf"))
keyword_rank = keyword_ranks.get(doc_id, float("inf"))
# RRF formula: score = 1/(k + r) where k is impact_factor and r is the rank
rrf_scores[doc_id] = (1.0 / (impact_factor + vector_rank)) + (1.0 / (impact_factor + keyword_rank))
return rrf_scores
def _make_sql_identifier(name: str) -> str:
return re.sub(r"[^a-zA-Z0-9_]", "_", name)
@ -398,14 +345,10 @@ class SQLiteVecIndex(EmbeddingIndex):
for chunk, score in zip(keyword_response.chunks, keyword_response.scores, strict=False)
}
# Combine scores using the specified reranker
if reranker_type == RERANKER_TYPE_WEIGHTED:
alpha = reranker_params.get("alpha", 0.5)
combined_scores = _weighted_rerank(vector_scores, keyword_scores, alpha)
else:
# Default to RRF for None, RRF, or any unknown types
impact_factor = reranker_params.get("impact_factor", 60.0)
combined_scores = _rrf_rerank(vector_scores, keyword_scores, impact_factor)
# Combine scores using the reranking utility
combined_scores = WeightedInMemoryAggregator.combine_search_results(
vector_scores, keyword_scores, reranker_type, reranker_params
)
# Sort by combined score and get top k results
sorted_items = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)