mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-25 11:18:03 +00:00
Merge branch 'main' into opengauss-add
This commit is contained in:
commit
39e49ab97a
807 changed files with 79555 additions and 26772 deletions
|
|
@ -12,6 +12,6 @@ from .config import ChromaVectorIOConfig
|
|||
async def get_adapter_impl(config: ChromaVectorIOConfig, deps: dict[Api, ProviderSpec]):
|
||||
from .chroma import ChromaVectorIOAdapter
|
||||
|
||||
impl = ChromaVectorIOAdapter(config, deps[Api.inference])
|
||||
impl = ChromaVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files))
|
||||
await impl.initialize()
|
||||
return impl
|
||||
|
|
|
|||
|
|
@ -12,25 +12,19 @@ from urllib.parse import urlparse
|
|||
import chromadb
|
||||
from numpy.typing import NDArray
|
||||
|
||||
from llama_stack.apis.files import Files
|
||||
from llama_stack.apis.inference import InterleavedContent
|
||||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
from llama_stack.apis.vector_io import (
|
||||
Chunk,
|
||||
QueryChunksResponse,
|
||||
SearchRankingOptions,
|
||||
VectorIO,
|
||||
VectorStoreChunkingStrategy,
|
||||
VectorStoreDeleteResponse,
|
||||
VectorStoreFileContentsResponse,
|
||||
VectorStoreFileObject,
|
||||
VectorStoreFileStatus,
|
||||
VectorStoreListFilesResponse,
|
||||
VectorStoreListResponse,
|
||||
VectorStoreObject,
|
||||
VectorStoreSearchResponsePage,
|
||||
)
|
||||
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
|
||||
from llama_stack.providers.inline.vector_io.chroma import ChromaVectorIOConfig as InlineChromaVectorIOConfig
|
||||
from llama_stack.providers.utils.kvstore import kvstore_impl
|
||||
from llama_stack.providers.utils.kvstore.api import KVStore
|
||||
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
|
||||
from llama_stack.providers.utils.memory.vector_store import (
|
||||
EmbeddingIndex,
|
||||
VectorDBWithIndex,
|
||||
|
|
@ -42,6 +36,13 @@ log = logging.getLogger(__name__)
|
|||
|
||||
ChromaClientType = chromadb.api.AsyncClientAPI | chromadb.api.ClientAPI
|
||||
|
||||
VERSION = "v3"
|
||||
VECTOR_DBS_PREFIX = f"vector_dbs:chroma:{VERSION}::"
|
||||
VECTOR_INDEX_PREFIX = f"vector_index:chroma:{VERSION}::"
|
||||
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:chroma:{VERSION}::"
|
||||
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:chroma:{VERSION}::"
|
||||
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:chroma:{VERSION}::"
|
||||
|
||||
|
||||
# this is a helper to allow us to use async and non-async chroma clients interchangeably
|
||||
async def maybe_await(result):
|
||||
|
|
@ -51,16 +52,20 @@ async def maybe_await(result):
|
|||
|
||||
|
||||
class ChromaIndex(EmbeddingIndex):
|
||||
def __init__(self, client: ChromaClientType, collection):
|
||||
def __init__(self, client: ChromaClientType, collection, kvstore: KVStore | None = None):
|
||||
self.client = client
|
||||
self.collection = collection
|
||||
self.kvstore = kvstore
|
||||
|
||||
async def initialize(self):
|
||||
pass
|
||||
|
||||
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
|
||||
assert len(chunks) == len(embeddings), (
|
||||
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
|
||||
)
|
||||
|
||||
ids = [f"{c.metadata['document_id']}:chunk-{i}" for i, c in enumerate(chunks)]
|
||||
ids = [f"{c.metadata.get('document_id', '')}:{c.chunk_id}" for c in chunks]
|
||||
await maybe_await(
|
||||
self.collection.add(
|
||||
documents=[chunk.model_dump_json() for chunk in chunks],
|
||||
|
|
@ -110,6 +115,9 @@ class ChromaIndex(EmbeddingIndex):
|
|||
) -> QueryChunksResponse:
|
||||
raise NotImplementedError("Keyword search is not supported in Chroma")
|
||||
|
||||
async def delete_chunk(self, chunk_id: str) -> None:
|
||||
raise NotImplementedError("delete_chunk is not supported in Chroma")
|
||||
|
||||
async def query_hybrid(
|
||||
self,
|
||||
embedding: NDArray,
|
||||
|
|
@ -122,24 +130,26 @@ class ChromaIndex(EmbeddingIndex):
|
|||
raise NotImplementedError("Hybrid search is not supported in Chroma")
|
||||
|
||||
|
||||
class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
||||
class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
|
||||
def __init__(
|
||||
self,
|
||||
config: RemoteChromaVectorIOConfig | InlineChromaVectorIOConfig,
|
||||
inference_api: Api.inference,
|
||||
files_api: Files | None,
|
||||
) -> None:
|
||||
log.info(f"Initializing ChromaVectorIOAdapter with url: {config}")
|
||||
self.config = config
|
||||
self.inference_api = inference_api
|
||||
|
||||
self.client = None
|
||||
self.cache = {}
|
||||
self.kvstore: KVStore | None = None
|
||||
self.vector_db_store = None
|
||||
|
||||
async def initialize(self) -> None:
|
||||
if isinstance(self.config, RemoteChromaVectorIOConfig):
|
||||
if not self.config.url:
|
||||
raise ValueError("URL is a required parameter for the remote Chroma provider's config")
|
||||
self.kvstore = await kvstore_impl(self.config.kvstore)
|
||||
self.vector_db_store = self.kvstore
|
||||
|
||||
if isinstance(self.config, RemoteChromaVectorIOConfig):
|
||||
log.info(f"Connecting to Chroma server at: {self.config.url}")
|
||||
url = self.config.url.rstrip("/")
|
||||
parsed = urlparse(url)
|
||||
|
|
@ -151,6 +161,7 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
else:
|
||||
log.info(f"Connecting to Chroma local db at: {self.config.db_path}")
|
||||
self.client = chromadb.PersistentClient(path=self.config.db_path)
|
||||
self.openai_vector_stores = await self._load_openai_vector_stores()
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
pass
|
||||
|
|
@ -170,6 +181,10 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
)
|
||||
|
||||
async def unregister_vector_db(self, vector_db_id: str) -> None:
|
||||
if vector_db_id not in self.cache:
|
||||
log.warning(f"Vector DB {vector_db_id} not found")
|
||||
return
|
||||
|
||||
await self.cache[vector_db_id].index.delete()
|
||||
del self.cache[vector_db_id]
|
||||
|
||||
|
|
@ -180,6 +195,8 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
ttl_seconds: int | None = None,
|
||||
) -> None:
|
||||
index = await self._get_and_cache_vector_db_index(vector_db_id)
|
||||
if index is None:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
|
||||
|
||||
await index.insert_chunks(chunks)
|
||||
|
||||
|
|
@ -191,6 +208,9 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
) -> QueryChunksResponse:
|
||||
index = await self._get_and_cache_vector_db_index(vector_db_id)
|
||||
|
||||
if index is None:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found in Chroma")
|
||||
|
||||
return await index.query_chunks(query, params)
|
||||
|
||||
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex:
|
||||
|
|
@ -207,107 +227,5 @@ class ChromaVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
self.cache[vector_db_id] = index
|
||||
return index
|
||||
|
||||
async def openai_create_vector_store(
|
||||
self,
|
||||
name: str,
|
||||
file_ids: list[str] | None = None,
|
||||
expires_after: dict[str, Any] | None = None,
|
||||
chunking_strategy: dict[str, Any] | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
embedding_model: str | None = None,
|
||||
embedding_dimension: int | None = 384,
|
||||
provider_id: str | None = None,
|
||||
provider_vector_db_id: str | None = None,
|
||||
) -> VectorStoreObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_list_vector_stores(
|
||||
self,
|
||||
limit: int | None = 20,
|
||||
order: str | None = "desc",
|
||||
after: str | None = None,
|
||||
before: str | None = None,
|
||||
) -> VectorStoreListResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_retrieve_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
) -> VectorStoreObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_update_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
name: str | None = None,
|
||||
expires_after: dict[str, Any] | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
) -> VectorStoreObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_delete_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
) -> VectorStoreDeleteResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_search_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
query: str | list[str],
|
||||
filters: dict[str, Any] | None = None,
|
||||
max_num_results: int | None = 10,
|
||||
ranking_options: SearchRankingOptions | None = None,
|
||||
rewrite_query: bool | None = False,
|
||||
search_mode: str | None = "vector",
|
||||
) -> VectorStoreSearchResponsePage:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_attach_file_to_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
attributes: dict[str, Any] | None = None,
|
||||
chunking_strategy: VectorStoreChunkingStrategy | None = None,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_list_files_in_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
limit: int | None = 20,
|
||||
order: str | None = "desc",
|
||||
after: str | None = None,
|
||||
before: str | None = None,
|
||||
filter: VectorStoreFileStatus | None = None,
|
||||
) -> VectorStoreListFilesResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_retrieve_vector_store_file(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_retrieve_vector_store_file_contents(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
) -> VectorStoreFileContentsResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_update_vector_store_file(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
attributes: dict[str, Any] | None = None,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
||||
async def openai_delete_vector_store_file(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
) -> VectorStoreFileObject:
|
||||
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma")
|
||||
|
|
|
|||
|
|
@ -6,12 +6,23 @@
|
|||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from llama_stack.providers.utils.kvstore.config import KVStoreConfig, SqliteKVStoreConfig
|
||||
from llama_stack.schema_utils import json_schema_type
|
||||
|
||||
|
||||
@json_schema_type
|
||||
class ChromaVectorIOConfig(BaseModel):
|
||||
url: str | None
|
||||
kvstore: KVStoreConfig = Field(description="Config for KV store backend")
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, url: str = "${env.CHROMADB_URL}", **kwargs: Any) -> dict[str, Any]:
|
||||
return {"url": url}
|
||||
def sample_run_config(cls, __distro_dir__: str, url: str = "${env.CHROMADB_URL}", **kwargs: Any) -> dict[str, Any]:
|
||||
return {
|
||||
"url": url,
|
||||
"kvstore": SqliteKVStoreConfig.sample_run_config(
|
||||
__distro_dir__=__distro_dir__,
|
||||
db_name="chroma_remote_registry.db",
|
||||
),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ from typing import Any
|
|||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from llama_stack.providers.utils.kvstore.config import KVStoreConfig
|
||||
from llama_stack.providers.utils.kvstore.config import KVStoreConfig, SqliteKVStoreConfig
|
||||
from llama_stack.schema_utils import json_schema_type
|
||||
|
||||
|
||||
|
|
@ -17,7 +17,7 @@ class MilvusVectorIOConfig(BaseModel):
|
|||
uri: str = Field(description="The URI of the Milvus server")
|
||||
token: str | None = Field(description="The token of the Milvus server")
|
||||
consistency_level: str = Field(description="The consistency level of the Milvus server", default="Strong")
|
||||
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)
|
||||
kvstore: KVStoreConfig = Field(description="Config for KV store backend")
|
||||
|
||||
# This configuration allows additional fields to be passed through to the underlying Milvus client.
|
||||
# See the [Milvus](https://milvus.io/docs/install-overview.md) documentation for more details about Milvus in general.
|
||||
|
|
@ -25,4 +25,11 @@ class MilvusVectorIOConfig(BaseModel):
|
|||
|
||||
@classmethod
|
||||
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
|
||||
return {"uri": "${env.MILVUS_ENDPOINT}", "token": "${env.MILVUS_TOKEN}"}
|
||||
return {
|
||||
"uri": "${env.MILVUS_ENDPOINT}",
|
||||
"token": "${env.MILVUS_TOKEN}",
|
||||
"kvstore": SqliteKVStoreConfig.sample_run_config(
|
||||
__distro_dir__=__distro_dir__,
|
||||
db_name="milvus_remote_registry.db",
|
||||
),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,15 +5,14 @@
|
|||
# the root directory of this source tree.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from numpy.typing import NDArray
|
||||
from pymilvus import DataType, MilvusClient
|
||||
from pymilvus import AnnSearchRequest, DataType, Function, FunctionType, MilvusClient, RRFRanker, WeightedRanker
|
||||
|
||||
from llama_stack.apis.common.errors import VectorStoreNotFoundError
|
||||
from llama_stack.apis.files.files import Files
|
||||
from llama_stack.apis.inference import Inference, InterleavedContent
|
||||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
|
|
@ -28,9 +27,11 @@ from llama_stack.providers.utils.kvstore import kvstore_impl
|
|||
from llama_stack.providers.utils.kvstore.api import KVStore
|
||||
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
|
||||
from llama_stack.providers.utils.memory.vector_store import (
|
||||
RERANKER_TYPE_WEIGHTED,
|
||||
EmbeddingIndex,
|
||||
VectorDBWithIndex,
|
||||
)
|
||||
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name
|
||||
|
||||
from .config import MilvusVectorIOConfig as RemoteMilvusVectorIOConfig
|
||||
|
||||
|
|
@ -44,14 +45,6 @@ OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:milvus:{VERSION
|
|||
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:milvus:{VERSION}::"
|
||||
|
||||
|
||||
def sanitize_collection_name(name: str) -> str:
|
||||
"""
|
||||
Sanitize collection name to ensure it only contains numbers, letters, and underscores.
|
||||
Any other characters are replaced with underscores.
|
||||
"""
|
||||
return re.sub(r"[^a-zA-Z0-9_]", "_", name)
|
||||
|
||||
|
||||
class MilvusIndex(EmbeddingIndex):
|
||||
def __init__(
|
||||
self, client: MilvusClient, collection_name: str, consistency_level="Strong", kvstore: KVStore | None = None
|
||||
|
|
@ -74,12 +67,66 @@ class MilvusIndex(EmbeddingIndex):
|
|||
assert len(chunks) == len(embeddings), (
|
||||
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
|
||||
)
|
||||
|
||||
if not await asyncio.to_thread(self.client.has_collection, self.collection_name):
|
||||
logger.info(f"Creating new collection {self.collection_name} with nullable sparse field")
|
||||
# Create schema for vector search
|
||||
schema = self.client.create_schema()
|
||||
schema.add_field(
|
||||
field_name="chunk_id",
|
||||
datatype=DataType.VARCHAR,
|
||||
is_primary=True,
|
||||
max_length=100,
|
||||
)
|
||||
schema.add_field(
|
||||
field_name="content",
|
||||
datatype=DataType.VARCHAR,
|
||||
max_length=65535,
|
||||
enable_analyzer=True, # Enable text analysis for BM25
|
||||
)
|
||||
schema.add_field(
|
||||
field_name="vector",
|
||||
datatype=DataType.FLOAT_VECTOR,
|
||||
dim=len(embeddings[0]),
|
||||
)
|
||||
schema.add_field(
|
||||
field_name="chunk_content",
|
||||
datatype=DataType.JSON,
|
||||
)
|
||||
# Add sparse vector field for BM25 (required by the function)
|
||||
schema.add_field(
|
||||
field_name="sparse",
|
||||
datatype=DataType.SPARSE_FLOAT_VECTOR,
|
||||
)
|
||||
|
||||
# Create indexes
|
||||
index_params = self.client.prepare_index_params()
|
||||
index_params.add_index(
|
||||
field_name="vector",
|
||||
index_type="FLAT",
|
||||
metric_type="COSINE",
|
||||
)
|
||||
# Add index for sparse field (required by BM25 function)
|
||||
index_params.add_index(
|
||||
field_name="sparse",
|
||||
index_type="SPARSE_INVERTED_INDEX",
|
||||
metric_type="BM25",
|
||||
)
|
||||
|
||||
# Add BM25 function for full-text search
|
||||
bm25_function = Function(
|
||||
name="text_bm25_emb",
|
||||
input_field_names=["content"],
|
||||
output_field_names=["sparse"],
|
||||
function_type=FunctionType.BM25,
|
||||
)
|
||||
schema.add_function(bm25_function)
|
||||
|
||||
await asyncio.to_thread(
|
||||
self.client.create_collection,
|
||||
self.collection_name,
|
||||
dimension=len(embeddings[0]),
|
||||
auto_id=True,
|
||||
schema=schema,
|
||||
index_params=index_params,
|
||||
consistency_level=self.consistency_level,
|
||||
)
|
||||
|
||||
|
|
@ -88,8 +135,10 @@ class MilvusIndex(EmbeddingIndex):
|
|||
data.append(
|
||||
{
|
||||
"chunk_id": chunk.chunk_id,
|
||||
"content": chunk.content,
|
||||
"vector": embedding,
|
||||
"chunk_content": chunk.model_dump(),
|
||||
# sparse field will be handled by BM25 function automatically
|
||||
}
|
||||
)
|
||||
try:
|
||||
|
|
@ -107,6 +156,7 @@ class MilvusIndex(EmbeddingIndex):
|
|||
self.client.search,
|
||||
collection_name=self.collection_name,
|
||||
data=[embedding],
|
||||
anns_field="vector",
|
||||
limit=k,
|
||||
output_fields=["*"],
|
||||
search_params={"params": {"radius": score_threshold}},
|
||||
|
|
@ -121,7 +171,64 @@ class MilvusIndex(EmbeddingIndex):
|
|||
k: int,
|
||||
score_threshold: float,
|
||||
) -> QueryChunksResponse:
|
||||
raise NotImplementedError("Keyword search is not supported in Milvus")
|
||||
"""
|
||||
Perform BM25-based keyword search using Milvus's built-in full-text search.
|
||||
"""
|
||||
try:
|
||||
# Use Milvus's built-in BM25 search
|
||||
search_res = await asyncio.to_thread(
|
||||
self.client.search,
|
||||
collection_name=self.collection_name,
|
||||
data=[query_string], # Raw text query
|
||||
anns_field="sparse", # Use sparse field for BM25
|
||||
output_fields=["chunk_content"], # Output the chunk content
|
||||
limit=k,
|
||||
search_params={
|
||||
"params": {
|
||||
"drop_ratio_search": 0.2, # Ignore low-importance terms
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
chunks = []
|
||||
scores = []
|
||||
for res in search_res[0]:
|
||||
chunk = Chunk(**res["entity"]["chunk_content"])
|
||||
chunks.append(chunk)
|
||||
scores.append(res["distance"]) # BM25 score from Milvus
|
||||
|
||||
# Filter by score threshold
|
||||
filtered_chunks = [chunk for chunk, score in zip(chunks, scores, strict=False) if score >= score_threshold]
|
||||
filtered_scores = [score for score in scores if score >= score_threshold]
|
||||
|
||||
return QueryChunksResponse(chunks=filtered_chunks, scores=filtered_scores)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error performing BM25 search: {e}")
|
||||
# Fallback to simple text search
|
||||
return await self._fallback_keyword_search(query_string, k, score_threshold)
|
||||
|
||||
async def _fallback_keyword_search(
|
||||
self,
|
||||
query_string: str,
|
||||
k: int,
|
||||
score_threshold: float,
|
||||
) -> QueryChunksResponse:
|
||||
"""
|
||||
Fallback to simple text search when BM25 search is not available.
|
||||
"""
|
||||
# Simple text search using content field
|
||||
search_res = await asyncio.to_thread(
|
||||
self.client.query,
|
||||
collection_name=self.collection_name,
|
||||
filter='content like "%{content}%"',
|
||||
filter_params={"content": query_string},
|
||||
output_fields=["*"],
|
||||
limit=k,
|
||||
)
|
||||
chunks = [Chunk(**res["chunk_content"]) for res in search_res]
|
||||
scores = [1.0] * len(chunks) # Simple binary score for text search
|
||||
return QueryChunksResponse(chunks=chunks, scores=scores)
|
||||
|
||||
async def query_hybrid(
|
||||
self,
|
||||
|
|
@ -132,7 +239,63 @@ class MilvusIndex(EmbeddingIndex):
|
|||
reranker_type: str,
|
||||
reranker_params: dict[str, Any] | None = None,
|
||||
) -> QueryChunksResponse:
|
||||
raise NotImplementedError("Hybrid search is not supported in Milvus")
|
||||
"""
|
||||
Hybrid search using Milvus's native hybrid search capabilities.
|
||||
|
||||
This implementation uses Milvus's hybrid_search method which combines
|
||||
vector search and BM25 search with configurable reranking strategies.
|
||||
"""
|
||||
search_requests = []
|
||||
|
||||
# nprobe: Controls search accuracy vs performance trade-off
|
||||
# 10 balances these trade-offs for RAG applications
|
||||
search_requests.append(
|
||||
AnnSearchRequest(data=[embedding.tolist()], anns_field="vector", param={"nprobe": 10}, limit=k)
|
||||
)
|
||||
|
||||
# drop_ratio_search: Filters low-importance terms to improve search performance
|
||||
# 0.2 balances noise reduction with recall
|
||||
search_requests.append(
|
||||
AnnSearchRequest(data=[query_string], anns_field="sparse", param={"drop_ratio_search": 0.2}, limit=k)
|
||||
)
|
||||
|
||||
if reranker_type == RERANKER_TYPE_WEIGHTED:
|
||||
alpha = (reranker_params or {}).get("alpha", 0.5)
|
||||
rerank = WeightedRanker(alpha, 1 - alpha)
|
||||
else:
|
||||
impact_factor = (reranker_params or {}).get("impact_factor", 60.0)
|
||||
rerank = RRFRanker(impact_factor)
|
||||
|
||||
search_res = await asyncio.to_thread(
|
||||
self.client.hybrid_search,
|
||||
collection_name=self.collection_name,
|
||||
reqs=search_requests,
|
||||
ranker=rerank,
|
||||
limit=k,
|
||||
output_fields=["chunk_content"],
|
||||
)
|
||||
|
||||
chunks = []
|
||||
scores = []
|
||||
for res in search_res[0]:
|
||||
chunk = Chunk(**res["entity"]["chunk_content"])
|
||||
chunks.append(chunk)
|
||||
scores.append(res["distance"])
|
||||
|
||||
filtered_chunks = [chunk for chunk, score in zip(chunks, scores, strict=False) if score >= score_threshold]
|
||||
filtered_scores = [score for score in scores if score >= score_threshold]
|
||||
|
||||
return QueryChunksResponse(chunks=filtered_chunks, scores=filtered_scores)
|
||||
|
||||
async def delete_chunk(self, chunk_id: str) -> None:
|
||||
"""Remove a chunk from the Milvus collection."""
|
||||
try:
|
||||
await asyncio.to_thread(
|
||||
self.client.delete, collection_name=self.collection_name, filter=f'chunk_id == "{chunk_id}"'
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting chunk {chunk_id} from Milvus collection {self.collection_name}: {e}")
|
||||
raise
|
||||
|
||||
|
||||
class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
|
||||
|
|
@ -179,7 +342,8 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
|
|||
uri = os.path.expanduser(self.config.db_path)
|
||||
self.client = MilvusClient(uri=uri)
|
||||
|
||||
self.openai_vector_stores = await self._load_openai_vector_stores()
|
||||
# Load existing OpenAI vector stores into the in-memory cache
|
||||
await self.initialize_openai_vector_stores()
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
self.client.close()
|
||||
|
|
@ -205,11 +369,11 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
|
|||
return self.cache[vector_db_id]
|
||||
|
||||
if self.vector_db_store is None:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found")
|
||||
raise VectorStoreNotFoundError(vector_db_id)
|
||||
|
||||
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
|
||||
if not vector_db:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found")
|
||||
raise VectorStoreNotFoundError(vector_db_id)
|
||||
|
||||
index = VectorDBWithIndex(
|
||||
vector_db=vector_db,
|
||||
|
|
@ -232,7 +396,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
|
|||
) -> None:
|
||||
index = await self._get_and_cache_vector_db_index(vector_db_id)
|
||||
if not index:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found")
|
||||
raise VectorStoreNotFoundError(vector_db_id)
|
||||
|
||||
await index.insert_chunks(chunks)
|
||||
|
||||
|
|
@ -244,219 +408,24 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
|
|||
) -> QueryChunksResponse:
|
||||
index = await self._get_and_cache_vector_db_index(vector_db_id)
|
||||
if not index:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found")
|
||||
raise VectorStoreNotFoundError(vector_db_id)
|
||||
|
||||
if params and params.get("mode") == "keyword":
|
||||
# Check if this is inline Milvus (Milvus-Lite)
|
||||
if hasattr(self.config, "db_path"):
|
||||
raise NotImplementedError(
|
||||
"Keyword search is not supported in Milvus-Lite. "
|
||||
"Please use a remote Milvus server for keyword search functionality."
|
||||
)
|
||||
|
||||
return await index.query_chunks(query, params)
|
||||
|
||||
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
|
||||
"""Save vector store metadata to persistent storage."""
|
||||
assert self.kvstore is not None
|
||||
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
||||
await self.kvstore.set(key=key, value=json.dumps(store_info))
|
||||
self.openai_vector_stores[store_id] = store_info
|
||||
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
|
||||
"""Delete a chunk from a milvus vector store."""
|
||||
index = await self._get_and_cache_vector_db_index(store_id)
|
||||
if not index:
|
||||
raise VectorStoreNotFoundError(store_id)
|
||||
|
||||
async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
|
||||
"""Update vector store metadata in persistent storage."""
|
||||
assert self.kvstore is not None
|
||||
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
||||
await self.kvstore.set(key=key, value=json.dumps(store_info))
|
||||
self.openai_vector_stores[store_id] = store_info
|
||||
|
||||
async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
|
||||
"""Delete vector store metadata from persistent storage."""
|
||||
assert self.kvstore is not None
|
||||
key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
|
||||
await self.kvstore.delete(key)
|
||||
if store_id in self.openai_vector_stores:
|
||||
del self.openai_vector_stores[store_id]
|
||||
|
||||
async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
|
||||
"""Load all vector store metadata from persistent storage."""
|
||||
assert self.kvstore is not None
|
||||
start_key = OPENAI_VECTOR_STORES_PREFIX
|
||||
end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
|
||||
stored = await self.kvstore.values_in_range(start_key, end_key)
|
||||
return {json.loads(s)["id"]: json.loads(s) for s in stored}
|
||||
|
||||
async def _save_openai_vector_store_file(
|
||||
self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
|
||||
) -> None:
|
||||
"""Save vector store file metadata to Milvus database."""
|
||||
if store_id not in self.openai_vector_stores:
|
||||
store_info = await self._load_openai_vector_stores(store_id)
|
||||
if not store_info:
|
||||
logger.error(f"OpenAI vector store {store_id} not found")
|
||||
raise ValueError(f"No vector store found with id {store_id}")
|
||||
|
||||
try:
|
||||
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
|
||||
file_schema = MilvusClient.create_schema(
|
||||
auto_id=False,
|
||||
enable_dynamic_field=True,
|
||||
description="Metadata for OpenAI vector store files",
|
||||
)
|
||||
file_schema.add_field(
|
||||
field_name="store_file_id", datatype=DataType.VARCHAR, is_primary=True, max_length=512
|
||||
)
|
||||
file_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
|
||||
file_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
|
||||
file_schema.add_field(field_name="file_info", datatype=DataType.VARCHAR, max_length=65535)
|
||||
|
||||
await asyncio.to_thread(
|
||||
self.client.create_collection,
|
||||
collection_name="openai_vector_store_files",
|
||||
schema=file_schema,
|
||||
)
|
||||
|
||||
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
|
||||
content_schema = MilvusClient.create_schema(
|
||||
auto_id=False,
|
||||
enable_dynamic_field=True,
|
||||
description="Contents for OpenAI vector store files",
|
||||
)
|
||||
content_schema.add_field(
|
||||
field_name="chunk_id", datatype=DataType.VARCHAR, is_primary=True, max_length=1024
|
||||
)
|
||||
content_schema.add_field(field_name="store_file_id", datatype=DataType.VARCHAR, max_length=1024)
|
||||
content_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512)
|
||||
content_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512)
|
||||
content_schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
|
||||
|
||||
await asyncio.to_thread(
|
||||
self.client.create_collection,
|
||||
collection_name="openai_vector_store_files_contents",
|
||||
schema=content_schema,
|
||||
)
|
||||
|
||||
file_data = [
|
||||
{
|
||||
"store_file_id": f"{store_id}_{file_id}",
|
||||
"store_id": store_id,
|
||||
"file_id": file_id,
|
||||
"file_info": json.dumps(file_info),
|
||||
}
|
||||
]
|
||||
await asyncio.to_thread(
|
||||
self.client.upsert,
|
||||
collection_name="openai_vector_store_files",
|
||||
data=file_data,
|
||||
)
|
||||
|
||||
# Save file contents
|
||||
contents_data = [
|
||||
{
|
||||
"chunk_id": content.get("chunk_metadata").get("chunk_id"),
|
||||
"store_file_id": f"{store_id}_{file_id}",
|
||||
"store_id": store_id,
|
||||
"file_id": file_id,
|
||||
"content": json.dumps(content),
|
||||
}
|
||||
for content in file_contents
|
||||
]
|
||||
await asyncio.to_thread(
|
||||
self.client.upsert,
|
||||
collection_name="openai_vector_store_files_contents",
|
||||
data=contents_data,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}")
|
||||
|
||||
async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]:
|
||||
"""Load vector store file metadata from Milvus database."""
|
||||
try:
|
||||
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
|
||||
return {}
|
||||
|
||||
query_filter = f"store_file_id == '{store_id}_{file_id}'"
|
||||
results = await asyncio.to_thread(
|
||||
self.client.query,
|
||||
collection_name="openai_vector_store_files",
|
||||
filter=query_filter,
|
||||
output_fields=["file_info"],
|
||||
)
|
||||
|
||||
if results:
|
||||
try:
|
||||
return json.loads(results[0]["file_info"])
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to decode file_info for store {store_id}, file {file_id}: {e}")
|
||||
return {}
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}")
|
||||
return {}
|
||||
|
||||
async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None:
|
||||
"""Update vector store file metadata in Milvus database."""
|
||||
try:
|
||||
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
|
||||
return
|
||||
|
||||
file_data = [
|
||||
{
|
||||
"store_file_id": f"{store_id}_{file_id}",
|
||||
"store_id": store_id,
|
||||
"file_id": file_id,
|
||||
"file_info": json.dumps(file_info),
|
||||
}
|
||||
]
|
||||
await asyncio.to_thread(
|
||||
self.client.upsert,
|
||||
collection_name="openai_vector_store_files",
|
||||
data=file_data,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}")
|
||||
raise
|
||||
|
||||
async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]:
|
||||
"""Load vector store file contents from Milvus database."""
|
||||
try:
|
||||
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
|
||||
return []
|
||||
|
||||
query_filter = (
|
||||
f"store_id == '{store_id}' AND file_id == '{file_id}' AND store_file_id == '{store_id}_{file_id}'"
|
||||
)
|
||||
results = await asyncio.to_thread(
|
||||
self.client.query,
|
||||
collection_name="openai_vector_store_files_contents",
|
||||
filter=query_filter,
|
||||
output_fields=["chunk_id", "store_id", "file_id", "content"],
|
||||
)
|
||||
|
||||
contents = []
|
||||
for result in results:
|
||||
try:
|
||||
content = json.loads(result["content"])
|
||||
contents.append(content)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to decode content for store {store_id}, file {file_id}: {e}")
|
||||
return contents
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}")
|
||||
return []
|
||||
|
||||
async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None:
|
||||
"""Delete vector store file metadata from Milvus database."""
|
||||
try:
|
||||
if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"):
|
||||
return
|
||||
|
||||
query_filter = f"store_file_id in ['{store_id}_{file_id}']"
|
||||
await asyncio.to_thread(
|
||||
self.client.delete,
|
||||
collection_name="openai_vector_store_files",
|
||||
filter=query_filter,
|
||||
)
|
||||
if await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"):
|
||||
await asyncio.to_thread(
|
||||
self.client.delete,
|
||||
collection_name="openai_vector_store_files_contents",
|
||||
filter=query_filter,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}")
|
||||
raise
|
||||
for chunk_id in chunk_ids:
|
||||
# Use the index's delete_chunk method
|
||||
await index.index.delete_chunk(chunk_id)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,6 @@ from .config import PGVectorVectorIOConfig
|
|||
async def get_adapter_impl(config: PGVectorVectorIOConfig, deps: dict[Api, ProviderSpec]):
|
||||
from .pgvector import PGVectorVectorIOAdapter
|
||||
|
||||
impl = PGVectorVectorIOAdapter(config, deps[Api.inference])
|
||||
impl = PGVectorVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files, None))
|
||||
await impl.initialize()
|
||||
return impl
|
||||
|
|
|
|||
|
|
@ -8,6 +8,10 @@ from typing import Any
|
|||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from llama_stack.providers.utils.kvstore.config import (
|
||||
KVStoreConfig,
|
||||
SqliteKVStoreConfig,
|
||||
)
|
||||
from llama_stack.schema_utils import json_schema_type
|
||||
|
||||
|
||||
|
|
@ -18,10 +22,12 @@ class PGVectorVectorIOConfig(BaseModel):
|
|||
db: str | None = Field(default="postgres")
|
||||
user: str | None = Field(default="postgres")
|
||||
password: str | None = Field(default="mysecretpassword")
|
||||
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(
|
||||
cls,
|
||||
__distro_dir__: str,
|
||||
host: str = "${env.PGVECTOR_HOST:=localhost}",
|
||||
port: int = "${env.PGVECTOR_PORT:=5432}",
|
||||
db: str = "${env.PGVECTOR_DB}",
|
||||
|
|
@ -29,4 +35,14 @@ class PGVectorVectorIOConfig(BaseModel):
|
|||
password: str = "${env.PGVECTOR_PASSWORD}",
|
||||
**kwargs: Any,
|
||||
) -> dict[str, Any]:
|
||||
return {"host": host, "port": port, "db": db, "user": user, "password": password}
|
||||
return {
|
||||
"host": host,
|
||||
"port": port,
|
||||
"db": db,
|
||||
"user": user,
|
||||
"password": password,
|
||||
"kvstore": SqliteKVStoreConfig.sample_run_config(
|
||||
__distro_dir__=__distro_dir__,
|
||||
db_name="pgvector_registry.db",
|
||||
),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,24 +13,19 @@ from psycopg2 import sql
|
|||
from psycopg2.extras import Json, execute_values
|
||||
from pydantic import BaseModel, TypeAdapter
|
||||
|
||||
from llama_stack.apis.common.errors import VectorStoreNotFoundError
|
||||
from llama_stack.apis.files.files import Files
|
||||
from llama_stack.apis.inference import InterleavedContent
|
||||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
from llama_stack.apis.vector_io import (
|
||||
Chunk,
|
||||
QueryChunksResponse,
|
||||
SearchRankingOptions,
|
||||
VectorIO,
|
||||
VectorStoreChunkingStrategy,
|
||||
VectorStoreDeleteResponse,
|
||||
VectorStoreFileContentsResponse,
|
||||
VectorStoreFileObject,
|
||||
VectorStoreFileStatus,
|
||||
VectorStoreListFilesResponse,
|
||||
VectorStoreListResponse,
|
||||
VectorStoreObject,
|
||||
VectorStoreSearchResponsePage,
|
||||
)
|
||||
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
|
||||
from llama_stack.providers.utils.kvstore import kvstore_impl
|
||||
from llama_stack.providers.utils.kvstore.api import KVStore
|
||||
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
|
||||
from llama_stack.providers.utils.memory.vector_store import (
|
||||
EmbeddingIndex,
|
||||
VectorDBWithIndex,
|
||||
|
|
@ -40,6 +35,13 @@ from .config import PGVectorVectorIOConfig
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
VERSION = "v3"
|
||||
VECTOR_DBS_PREFIX = f"vector_dbs:pgvector:{VERSION}::"
|
||||
VECTOR_INDEX_PREFIX = f"vector_index:pgvector:{VERSION}::"
|
||||
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:pgvector:{VERSION}::"
|
||||
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:pgvector:{VERSION}::"
|
||||
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:pgvector:{VERSION}::"
|
||||
|
||||
|
||||
def check_extension_version(cur):
|
||||
cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'vector'")
|
||||
|
|
@ -69,7 +71,7 @@ def load_models(cur, cls):
|
|||
|
||||
|
||||
class PGVectorIndex(EmbeddingIndex):
|
||||
def __init__(self, vector_db: VectorDB, dimension: int, conn):
|
||||
def __init__(self, vector_db: VectorDB, dimension: int, conn, kvstore: KVStore | None = None):
|
||||
self.conn = conn
|
||||
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
||||
# Sanitize the table name by replacing hyphens with underscores
|
||||
|
|
@ -77,6 +79,7 @@ class PGVectorIndex(EmbeddingIndex):
|
|||
# when created with patterns like "test-vector-db-{uuid4()}"
|
||||
sanitized_identifier = vector_db.identifier.replace("-", "_")
|
||||
self.table_name = f"vector_store_{sanitized_identifier}"
|
||||
self.kvstore = kvstore
|
||||
|
||||
cur.execute(
|
||||
f"""
|
||||
|
|
@ -97,7 +100,7 @@ class PGVectorIndex(EmbeddingIndex):
|
|||
for i, chunk in enumerate(chunks):
|
||||
values.append(
|
||||
(
|
||||
f"{chunk.metadata['document_id']}:chunk-{i}",
|
||||
f"{chunk.chunk_id}",
|
||||
Json(chunk.model_dump()),
|
||||
embeddings[i].tolist(),
|
||||
)
|
||||
|
|
@ -129,8 +132,11 @@ class PGVectorIndex(EmbeddingIndex):
|
|||
chunks = []
|
||||
scores = []
|
||||
for doc, dist in results:
|
||||
score = 1.0 / float(dist) if dist != 0 else float("inf")
|
||||
if score < score_threshold:
|
||||
continue
|
||||
chunks.append(Chunk(**doc))
|
||||
scores.append(1.0 / float(dist) if dist != 0 else float("inf"))
|
||||
scores.append(score)
|
||||
|
||||
return QueryChunksResponse(chunks=chunks, scores=scores)
|
||||
|
||||
|
|
@ -157,16 +163,34 @@ class PGVectorIndex(EmbeddingIndex):
|
|||
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
||||
cur.execute(f"DROP TABLE IF EXISTS {self.table_name}")
|
||||
|
||||
async def delete_chunk(self, chunk_id: str) -> None:
|
||||
"""Remove a chunk from the PostgreSQL table."""
|
||||
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
||||
cur.execute(f"DELETE FROM {self.table_name} WHERE id = %s", (chunk_id,))
|
||||
|
||||
class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
||||
def __init__(self, config: PGVectorVectorIOConfig, inference_api: Api.inference) -> None:
|
||||
|
||||
class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
|
||||
def __init__(
|
||||
self,
|
||||
config: PGVectorVectorIOConfig,
|
||||
inference_api: Api.inference,
|
||||
files_api: Files | None = None,
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.inference_api = inference_api
|
||||
self.conn = None
|
||||
self.cache = {}
|
||||
self.files_api = files_api
|
||||
self.kvstore: KVStore | None = None
|
||||
self.vector_db_store = None
|
||||
self.openai_vector_store: dict[str, dict[str, Any]] = {}
|
||||
self.metadatadata_collection_name = "openai_vector_stores_metadata"
|
||||
|
||||
async def initialize(self) -> None:
|
||||
log.info(f"Initializing PGVector memory adapter with config: {self.config}")
|
||||
self.kvstore = await kvstore_impl(self.config.kvstore)
|
||||
await self.initialize_openai_vector_stores()
|
||||
|
||||
try:
|
||||
self.conn = psycopg2.connect(
|
||||
host=self.config.host,
|
||||
|
|
@ -201,14 +225,28 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
log.info("Connection to PGVector database server closed")
|
||||
|
||||
async def register_vector_db(self, vector_db: VectorDB) -> None:
|
||||
# Persist vector DB metadata in the KV store
|
||||
assert self.kvstore is not None
|
||||
# Upsert model metadata in Postgres
|
||||
upsert_models(self.conn, [(vector_db.identifier, vector_db)])
|
||||
|
||||
index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn)
|
||||
self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
|
||||
# Create and cache the PGVector index table for the vector DB
|
||||
index = VectorDBWithIndex(
|
||||
vector_db,
|
||||
index=PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn, kvstore=self.kvstore),
|
||||
inference_api=self.inference_api,
|
||||
)
|
||||
self.cache[vector_db.identifier] = index
|
||||
|
||||
async def unregister_vector_db(self, vector_db_id: str) -> None:
|
||||
await self.cache[vector_db_id].index.delete()
|
||||
del self.cache[vector_db_id]
|
||||
# Remove provider index and cache
|
||||
if vector_db_id in self.cache:
|
||||
await self.cache[vector_db_id].index.delete()
|
||||
del self.cache[vector_db_id]
|
||||
|
||||
# Delete vector DB metadata from KV store
|
||||
assert self.kvstore is not None
|
||||
await self.kvstore.delete(key=f"{VECTOR_DBS_PREFIX}{vector_db_id}")
|
||||
|
||||
async def insert_chunks(
|
||||
self,
|
||||
|
|
@ -237,107 +275,12 @@ class PGVectorVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api)
|
||||
return self.cache[vector_db_id]
|
||||
|
||||
async def openai_create_vector_store(
|
||||
self,
|
||||
name: str,
|
||||
file_ids: list[str] | None = None,
|
||||
expires_after: dict[str, Any] | None = None,
|
||||
chunking_strategy: dict[str, Any] | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
embedding_model: str | None = None,
|
||||
embedding_dimension: int | None = 384,
|
||||
provider_id: str | None = None,
|
||||
provider_vector_db_id: str | None = None,
|
||||
) -> VectorStoreObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
|
||||
"""Delete a chunk from a PostgreSQL vector store."""
|
||||
index = await self._get_and_cache_vector_db_index(store_id)
|
||||
if not index:
|
||||
raise VectorStoreNotFoundError(store_id)
|
||||
|
||||
async def openai_list_vector_stores(
|
||||
self,
|
||||
limit: int | None = 20,
|
||||
order: str | None = "desc",
|
||||
after: str | None = None,
|
||||
before: str | None = None,
|
||||
) -> VectorStoreListResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_retrieve_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
) -> VectorStoreObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_update_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
name: str | None = None,
|
||||
expires_after: dict[str, Any] | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
) -> VectorStoreObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_delete_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
) -> VectorStoreDeleteResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_search_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
query: str | list[str],
|
||||
filters: dict[str, Any] | None = None,
|
||||
max_num_results: int | None = 10,
|
||||
ranking_options: SearchRankingOptions | None = None,
|
||||
rewrite_query: bool | None = False,
|
||||
search_mode: str | None = "vector",
|
||||
) -> VectorStoreSearchResponsePage:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_attach_file_to_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
attributes: dict[str, Any] | None = None,
|
||||
chunking_strategy: VectorStoreChunkingStrategy | None = None,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_list_files_in_vector_store(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
limit: int | None = 20,
|
||||
order: str | None = "desc",
|
||||
after: str | None = None,
|
||||
before: str | None = None,
|
||||
filter: VectorStoreFileStatus | None = None,
|
||||
) -> VectorStoreListFilesResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_retrieve_vector_store_file(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_retrieve_vector_store_file_contents(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
) -> VectorStoreFileContentsResponse:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_update_vector_store_file(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
attributes: dict[str, Any] | None = None,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
|
||||
async def openai_delete_vector_store_file(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_id: str,
|
||||
) -> VectorStoreFileObject:
|
||||
raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector")
|
||||
for chunk_id in chunk_ids:
|
||||
# Use the index's delete_chunk method
|
||||
await index.index.delete_chunk(chunk_id)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from .config import QdrantVectorIOConfig
|
|||
async def get_adapter_impl(config: QdrantVectorIOConfig, deps: dict[Api, ProviderSpec]):
|
||||
from .qdrant import QdrantVectorIOAdapter
|
||||
|
||||
impl = QdrantVectorIOAdapter(config, deps[Api.inference])
|
||||
files_api = deps.get(Api.files)
|
||||
impl = QdrantVectorIOAdapter(config, deps[Api.inference], files_api)
|
||||
await impl.initialize()
|
||||
return impl
|
||||
|
|
|
|||
|
|
@ -8,6 +8,10 @@ from typing import Any
|
|||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from llama_stack.providers.utils.kvstore.config import (
|
||||
KVStoreConfig,
|
||||
SqliteKVStoreConfig,
|
||||
)
|
||||
from llama_stack.schema_utils import json_schema_type
|
||||
|
||||
|
||||
|
|
@ -23,9 +27,14 @@ class QdrantVectorIOConfig(BaseModel):
|
|||
prefix: str | None = None
|
||||
timeout: int | None = None
|
||||
host: str | None = None
|
||||
kvstore: KVStoreConfig
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]:
|
||||
def sample_run_config(cls, __distro_dir__: str, **kwargs: Any) -> dict[str, Any]:
|
||||
return {
|
||||
"api_key": "${env.QDRANT_API_KEY}",
|
||||
"api_key": "${env.QDRANT_API_KEY:=}",
|
||||
"kvstore": SqliteKVStoreConfig.sample_run_config(
|
||||
__distro_dir__=__distro_dir__,
|
||||
db_name="qdrant_registry.db",
|
||||
),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
|
@ -12,25 +13,21 @@ from numpy.typing import NDArray
|
|||
from qdrant_client import AsyncQdrantClient, models
|
||||
from qdrant_client.models import PointStruct
|
||||
|
||||
from llama_stack.apis.common.errors import VectorStoreNotFoundError
|
||||
from llama_stack.apis.files import Files
|
||||
from llama_stack.apis.inference import InterleavedContent
|
||||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
from llama_stack.apis.vector_io import (
|
||||
Chunk,
|
||||
QueryChunksResponse,
|
||||
SearchRankingOptions,
|
||||
VectorIO,
|
||||
VectorStoreChunkingStrategy,
|
||||
VectorStoreDeleteResponse,
|
||||
VectorStoreFileContentsResponse,
|
||||
VectorStoreFileObject,
|
||||
VectorStoreFileStatus,
|
||||
VectorStoreListFilesResponse,
|
||||
VectorStoreListResponse,
|
||||
VectorStoreObject,
|
||||
VectorStoreSearchResponsePage,
|
||||
)
|
||||
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
|
||||
from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig as InlineQdrantVectorIOConfig
|
||||
from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl
|
||||
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
|
||||
from llama_stack.providers.utils.memory.vector_store import (
|
||||
EmbeddingIndex,
|
||||
VectorDBWithIndex,
|
||||
|
|
@ -41,6 +38,10 @@ from .config import QdrantVectorIOConfig as RemoteQdrantVectorIOConfig
|
|||
log = logging.getLogger(__name__)
|
||||
CHUNK_ID_KEY = "_chunk_id"
|
||||
|
||||
# KV store prefixes for vector databases
|
||||
VERSION = "v3"
|
||||
VECTOR_DBS_PREFIX = f"vector_dbs:qdrant:{VERSION}::"
|
||||
|
||||
|
||||
def convert_id(_id: str) -> str:
|
||||
"""
|
||||
|
|
@ -58,6 +59,11 @@ class QdrantIndex(EmbeddingIndex):
|
|||
self.client = client
|
||||
self.collection_name = collection_name
|
||||
|
||||
async def initialize(self) -> None:
|
||||
# Qdrant collections are created on-demand in add_chunks
|
||||
# If the collection does not exist, it will be created in add_chunks.
|
||||
pass
|
||||
|
||||
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
|
||||
assert len(chunks) == len(embeddings), (
|
||||
f"Chunk length {len(chunks)} does not match embedding length {len(embeddings)}"
|
||||
|
|
@ -82,6 +88,17 @@ class QdrantIndex(EmbeddingIndex):
|
|||
|
||||
await self.client.upsert(collection_name=self.collection_name, points=points)
|
||||
|
||||
async def delete_chunk(self, chunk_id: str) -> None:
|
||||
"""Remove a chunk from the Qdrant collection."""
|
||||
try:
|
||||
await self.client.delete(
|
||||
collection_name=self.collection_name,
|
||||
points_selector=models.PointIdsList(points=[convert_id(chunk_id)]),
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(f"Error deleting chunk {chunk_id} from Qdrant collection {self.collection_name}: {e}")
|
||||
raise
|
||||
|
||||
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
|
||||
results = (
|
||||
await self.client.query_points(
|
||||
|
|
@ -132,17 +149,41 @@ class QdrantIndex(EmbeddingIndex):
|
|||
await self.client.delete_collection(collection_name=self.collection_name)
|
||||
|
||||
|
||||
class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
||||
class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
|
||||
def __init__(
|
||||
self, config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig, inference_api: Api.inference
|
||||
self,
|
||||
config: RemoteQdrantVectorIOConfig | InlineQdrantVectorIOConfig,
|
||||
inference_api: Api.inference,
|
||||
files_api: Files | None = None,
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.client: AsyncQdrantClient = None
|
||||
self.cache = {}
|
||||
self.inference_api = inference_api
|
||||
self.files_api = files_api
|
||||
self.vector_db_store = None
|
||||
self.kvstore: KVStore | None = None
|
||||
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
|
||||
self._qdrant_lock = asyncio.Lock()
|
||||
|
||||
async def initialize(self) -> None:
|
||||
self.client = AsyncQdrantClient(**self.config.model_dump(exclude_none=True))
|
||||
client_config = self.config.model_dump(exclude_none=True, exclude={"kvstore"})
|
||||
self.client = AsyncQdrantClient(**client_config)
|
||||
self.kvstore = await kvstore_impl(self.config.kvstore)
|
||||
|
||||
start_key = VECTOR_DBS_PREFIX
|
||||
end_key = f"{VECTOR_DBS_PREFIX}\xff"
|
||||
stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
|
||||
|
||||
for vector_db_data in stored_vector_dbs:
|
||||
vector_db = VectorDB.model_validate_json(vector_db_data)
|
||||
index = VectorDBWithIndex(
|
||||
vector_db,
|
||||
QdrantIndex(self.client, vector_db.identifier),
|
||||
self.inference_api,
|
||||
)
|
||||
self.cache[vector_db.identifier] = index
|
||||
self.openai_vector_stores = await self._load_openai_vector_stores()
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
await self.client.close()
|
||||
|
|
@ -151,6 +192,10 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
self,
|
||||
vector_db: VectorDB,
|
||||
) -> None:
|
||||
assert self.kvstore is not None
|
||||
key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
|
||||
await self.kvstore.set(key=key, value=vector_db.model_dump_json())
|
||||
|
||||
index = VectorDBWithIndex(
|
||||
vector_db=vector_db,
|
||||
index=QdrantIndex(self.client, vector_db.identifier),
|
||||
|
|
@ -164,13 +209,19 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
|
|||
await self.cache[vector_db_id].index.delete()
|
||||
del self.cache[vector_db_id]
|
||||
|
||||
assert self.kvstore is not None
|
||||
await self.kvstore.delete(f"{VECTOR_DBS_PREFIX}{vector_db_id}")
|
||||
|
||||
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
|
||||
if vector_db_id in self.cache:
|
||||
return self.cache[vector_db_id]
|
||||
|
||||
if self.vector_db_store is None:
|
||||
raise ValueError(f"Vector DB not found {vector_db_id}")
|
||||
|
||||
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
|
||||
if not vector_db:
|
||||
raise ValueError(f"Vector DB {vector_db_id} not found")
|
||||
raise VectorStoreNotFoundError(vector_db_id)
|
||||
|
||||
index = VectorDBWithIndex(
|
||||
vector_db=vector_db,
|
||||
|
|
@ -188,7 +239,7 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)

await index.insert_chunks(chunks)

@ -200,66 +251,10 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)

return await index.query_chunks(query, params)

async def openai_create_vector_store(
self,
name: str,
file_ids: list[str] | None = None,
expires_after: dict[str, Any] | None = None,
chunking_strategy: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
embedding_model: str | None = None,
embedding_dimension: int | None = 384,
provider_id: str | None = None,
provider_vector_db_id: str | None = None,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_list_vector_stores(
self,
limit: int | None = 20,
order: str | None = "desc",
after: str | None = None,
before: str | None = None,
) -> VectorStoreListResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_retrieve_vector_store(
self,
vector_store_id: str,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_update_vector_store(
self,
vector_store_id: str,
name: str | None = None,
expires_after: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
) -> VectorStoreObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_delete_vector_store(
self,
vector_store_id: str,
) -> VectorStoreDeleteResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_search_vector_store(
self,
vector_store_id: str,
query: str | list[str],
filters: dict[str, Any] | None = None,
max_num_results: int | None = 10,
ranking_options: SearchRankingOptions | None = None,
rewrite_query: bool | None = False,
search_mode: str | None = "vector",
) -> VectorStoreSearchResponsePage:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_attach_file_to_vector_store(
self,
vector_store_id: str,

@ -267,44 +262,14 @@ class QdrantVectorIOAdapter(VectorIO, VectorDBsProtocolPrivate):
attributes: dict[str, Any] | None = None,
chunking_strategy: VectorStoreChunkingStrategy | None = None,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
# Qdrant doesn't allow multiple clients to access the same storage path simultaneously.
async with self._qdrant_lock:
await super().openai_attach_file_to_vector_store(vector_store_id, file_id, attributes, chunking_strategy)

async def openai_list_files_in_vector_store(
self,
vector_store_id: str,
limit: int | None = 20,
order: str | None = "desc",
after: str | None = None,
before: str | None = None,
filter: VectorStoreFileStatus | None = None,
) -> VectorStoreListFilesResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_retrieve_vector_store_file(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_retrieve_vector_store_file_contents(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileContentsResponse:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_update_vector_store_file(
self,
vector_store_id: str,
file_id: str,
attributes: dict[str, Any] | None = None,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")

async def openai_delete_vector_store_file(
self,
vector_store_id: str,
file_id: str,
) -> VectorStoreFileObject:
raise NotImplementedError("OpenAI Vector Stores API is not supported in Qdrant")
async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
"""Delete chunks from a Qdrant vector store."""
index = await self._get_and_cache_vector_db_index(store_id)
if not index:
raise ValueError(f"Vector DB {store_id} not found")
for chunk_id in chunk_ids:
await index.index.delete_chunk(chunk_id)

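With the per-method NotImplementedError stubs removed, the Qdrant adapter now inherits the OpenAI vector-store endpoints from OpenAIVectorStoreMixin and only overrides file attachment so it runs behind _qdrant_lock (Qdrant does not allow multiple clients on the same storage path). A rough sketch of how a distribution might drive the new constructor and the attach path; the config, inference_api, and files_api objects, the IDs, and the import path are assumptions rather than values from this diff:

# Sketch only; wiring objects and module path are placeholders.
from llama_stack.providers.remote.vector_io.qdrant.qdrant import QdrantVectorIOAdapter  # assumed path


async def attach_one_file(config, inference_api, files_api) -> None:
    adapter = QdrantVectorIOAdapter(config, inference_api, files_api)
    await adapter.initialize()  # restores registered vector DBs and OpenAI vector stores from the kvstore
    try:
        # Delegates to OpenAIVectorStoreMixin; the override above only adds the _qdrant_lock guard.
        await adapter.openai_attach_file_to_vector_store(
            vector_store_id="vs_123",  # placeholder id
            file_id="file_abc",        # placeholder id
            attributes=None,
            chunking_strategy=None,
        )
    finally:
        await adapter.shutdown()
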
@ -12,6 +12,6 @@ from .config import WeaviateVectorIOConfig
async def get_adapter_impl(config: WeaviateVectorIOConfig, deps: dict[Api, ProviderSpec]):
from .weaviate import WeaviateVectorIOAdapter

impl = WeaviateVectorIOAdapter(config, deps[Api.inference])
impl = WeaviateVectorIOAdapter(config, deps[Api.inference], deps.get(Api.files, None))
await impl.initialize()
return impl

@ -6,15 +6,32 @@

from typing import Any

from pydantic import BaseModel


class WeaviateRequestProviderData(BaseModel):
weaviate_api_key: str
weaviate_cluster_url: str
from pydantic import BaseModel, Field

from llama_stack.providers.utils.kvstore.config import (
KVStoreConfig,
SqliteKVStoreConfig,
)
from llama_stack.schema_utils import json_schema_type


@json_schema_type
class WeaviateVectorIOConfig(BaseModel):
weaviate_api_key: str | None = Field(description="The API key for the Weaviate instance", default=None)
weaviate_cluster_url: str | None = Field(description="The URL of the Weaviate cluster", default="localhost:8080")
kvstore: KVStoreConfig | None = Field(description="Config for KV store backend (SQLite only for now)", default=None)

@classmethod
def sample_run_config(cls, **kwargs: Any) -> dict[str, Any]:
return {}
def sample_run_config(
cls,
__distro_dir__: str,
**kwargs: Any,
) -> dict[str, Any]:
return {
"weaviate_api_key": None,
"weaviate_cluster_url": "${env.WEAVIATE_CLUSTER_URL:=localhost:8080}",
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="weaviate_registry.db",
),
}

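The new defaults can be checked directly from the config class; a small sketch, assuming the module path below and a placeholder distro directory (neither is part of this diff):

# Sketch; import path and distro directory are assumptions.
from llama_stack.providers.remote.vector_io.weaviate.config import WeaviateVectorIOConfig

sample = WeaviateVectorIOConfig.sample_run_config(__distro_dir__="~/.llama/distributions/example")
assert sample["weaviate_api_key"] is None
assert sample["weaviate_cluster_url"] == "${env.WEAVIATE_CLUSTER_URL:=localhost:8080}"
print(sample["kvstore"])  # SQLite-backed registry config named weaviate_registry.db under the distro dir
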
@ -14,24 +14,48 @@ from weaviate.classes.init import Auth
from weaviate.classes.query import Filter

from llama_stack.apis.common.content_types import InterleavedContent
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files.files import Files
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse, VectorIO
from llama_stack.distribution.request_headers import NeedsRequestProviderData
from llama_stack.core.request_headers import NeedsRequestProviderData
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import (
OpenAIVectorStoreMixin,
)
from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import sanitize_collection_name

from .config import WeaviateRequestProviderData, WeaviateVectorIOConfig
from .config import WeaviateVectorIOConfig

log = logging.getLogger(__name__)

VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:weaviate:{VERSION}::"
VECTOR_INDEX_PREFIX = f"vector_index:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:weaviate:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:weaviate:{VERSION}::"


class WeaviateIndex(EmbeddingIndex):
def __init__(self, client: weaviate.Client, collection_name: str):
def __init__(
self,
client: weaviate.Client,
collection_name: str,
kvstore: KVStore | None = None,
):
self.client = client
self.collection_name = collection_name
self.collection_name = sanitize_collection_name(collection_name, weaviate_format=True)
self.kvstore = kvstore

async def initialize(self):
pass

async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (

@ -55,8 +79,14 @@ class WeaviateIndex(EmbeddingIndex):
# TODO: make this async friendly
collection.data.insert_many(data_objects)

async def delete_chunk(self, chunk_id: str) -> None:
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
collection = self.client.collections.get(sanitized_collection_name)
collection.data.delete_many(where=Filter.by_property("id").contains_any([chunk_id]))

async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
collection = self.client.collections.get(self.collection_name)
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
collection = self.client.collections.get(sanitized_collection_name)

results = collection.query.near_vector(
near_vector=embedding.tolist(),

@ -75,13 +105,26 @@ class WeaviateIndex(EmbeddingIndex):
log.exception(f"Failed to parse document: {chunk_json}")
continue

score = 1.0 / doc.metadata.distance if doc.metadata.distance != 0 else float("inf")
if score < score_threshold:
continue

chunks.append(chunk)
scores.append(1.0 / doc.metadata.distance if doc.metadata.distance != 0 else float("inf"))
scores.append(score)

return QueryChunksResponse(chunks=chunks, scores=scores)

async def delete(self, chunk_ids: list[str]) -> None:
collection = self.client.collections.get(self.collection_name)
async def delete(self, chunk_ids: list[str] | None = None) -> None:
"""
Delete chunks by IDs if provided, otherwise drop the entire collection.
"""
sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True)
if chunk_ids is None:
# Drop entire collection if it exists
if self.client.collections.exists(sanitized_collection_name):
self.client.collections.delete(sanitized_collection_name)
return
collection = self.client.collections.get(sanitized_collection_name)
collection.data.delete_many(where=Filter.by_property("id").contains_any(chunk_ids))

async def query_keyword(

@ -105,34 +148,78 @@ class WeaviateIndex(EmbeddingIndex):


class WeaviateVectorIOAdapter(
OpenAIVectorStoreMixin,
VectorIO,
NeedsRequestProviderData,
VectorDBsProtocolPrivate,
):
def __init__(self, config: WeaviateVectorIOConfig, inference_api: Api.inference) -> None:
def __init__(
self,
config: WeaviateVectorIOConfig,
inference_api: Api.inference,
files_api: Files | None,
) -> None:
self.config = config
self.inference_api = inference_api
self.client_cache = {}
self.cache = {}
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.metadata_collection_name = "openai_vector_stores_metadata"

def _get_client(self) -> weaviate.Client:
provider_data = self.get_request_provider_data()
assert provider_data is not None, "Request provider data must be set"
assert isinstance(provider_data, WeaviateRequestProviderData)

key = f"{provider_data.weaviate_cluster_url}::{provider_data.weaviate_api_key}"
if key in self.client_cache:
return self.client_cache[key]

client = weaviate.connect_to_weaviate_cloud(
cluster_url=provider_data.weaviate_cluster_url,
auth_credentials=Auth.api_key(provider_data.weaviate_api_key),
)
if "localhost" in self.config.weaviate_cluster_url:
log.info("using Weaviate locally in container")
host, port = self.config.weaviate_cluster_url.split(":")
key = "local_test"
client = weaviate.connect_to_local(
host=host,
port=port,
)
else:
log.info("Using Weaviate remote cluster with URL")
key = f"{self.config.weaviate_cluster_url}::{self.config.weaviate_api_key}"
if key in self.client_cache:
return self.client_cache[key]
client = weaviate.connect_to_weaviate_cloud(
cluster_url=self.config.weaviate_cluster_url,
auth_credentials=Auth.api_key(self.config.weaviate_api_key),
)
self.client_cache[key] = client
return client

async def initialize(self) -> None:
pass
"""Set up KV store and load existing vector DBs and OpenAI vector stores."""
# Initialize KV store for metadata if configured
if self.config.kvstore is not None:
self.kvstore = await kvstore_impl(self.config.kvstore)
else:
self.kvstore = None
log.info("No kvstore configured, registry will not persist across restarts")

# Load existing vector DB definitions
if self.kvstore is not None:
start_key = VECTOR_DBS_PREFIX
end_key = f"{VECTOR_DBS_PREFIX}\xff"
stored = await self.kvstore.values_in_range(start_key, end_key)
for raw in stored:
vector_db = VectorDB.model_validate_json(raw)
client = self._get_client()
idx = WeaviateIndex(
client=client,
collection_name=vector_db.identifier,
kvstore=self.kvstore,
)
self.cache[vector_db.identifier] = VectorDBWithIndex(
vector_db=vector_db,
index=idx,
inference_api=self.inference_api,
)

# Load OpenAI vector stores metadata into cache
await self.initialize_openai_vector_stores()

async def shutdown(self) -> None:
for client in self.client_cache.values():

@ -143,11 +230,11 @@ class WeaviateVectorIOAdapter(
vector_db: VectorDB,
) -> None:
client = self._get_client()

sanitized_collection_name = sanitize_collection_name(vector_db.identifier, weaviate_format=True)
# Create collection if it doesn't exist
if not client.collections.exists(vector_db.identifier):
if not client.collections.exists(sanitized_collection_name):
client.collections.create(
name=vector_db.identifier,
name=sanitized_collection_name,
vectorizer_config=wvc.config.Configure.Vectorizer.none(),
properties=[
wvc.config.Property(

@ -157,30 +244,41 @@ class WeaviateVectorIOAdapter(
],
)

self.cache[vector_db.identifier] = VectorDBWithIndex(
self.cache[sanitized_collection_name] = VectorDBWithIndex(
vector_db,
WeaviateIndex(client=client, collection_name=vector_db.identifier),
WeaviateIndex(client=client, collection_name=sanitized_collection_name),
self.inference_api,
)

async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
if vector_db_id in self.cache:
return self.cache[vector_db_id]
async def unregister_vector_db(self, vector_db_id: str) -> None:
client = self._get_client()
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if sanitized_collection_name not in self.cache or client.collections.exists(sanitized_collection_name) is False:
log.warning(f"Vector DB {sanitized_collection_name} not found")
return
client.collections.delete(sanitized_collection_name)
await self.cache[sanitized_collection_name].index.delete()
del self.cache[sanitized_collection_name]

vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
if sanitized_collection_name in self.cache:
return self.cache[sanitized_collection_name]

vector_db = await self.vector_db_store.get_vector_db(sanitized_collection_name)
if not vector_db:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)

client = self._get_client()
if not client.collections.exists(vector_db.identifier):
raise ValueError(f"Collection with name `{vector_db.identifier}` not found")
raise ValueError(f"Collection with name `{sanitized_collection_name}` not found")

index = VectorDBWithIndex(
vector_db=vector_db,
index=WeaviateIndex(client=client, collection_name=vector_db.identifier),
index=WeaviateIndex(client=client, collection_name=sanitized_collection_name),
inference_api=self.inference_api,
)
self.cache[vector_db_id] = index
self.cache[sanitized_collection_name] = index
return index

async def insert_chunks(

@ -189,9 +287,10 @@ class WeaviateVectorIOAdapter(
chunks: list[Chunk],
ttl_seconds: int | None = None,
) -> None:
index = await self._get_and_cache_vector_db_index(vector_db_id)
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)

await index.insert_chunks(chunks)

@ -201,8 +300,17 @@ class WeaviateVectorIOAdapter(
query: InterleavedContent,
params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
index = await self._get_and_cache_vector_db_index(vector_db_id)
sanitized_collection_name = sanitize_collection_name(vector_db_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
raise VectorStoreNotFoundError(vector_db_id)

return await index.query_chunks(query, params)

async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None:
sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True)
index = await self._get_and_cache_vector_db_index(sanitized_collection_name)
if not index:
raise ValueError(f"Vector DB {sanitized_collection_name} not found")

await index.delete(chunk_ids)
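
One behavioral note from the WeaviateIndex.delete() change above: calling it with no chunk IDs now drops the whole (sanitized) collection, while passing a list removes only those objects. A short usage sketch, where the index variable stands in for an already-constructed WeaviateIndex (illustrative, not code from this diff):

# Sketch; `index` is a placeholder for a WeaviateIndex built with a live weaviate.Client.
async def cleanup(index) -> None:
    # Remove two specific chunks by ID.
    await index.delete(chunk_ids=["chunk-1", "chunk-2"])
    # Drop the entire collection, if it still exists.
    await index.delete()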