mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-03 01:48:05 +00:00
Merge 72bef1922c into 4237eb4aaa
This commit is contained in:
commit
b193be07c0
3 changed files with 228 additions and 3 deletions
|
|
@ -129,7 +129,63 @@ class QdrantIndex(EmbeddingIndex):
|
|||
return QueryChunksResponse(chunks=chunks, scores=scores)
|
||||
|
||||
async def query_keyword(self, query_string: str, k: int, score_threshold: float) -> QueryChunksResponse:
    """
    Perform keyword-based search using Qdrant's MatchText filter.

    Uses Qdrant's ``query_filter`` with ``MatchText`` to return chunks whose
    ``chunk_content.content`` payload field contains the query text.

    Args:
        query_string: The text query for keyword search.
        k: Maximum number of results to return.
        score_threshold: Minimum score threshold forwarded to Qdrant.

    Returns:
        QueryChunksResponse with chunks and scores matching the keyword query.

    Raises:
        Exception: Re-raises any error from the Qdrant client after logging it.
    """
    # NOTE: the stub `raise NotImplementedError` that previously guarded this
    # method has been removed — leaving it in place made everything below
    # unreachable dead code.
    try:
        results = (
            await self.client.query_points(
                collection_name=self.collection_name,
                query_filter=models.Filter(
                    must=[
                        models.FieldCondition(
                            key="chunk_content.content", match=models.MatchText(text=query_string)
                        )
                    ]
                ),
                limit=k,
                with_payload=True,
                # Vectors are not needed for a pure keyword match.
                with_vectors=False,
                score_threshold=score_threshold,
            )
        ).points
    except Exception as e:
        log.error(f"Error querying keyword search in Qdrant collection {self.collection_name}: {e}")
        raise

    chunks, scores = [], []
    for point in results:
        # Defensive checks: Qdrant should only return ScoredPoint objects
        # with payloads when with_payload=True was requested.
        if not isinstance(point, models.ScoredPoint):
            raise RuntimeError(f"Expected ScoredPoint from Qdrant query, got {type(point).__name__}")
        if point.payload is None:
            raise RuntimeError("Qdrant query returned point with no payload")

        try:
            chunk = Chunk(**point.payload["chunk_content"])
        except Exception:
            # Skip malformed payloads rather than failing the whole query;
            # log enough context to find the offending point.
            chunk_id = point.payload.get(CHUNK_ID_KEY, "unknown") if point.payload else "unknown"
            point_id = getattr(point, "id", "unknown")
            log.exception(
                f"Failed to parse chunk in collection {self.collection_name}: "
                f"chunk_id={chunk_id}, point_id={point_id}"
            )
            continue

        chunks.append(chunk)
        scores.append(point.score)

    return QueryChunksResponse(chunks=chunks, scores=scores)
|
||||
|
||||
async def query_hybrid(
    self,
    embedding: NDArray,
    query_string: str,
    k: int,
    score_threshold: float,
    reranker_type: str,
    reranker_params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
    """
    Hybrid search combining vector similarity and keyword filtering in a single query.

    Uses Qdrant's native capability to combine a vector query with a
    ``query_filter``, so vector similarity search is filtered by keyword
    matches in one round trip.

    Args:
        embedding: The query embedding vector.
        query_string: The text query for keyword filtering.
        k: Maximum number of results to return.
        score_threshold: Minimum similarity score threshold.
        reranker_type: Not used with this approach, kept for API compatibility.
        reranker_params: Not used with this approach, kept for API compatibility.

    Returns:
        QueryChunksResponse with filtered vector search results.

    Raises:
        Exception: Re-raises any error from the Qdrant client after logging it.
    """
    # NOTE: the stub `raise NotImplementedError` that previously guarded this
    # method has been removed — it made the implementation below unreachable.
    try:
        results = (
            await self.client.query_points(
                collection_name=self.collection_name,
                query=embedding.tolist(),
                query_filter=models.Filter(
                    must=[
                        models.FieldCondition(
                            key="chunk_content.content", match=models.MatchText(text=query_string)
                        )
                    ]
                ),
                limit=k,
                with_payload=True,
                score_threshold=score_threshold,
            )
        ).points
    except Exception as e:
        log.error(f"Error querying hybrid search in Qdrant collection {self.collection_name}: {e}")
        raise

    chunks, scores = [], []
    for point in results:
        # Defensive checks mirroring query_keyword: only ScoredPoint objects
        # with payloads are expected here.
        if not isinstance(point, models.ScoredPoint):
            raise RuntimeError(f"Expected ScoredPoint from Qdrant query, got {type(point).__name__}")
        if point.payload is None:
            raise RuntimeError("Qdrant query returned point with no payload")

        try:
            chunk = Chunk(**point.payload["chunk_content"])
        except Exception:
            # Skip malformed payloads rather than failing the whole query.
            chunk_id = point.payload.get(CHUNK_ID_KEY, "unknown") if point.payload else "unknown"
            point_id = getattr(point, "id", "unknown")
            log.exception(
                f"Failed to parse chunk in collection {self.collection_name}: "
                f"chunk_id={chunk_id}, point_id={point_id}"
            )
            continue

        chunks.append(chunk)
        scores.append(point.score)

    return QueryChunksResponse(chunks=chunks, scores=scores)
|
||||
|
||||
async def delete(self):
    """Drop this index's underlying Qdrant collection."""
    target = self.collection_name
    await self.client.delete_collection(collection_name=target)
|
||||
|
|
|
|||
|
|
@ -57,16 +57,20 @@ def skip_if_provider_doesnt_support_openai_vector_stores_search(client_with_mode
|
|||
],
|
||||
"keyword": [
|
||||
"inline::milvus",
|
||||
"inline::qdrant",
|
||||
"inline::sqlite-vec",
|
||||
"remote::milvus",
|
||||
"remote::pgvector",
|
||||
"remote::qdrant",
|
||||
"remote::weaviate",
|
||||
],
|
||||
"hybrid": [
|
||||
"inline::milvus",
|
||||
"inline::qdrant",
|
||||
"inline::sqlite-vec",
|
||||
"remote::milvus",
|
||||
"remote::pgvector",
|
||||
"remote::qdrant",
|
||||
"remote::weaviate",
|
||||
],
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,17 +14,19 @@ from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreCo
|
|||
from llama_stack.core.storage.kvstore import register_kvstore_backends
|
||||
from llama_stack.providers.inline.vector_io.faiss.config import FaissVectorIOConfig
|
||||
from llama_stack.providers.inline.vector_io.faiss.faiss import FaissIndex, FaissVectorIOAdapter
|
||||
from llama_stack.providers.inline.vector_io.qdrant.config import QdrantVectorIOConfig
|
||||
from llama_stack.providers.inline.vector_io.sqlite_vec import SQLiteVectorIOConfig
|
||||
from llama_stack.providers.inline.vector_io.sqlite_vec.sqlite_vec import SQLiteVecIndex, SQLiteVecVectorIOAdapter
|
||||
from llama_stack.providers.remote.vector_io.pgvector.config import PGVectorVectorIOConfig
|
||||
from llama_stack.providers.remote.vector_io.pgvector.pgvector import PGVectorIndex, PGVectorVectorIOAdapter
|
||||
from llama_stack.providers.remote.vector_io.qdrant.qdrant import QdrantIndex, QdrantVectorIOAdapter
|
||||
from llama_stack_api import Chunk, ChunkMetadata, QueryChunksResponse, VectorStore
|
||||
|
||||
EMBEDDING_DIMENSION = 768
|
||||
COLLECTION_PREFIX = "test_collection"
|
||||
|
||||
|
||||
# NOTE: the old, narrower parametrization (without "qdrant") was left stacked
# above the new one; two @pytest.fixture decorators on one function is a bug,
# so only the extended parametrization is kept.
@pytest.fixture(params=["sqlite_vec", "faiss", "pgvector", "qdrant"])
def vector_provider(request):
    """Parametrized fixture naming the vector IO provider under test."""
    return request.param
|
||||
|
||||
|
|
@ -317,12 +319,116 @@ async def pgvector_vec_adapter(unique_kvstore_config, mock_inference_api, embedd
|
|||
await adapter.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
async def qdrant_vec_index(embedding_dimension):
    """A QdrantIndex backed entirely by a mocked AsyncQdrantClient.

    Chunks passed to add_chunks are recorded on the index and replayed by the
    mocked query paths, so no real Qdrant server is needed.
    """
    from qdrant_client import models

    mock_client = AsyncMock()
    mock_client.collection_exists.return_value = False
    mock_client.create_collection = AsyncMock()
    mock_client.query_points = AsyncMock(return_value=AsyncMock(points=[]))
    mock_client.delete_collection = AsyncMock()

    collection_name = f"test-qdrant-collection-{random.randint(1, 1000000)}"
    index = QdrantIndex(mock_client, collection_name)
    index._test_chunks = []

    async def mock_add_chunks(chunks, embeddings):
        # Remember the chunks and build ScoredPoint-shaped mocks so that
        # subsequent query_points calls return them.
        index._test_chunks = list(chunks)
        scored = []
        for chunk in chunks:
            point = MagicMock(spec=models.ScoredPoint)
            point.score = 1.0
            point.payload = {"chunk_content": chunk.model_dump(), "_chunk_id": chunk.chunk_id}
            scored.append(point)

        async def query_points_mock(**kwargs):
            # Honor the limit kwarg; default to returning everything stored.
            limit = kwargs.get("limit", len(index._test_chunks))
            return AsyncMock(points=scored[:limit])

        mock_client.query_points = query_points_mock

    index.add_chunks = mock_add_chunks

    async def mock_query_vector(embedding, k, score_threshold):
        # Replay stored chunks with a constant score; ranking is not under test.
        stored = index._test_chunks[:k] if hasattr(index, "_test_chunks") else []
        return QueryChunksResponse(chunks=stored, scores=[1.0] * len(stored))

    index.query_vector = mock_query_vector

    yield index
|
||||
|
||||
|
||||
@pytest.fixture
async def qdrant_vec_adapter(unique_kvstore_config, mock_inference_api, embedding_dimension):
    """A QdrantVectorIOAdapter wired to a fully mocked client and kvstore.

    Patches the AsyncQdrantClient constructor and kvstore backend, initializes
    the adapter, registers one test vector store, and yields the adapter with
    insert/query entry points routed through the cached index.
    """
    config = QdrantVectorIOConfig(
        path=":memory:",
        persistence=unique_kvstore_config,
    )
    adapter = QdrantVectorIOAdapter(config, mock_inference_api, None)

    from unittest.mock import patch

    mock_client = AsyncMock()
    mock_client.collection_exists.return_value = False
    mock_client.create_collection = AsyncMock()
    mock_client.query_points = AsyncMock(return_value=AsyncMock(points=[]))
    mock_client.delete_collection = AsyncMock()
    mock_client.close = AsyncMock()
    mock_client.upsert = AsyncMock()

    with patch("llama_stack.providers.remote.vector_io.qdrant.qdrant.AsyncQdrantClient") as mock_client_class:
        mock_client_class.return_value = mock_client

        with patch("llama_stack.core.storage.kvstore.kvstore_impl") as mock_kvstore_impl:
            kv_mock = AsyncMock()
            kv_mock.values_in_range.return_value = []
            mock_kvstore_impl.return_value = kv_mock

            # Skip the OpenAI vector-store bootstrap; it is not under test here.
            with patch.object(adapter, "initialize_openai_vector_stores", new_callable=AsyncMock):
                await adapter.initialize()
                adapter.client = mock_client

                async def mock_insert_chunks(vector_store_id, chunks, ttl_seconds=None):
                    index = await adapter._get_and_cache_vector_store_index(vector_store_id)
                    if not index:
                        raise ValueError(f"Vector DB {vector_store_id} not found")
                    await index.insert_chunks(chunks)

                adapter.insert_chunks = mock_insert_chunks

                async def mock_query_chunks(vector_store_id, query, params=None):
                    index = await adapter._get_and_cache_vector_store_index(vector_store_id)
                    if not index:
                        raise ValueError(f"Vector DB {vector_store_id} not found")
                    return await index.query_chunks(query, params)

                adapter.query_chunks = mock_query_chunks

                store = VectorStore(
                    identifier=f"qdrant_test_collection_{random.randint(1, 1_000_000)}",
                    provider_id="test_provider",
                    embedding_model="test_model",
                    embedding_dimension=embedding_dimension,
                )
                await adapter.register_vector_store(store)
                adapter.test_collection_id = store.identifier

                yield adapter
                await adapter.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
def vector_io_adapter(vector_provider, request):
    """Resolve the provider-specific adapter fixture for the current parametrization."""
    fixture_by_provider = {
        "faiss": "faiss_vec_adapter",
        "sqlite_vec": "sqlite_vec_adapter",
        "pgvector": "pgvector_vec_adapter",
        "qdrant": "qdrant_vec_adapter",
    }
    return request.getfixturevalue(fixture_by_provider[vector_provider])
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue