feat(qdrant): implement hybrid and keyword search support (#4006)

# What does this PR do?
- Part of #3009
- Implement hybrid search using Qdrant's native query filtering
- Add keyword search support
- Update test suites to include Qdrant for keyword and hybrid modes


<!-- If resolving an issue, uncomment and update the line below -->
<!-- Closes #[issue-number] -->

## Test Plan
<!-- Describe the tests you ran to verify your changes with result
summaries. *Provide clear instructions so the plan can be easily
re-executed.* -->
```
pytest -sv tests/unit/providers/vector_io/

.......
============================================================================================== slowest 10 durations ===============================================================================================
0.20s call     tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_max_concurrent_files_per_batch[qdrant]
0.20s call     tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_max_concurrent_files_per_batch[pgvector]
0.20s call     tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_max_concurrent_files_per_batch[sqlite_vec]
0.20s call     tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_max_concurrent_files_per_batch[faiss]
0.06s setup    tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py::test_insert_chunks_with_missing_document_id[pgvector]
0.04s call     tests/unit/providers/vector_io/test_sqlite_vec.py::test_query_chunks_hybrid_tie_breaking
0.04s call     tests/unit/providers/vector_io/test_sqlite_vec.py::test_query_chunks_hybrid_weighted_reranker_parametrization
0.03s call     tests/unit/providers/vector_io/test_sqlite_vec.py::test_query_chunks_hybrid_score_selection
0.03s call     tests/unit/providers/vector_io/test_sqlite_vec.py::test_query_chunks_hybrid_edge_cases
0.03s setup    tests/unit/providers/vector_io/test_faiss.py::test_faiss_query_vector_returns_infinity_when_query_and_embedding_are_identical
======================================================================================== 180 passed, 47 warnings in 2.78s =========================================================================================
```

Signed-off-by: Varsha Prasad Narsing <varshaprasad96@gmail.com>
Co-authored-by: Francisco Javier Arceo <arceofrancisco@gmail.com>
This commit is contained in:
Varsha 2025-12-03 13:39:01 -08:00 committed by GitHub
parent 5873a316db
commit 743683ba26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 226 additions and 3 deletions

View file

@ -129,7 +129,63 @@ class QdrantIndex(EmbeddingIndex):
return QueryChunksResponse(chunks=chunks, scores=scores)
async def query_keyword(self, query_string: str, k: int, score_threshold: float) -> QueryChunksResponse:
    """
    Perform a keyword search using Qdrant's MatchText payload filter.

    Issues a single ``query_points`` call whose ``query_filter`` requires the
    ``chunk_content.content`` payload field to match ``query_string``, then
    converts the ``chunk_content`` payload of every returned point into a
    ``Chunk``.

    NOTE(review): no ``query`` vector is passed to ``query_points``, so the
    returned scores come from a filter-only request -- confirm how Qdrant
    applies ``score_threshold`` in that mode.

    Args:
        query_string: The text to match against the stored chunk content
        k: Maximum number of points requested from Qdrant (passed as ``limit``)
        score_threshold: Forwarded unchanged as ``score_threshold``

    Returns:
        QueryChunksResponse with the parsed chunks and their scores, in the
        order Qdrant returned them. Points whose payload cannot be parsed
        into a ``Chunk`` are logged and skipped.

    Raises:
        RuntimeError: If a returned point is not a ``ScoredPoint`` or carries
            no payload.
        Exception: Any error raised by the Qdrant client is logged and
            re-raised unchanged.
    """
    # The stale ``raise NotImplementedError(...)`` that used to precede the
    # docstring has been dropped; it made everything below unreachable.
    try:
        results = (
            await self.client.query_points(
                collection_name=self.collection_name,
                query_filter=models.Filter(
                    must=[
                        models.FieldCondition(
                            key="chunk_content.content", match=models.MatchText(text=query_string)
                        )
                    ]
                ),
                limit=k,
                with_payload=True,
                with_vectors=False,
                score_threshold=score_threshold,
            )
        ).points
    except Exception as e:
        log.error(f"Error querying keyword search in Qdrant collection {self.collection_name}: {e}")
        raise

    chunks, scores = [], []
    for point in results:
        if not isinstance(point, models.ScoredPoint):
            raise RuntimeError(f"Expected ScoredPoint from Qdrant query, got {type(point).__name__}")
        if point.payload is None:
            raise RuntimeError("Qdrant query returned point with no payload")

        try:
            chunk = Chunk(**point.payload["chunk_content"])
        except Exception:
            # Best effort: one malformed point must not fail the whole query.
            # The payload is known to be non-None here, so a plain .get() is enough.
            chunk_id = point.payload.get(CHUNK_ID_KEY, "unknown")
            point_id = getattr(point, "id", "unknown")
            log.exception(
                f"Failed to parse chunk in collection {self.collection_name}: "
                f"chunk_id={chunk_id}, point_id={point_id}"
            )
            continue

        chunks.append(chunk)
        scores.append(point.score)

    return QueryChunksResponse(chunks=chunks, scores=scores)
async def query_hybrid(
self,
@ -140,7 +196,66 @@ class QdrantIndex(EmbeddingIndex):
reranker_type: str,
reranker_params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
raise NotImplementedError("Hybrid search is not supported in Qdrant")
"""
Hybrid search combining vector similarity and keyword filtering in a single query.
Uses Qdrant's native capability to combine a vector query with a query_filter,
allowing vector similarity search to be filtered by keyword matches in one call.
Args:
embedding: The query embedding vector
query_string: The text query for keyword filtering
k: Number of results to return
score_threshold: Minimum similarity score threshold
reranker_type: Not used with this approach, but kept for API compatibility
reranker_params: Not used with this approach, but kept for API compatibility
Returns:
QueryChunksResponse with filtered vector search results
"""
try:
results = (
await self.client.query_points(
collection_name=self.collection_name,
query=embedding.tolist(),
query_filter=models.Filter(
must=[
models.FieldCondition(
key="chunk_content.content", match=models.MatchText(text=query_string)
)
]
),
limit=k,
with_payload=True,
score_threshold=score_threshold,
)
).points
except Exception as e:
log.error(f"Error querying hybrid search in Qdrant collection {self.collection_name}: {e}")
raise
chunks, scores = [], []
for point in results:
if not isinstance(point, models.ScoredPoint):
raise RuntimeError(f"Expected ScoredPoint from Qdrant query, got {type(point).__name__}")
if point.payload is None:
raise RuntimeError("Qdrant query returned point with no payload")
try:
chunk = Chunk(**point.payload["chunk_content"])
except Exception:
chunk_id = point.payload.get(CHUNK_ID_KEY, "unknown") if point.payload else "unknown"
point_id = getattr(point, "id", "unknown")
log.exception(
f"Failed to parse chunk in collection {self.collection_name}: "
f"chunk_id={chunk_id}, point_id={point_id}"
)
continue
chunks.append(chunk)
scores.append(point.score)
return QueryChunksResponse(chunks=chunks, scores=scores)
async def delete(self):
    """Delete the Qdrant collection named by ``self.collection_name`` via the client."""
    target_collection = self.collection_name
    await self.client.delete_collection(collection_name=target_collection)