feat: Adding OpenAI Compatible Prompts API

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
Francisco Javier Arceo 2025-09-03 14:14:54 -04:00
parent 30117dea22
commit 8b00883abd
181 changed files with 21356 additions and 10332 deletions

View file

@ -6,15 +6,14 @@
from typing import Any
from llama_stack.core.datatypes import Api
from llama_stack.core.datatypes import AccessRule, Api
from .config import S3FilesImplConfig
async def get_adapter_impl(config: S3FilesImplConfig, deps: dict[Api, Any]):
async def get_adapter_impl(config: S3FilesImplConfig, deps: dict[Api, Any], policy: list[AccessRule] | None = None):
from .files import S3FilesImpl
# TODO: authorization policies and user separation
impl = S3FilesImpl(config)
impl = S3FilesImpl(config, policy or [])
await impl.initialize()
return impl

View file

@ -4,9 +4,9 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import time
import uuid
from typing import Annotated
from datetime import UTC, datetime
from typing import Annotated, Any
import boto3
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError
@ -15,14 +15,17 @@ from fastapi import File, Form, Response, UploadFile
from llama_stack.apis.common.errors import ResourceNotFoundError
from llama_stack.apis.common.responses import Order
from llama_stack.apis.files import (
ExpiresAfter,
Files,
ListOpenAIFileResponse,
OpenAIFileDeleteResponse,
OpenAIFileObject,
OpenAIFilePurpose,
)
from llama_stack.core.datatypes import AccessRule
from llama_stack.providers.utils.sqlstore.api import ColumnDefinition, ColumnType
from llama_stack.providers.utils.sqlstore.sqlstore import SqlStore, sqlstore_impl
from llama_stack.providers.utils.sqlstore.authorized_sqlstore import AuthorizedSqlStore
from llama_stack.providers.utils.sqlstore.sqlstore import sqlstore_impl
from .config import S3FilesImplConfig
@ -83,22 +86,85 @@ async def _create_bucket_if_not_exists(client: boto3.client, config: S3FilesImpl
raise RuntimeError(f"Failed to access S3 bucket '{config.bucket_name}': {e}") from e
def _make_file_object(
*,
id: str,
filename: str,
purpose: str,
bytes: int,
created_at: int,
expires_at: int,
**kwargs: Any, # here to ignore any additional fields, e.g. extra fields from AuthorizedSqlStore
) -> OpenAIFileObject:
"""
Construct an OpenAIFileObject and normalize expires_at.
If expires_at is greater than the maximum allowed expiration, we treat it as
no expiration and return None for expires_at.
The OpenAI spec types expires_at as an integer, but this implementation
returns None when a file never expires.
"""
obj = OpenAIFileObject(
id=id,
filename=filename,
purpose=OpenAIFilePurpose(purpose),
bytes=bytes,
created_at=created_at,
expires_at=expires_at,
)
if obj.expires_at is not None and obj.expires_at > (obj.created_at + ExpiresAfter.MAX):
obj.expires_at = None # type: ignore
return obj
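A standalone sketch of the normalization rule above; the 30-day value for `ExpiresAfter.MAX` is an assumption taken from the BATCH comment further down, not a verified constant:

```python
# Sketch only, not provider code. ASSUMPTION: ExpiresAfter.MAX is 30 days in seconds.
MAX_EXPIRES_SECONDS = 30 * 24 * 60 * 60


def normalize_expires_at(created_at: int, expires_at: int) -> int | None:
    """Map the beyond-the-max sentinel to None; keep real expirations."""
    if expires_at > created_at + MAX_EXPIRES_SECONDS:
        return None
    return expires_at


created = 1_700_000_000
assert normalize_expires_at(created, created + MAX_EXPIRES_SECONDS * 42) is None  # sentinel -> no expiration
assert normalize_expires_at(created, created + 3600) == created + 3600  # real expiry preserved
```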
class S3FilesImpl(Files):
"""S3-based implementation of the Files API."""
# TODO: implement expiration, for now a silly offset
_SILLY_EXPIRATION_OFFSET = 100 * 365 * 24 * 60 * 60
def __init__(self, config: S3FilesImplConfig) -> None:
def __init__(self, config: S3FilesImplConfig, policy: list[AccessRule]) -> None:
self._config = config
self.policy = policy
self._client: boto3.client | None = None
self._sql_store: SqlStore | None = None
self._sql_store: AuthorizedSqlStore | None = None
def _now(self) -> int:
"""Return current UTC timestamp as int seconds."""
return int(datetime.now(UTC).timestamp())
async def _get_file(self, file_id: str, return_expired: bool = False) -> dict[str, Any]:
where: dict[str, str | dict] = {"id": file_id}
if not return_expired:
where["expires_at"] = {">": self._now()}
if not (row := await self.sql_store.fetch_one("openai_files", policy=self.policy, where=where)):
raise ResourceNotFoundError(file_id, "File", "files.list()")
return row
async def _delete_file(self, file_id: str) -> None:
"""Delete a file from S3 and the database."""
try:
self.client.delete_object(
Bucket=self._config.bucket_name,
Key=file_id,
)
except ClientError as e:
if e.response["Error"]["Code"] != "NoSuchKey":
raise RuntimeError(f"Failed to delete file from S3: {e}") from e
await self.sql_store.delete("openai_files", where={"id": file_id})
async def _delete_if_expired(self, file_id: str) -> None:
"""If the file exists and is expired, delete it."""
if row := await self._get_file(file_id, return_expired=True):
if (expires_at := row.get("expires_at")) and expires_at <= self._now():
await self._delete_file(file_id)
async def initialize(self) -> None:
self._client = _create_s3_client(self._config)
await _create_bucket_if_not_exists(self._client, self._config)
self._sql_store = sqlstore_impl(self._config.metadata_store)
self._sql_store = AuthorizedSqlStore(sqlstore_impl(self._config.metadata_store))
await self._sql_store.create_table(
"openai_files",
{
@ -121,7 +187,7 @@ class S3FilesImpl(Files):
return self._client
@property
def sql_store(self) -> SqlStore:
def sql_store(self) -> AuthorizedSqlStore:
assert self._sql_store is not None, "Provider not initialized"
return self._sql_store
@ -129,27 +195,47 @@ class S3FilesImpl(Files):
self,
file: Annotated[UploadFile, File()],
purpose: Annotated[OpenAIFilePurpose, Form()],
expires_after_anchor: Annotated[str | None, Form(alias="expires_after[anchor]")] = None,
expires_after_seconds: Annotated[int | None, Form(alias="expires_after[seconds]")] = None,
) -> OpenAIFileObject:
file_id = f"file-{uuid.uuid4().hex}"
filename = getattr(file, "filename", None) or "uploaded_file"
created_at = int(time.time())
expires_at = created_at + self._SILLY_EXPIRATION_OFFSET
created_at = self._now()
expires_after = None
if expires_after_anchor is not None or expires_after_seconds is not None:
# we use ExpiresAfter to validate input
expires_after = ExpiresAfter(
anchor=expires_after_anchor, # type: ignore[arg-type]
seconds=expires_after_seconds, # type: ignore[arg-type]
)
# the default is no expiration.
# to implement no expiration we set an expiration beyond the max.
# we'll hide this fact from users when returning the file object.
expires_at = created_at + ExpiresAfter.MAX * 42
# the default for BATCH files is 30 days, which happens to be the expiration max.
if purpose == OpenAIFilePurpose.BATCH:
expires_at = created_at + ExpiresAfter.MAX
if expires_after is not None:
expires_at = created_at + expires_after.seconds
content = await file.read()
file_size = len(content)
await self.sql_store.insert(
"openai_files",
{
"id": file_id,
"filename": filename,
"purpose": purpose.value,
"bytes": file_size,
"created_at": created_at,
"expires_at": expires_at,
},
)
entry: dict[str, Any] = {
"id": file_id,
"filename": filename,
"purpose": purpose.value,
"bytes": file_size,
"created_at": created_at,
"expires_at": expires_at,
}
await self.sql_store.insert("openai_files", entry)
try:
self.client.put_object(
@ -163,14 +249,7 @@ class S3FilesImpl(Files):
raise RuntimeError(f"Failed to upload file to S3: {e}") from e
return OpenAIFileObject(
id=file_id,
filename=filename,
purpose=purpose,
bytes=file_size,
created_at=created_at,
expires_at=expires_at,
)
return _make_file_object(**entry)
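A hedged sketch of a client exercising the bracketed form fields parsed above; the server URL, route, and the "assistants" purpose value are placeholders for illustration, not confirmed by this diff:

```python
# Sketch only: multipart upload with expires_after[anchor] / expires_after[seconds].
import requests

resp = requests.post(
    "http://localhost:8321/v1/openai/v1/files",  # assumed route
    files={"file": ("notes.txt", b"hello world")},
    data={
        "purpose": "assistants",  # assumed purpose value
        "expires_after[anchor]": "created_at",
        "expires_after[seconds]": "3600",
    },
    timeout=30,
)
print(resp.json())
```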
async def openai_list_files(
self,
@ -183,29 +262,20 @@ class S3FilesImpl(Files):
if not order:
order = Order.desc
where_conditions = {}
where_conditions: dict[str, Any] = {"expires_at": {">": self._now()}}
if purpose:
where_conditions["purpose"] = purpose.value
paginated_result = await self.sql_store.fetch_all(
table="openai_files",
where=where_conditions if where_conditions else None,
policy=self.policy,
where=where_conditions,
order_by=[("created_at", order.value)],
cursor=("id", after) if after else None,
limit=limit,
)
files = [
OpenAIFileObject(
id=row["id"],
filename=row["filename"],
purpose=OpenAIFilePurpose(row["purpose"]),
bytes=row["bytes"],
created_at=row["created_at"],
expires_at=row["expires_at"],
)
for row in paginated_result.data
]
files = [_make_file_object(**row) for row in paginated_result.data]
return ListOpenAIFileResponse(
data=files,
@ -216,41 +286,20 @@ class S3FilesImpl(Files):
)
async def openai_retrieve_file(self, file_id: str) -> OpenAIFileObject:
row = await self.sql_store.fetch_one("openai_files", where={"id": file_id})
if not row:
raise ResourceNotFoundError(file_id, "File", "files.list()")
return OpenAIFileObject(
id=row["id"],
filename=row["filename"],
purpose=OpenAIFilePurpose(row["purpose"]),
bytes=row["bytes"],
created_at=row["created_at"],
expires_at=row["expires_at"],
)
await self._delete_if_expired(file_id)
row = await self._get_file(file_id)
return _make_file_object(**row)
async def openai_delete_file(self, file_id: str) -> OpenAIFileDeleteResponse:
row = await self.sql_store.fetch_one("openai_files", where={"id": file_id})
if not row:
raise ResourceNotFoundError(file_id, "File", "files.list()")
try:
self.client.delete_object(
Bucket=self._config.bucket_name,
Key=row["id"],
)
except ClientError as e:
if e.response["Error"]["Code"] != "NoSuchKey":
raise RuntimeError(f"Failed to delete file from S3: {e}") from e
await self.sql_store.delete("openai_files", where={"id": file_id})
await self._delete_if_expired(file_id)
_ = await self._get_file(file_id) # raises if not found
await self._delete_file(file_id)
return OpenAIFileDeleteResponse(id=file_id, deleted=True)
async def openai_retrieve_file_content(self, file_id: str) -> Response:
row = await self.sql_store.fetch_one("openai_files", where={"id": file_id})
if not row:
raise ResourceNotFoundError(file_id, "File", "files.list()")
await self._delete_if_expired(file_id)
row = await self._get_file(file_id)
try:
response = self.client.get_object(
@ -261,7 +310,7 @@ class S3FilesImpl(Files):
content = response["Body"].read()
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
await self.sql_store.delete("openai_files", where={"id": file_id})
await self._delete_file(file_id)
raise ResourceNotFoundError(file_id, "File", "files.list()") from e
raise RuntimeError(f"Failed to download file from S3: {e}") from e

View file

@ -41,10 +41,10 @@ client.initialize()
### Create Completion
> Note on Completion API
>
> The hosted NVIDIA Llama NIMs (e.g., `meta-llama/Llama-3.1-8B-Instruct`) with ```NVIDIA_BASE_URL="https://integrate.api.nvidia.com"``` does not support the ```completion``` method, while the locally deployed NIM does.
The following example shows how to create a completion for an NVIDIA NIM.
> [!NOTE]
> The hosted NVIDIA Llama NIMs (for example ```meta-llama/Llama-3.1-8B-Instruct```) that have ```NVIDIA_BASE_URL="https://integrate.api.nvidia.com"``` do not support the ```completion``` method, while locally deployed NIMs do.
```python
response = client.inference.completion(
@ -60,6 +60,8 @@ print(f"Response: {response.content}")
### Create Chat Completion
The following example shows how to create a chat completion for an NVIDIA NIM.
```python
response = client.inference.chat_completion(
model_id="meta-llama/Llama-3.1-8B-Instruct",
@ -82,6 +84,9 @@ print(f"Response: {response.completion_message.content}")
```
### Tool Calling Example ###
The following example shows how to do tool calling for an NVIDIA NIM.
```python
from llama_stack.models.llama.datatypes import ToolDefinition, ToolParamDefinition
@ -117,6 +122,9 @@ if tool_response.completion_message.tool_calls:
```
### Structured Output Example
The following example shows how to do structured output for an NVIDIA NIM.
```python
from llama_stack.apis.inference import JsonSchemaResponseFormat, ResponseFormatType
@ -149,8 +157,10 @@ print(f"Structured Response: {structured_response.completion_message.content}")
```
### Create Embeddings
> Note on OpenAI embeddings compatibility
>
The following example shows how to create embeddings for an NVIDIA NIM.
> [!NOTE]
> NVIDIA asymmetric embedding models (e.g., `nvidia/llama-3.2-nv-embedqa-1b-v2`) require an `input_type` parameter not present in the standard OpenAI embeddings API. The NVIDIA Inference Adapter automatically sets `input_type="query"` when using the OpenAI-compatible embeddings endpoint for NVIDIA. For passage embeddings, use the `embeddings` API with `task_type="document"`.
```python
@ -160,4 +170,42 @@ response = client.inference.embeddings(
task_type="query",
)
print(f"Embeddings: {response.embeddings}")
```
```
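Per the note above, passage embeddings go through the `embeddings` API with `task_type="document"`. A sketch mirroring the query example (model id taken from the note; the `contents` field name is an assumption):

```python
response = client.inference.embeddings(
    model_id="nvidia/llama-3.2-nv-embedqa-1b-v2",
    contents=["Paris is the capital of France."],
    task_type="document",
)
print(f"Embeddings: {response.embeddings}")
```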
### Vision Language Models Example
The following example shows how to run vision inference by using an NVIDIA NIM.
```python
import base64


def load_image_as_base64(image_path):
with open(image_path, "rb") as image_file:
img_bytes = image_file.read()
return base64.b64encode(img_bytes).decode("utf-8")
image_path = {path_to_the_image}
demo_image_b64 = load_image_as_base64(image_path)
vlm_response = client.inference.chat_completion(
model_id="nvidia/vila",
messages=[
{
"role": "user",
"content": [
{
"type": "image",
"image": {
"data": demo_image_b64,
},
},
{
"type": "text",
"text": "Please describe what you see in this image in detail.",
},
],
}
],
)
print(f"VLM Response: {vlm_response.completion_message.content}")
```

View file

@ -55,6 +55,10 @@ MODEL_ENTRIES = [
"meta/llama-3.3-70b-instruct",
CoreModelId.llama3_3_70b_instruct.value,
),
ProviderModelEntry(
provider_model_id="nvidia/vila",
model_type=ModelType.llm,
),
# NeMo Retriever Text Embedding models -
#
# https://docs.nvidia.com/nim/nemo-retriever/text-embedding/latest/support-matrix.html

View file

@ -118,10 +118,10 @@ class OllamaInferenceAdapter(
async def initialize(self) -> None:
logger.info(f"checking connectivity to Ollama at `{self.config.url}`...")
health_response = await self.health()
if health_response["status"] == HealthStatus.ERROR:
r = await self.health()
if r["status"] == HealthStatus.ERROR:
logger.warning(
"Ollama Server is not running, make sure to start it using `ollama serve` in a separate terminal"
f"Ollama Server is not running (message: {r['message']}). Make sure to start it using `ollama serve` in a separate terminal"
)
async def should_refresh_models(self) -> bool:
@ -156,7 +156,7 @@ class OllamaInferenceAdapter(
),
Model(
identifier="nomic-embed-text",
provider_resource_id="nomic-embed-text",
provider_resource_id="nomic-embed-text:latest",
provider_id=provider_id,
metadata={
"embedding_dimension": 768,

View file

@ -7,8 +7,8 @@
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Any
from ibm_watson_machine_learning.foundation_models import Model
from ibm_watson_machine_learning.metanames import GenTextParamsMetaNames as GenParams
from ibm_watsonx_ai.foundation_models import Model
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams
from openai import AsyncOpenAI
from llama_stack.apis.common.content_types import InterleavedContent, InterleavedContentItem

View file

@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import heapq
from typing import Any
import psycopg2
@ -23,6 +24,9 @@ from llama_stack.apis.vector_io import (
)
from llama_stack.log import get_logger
from llama_stack.providers.datatypes import Api, VectorDBsProtocolPrivate
from llama_stack.providers.utils.inference.prompt_adapter import (
interleaved_content_as_str,
)
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore
from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
@ -31,6 +35,7 @@ from llama_stack.providers.utils.memory.vector_store import (
EmbeddingIndex,
VectorDBWithIndex,
)
from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator, sanitize_collection_name
from .config import PGVectorVectorIOConfig
@ -72,25 +77,63 @@ def load_models(cur, cls):
class PGVectorIndex(EmbeddingIndex):
def __init__(self, vector_db: VectorDB, dimension: int, conn, kvstore: KVStore | None = None):
self.conn = conn
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# Sanitize the table name by replacing hyphens with underscores
# SQL doesn't allow hyphens in table names, and vector_db.identifier may contain hyphens
# when created with patterns like "test-vector-db-{uuid4()}"
sanitized_identifier = vector_db.identifier.replace("-", "_")
self.table_name = f"vector_store_{sanitized_identifier}"
self.kvstore = kvstore
# reference: https://github.com/pgvector/pgvector?tab=readme-ov-file#querying
PGVECTOR_DISTANCE_METRIC_TO_SEARCH_FUNCTION: dict[str, str] = {
"L2": "<->",
"L1": "<+>",
"COSINE": "<=>",
"INNER_PRODUCT": "<#>",
"HAMMING": "<~>",
"JACCARD": "<%>",
}
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id TEXT PRIMARY KEY,
document JSONB,
embedding vector({dimension})
def __init__(
self,
vector_db: VectorDB,
dimension: int,
conn: psycopg2.extensions.connection,
kvstore: KVStore | None = None,
distance_metric: str = "COSINE",
):
self.vector_db = vector_db
self.dimension = dimension
self.conn = conn
self.kvstore = kvstore
self.check_distance_metric_availability(distance_metric)
self.distance_metric = distance_metric
self.table_name = None
async def initialize(self) -> None:
try:
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# Sanitize the table name by replacing hyphens with underscores
# SQL doesn't allow hyphens in table names, and vector_db.identifier may contain hyphens
# when created with patterns like "test-vector-db-{uuid4()}"
sanitized_identifier = sanitize_collection_name(self.vector_db.identifier)
self.table_name = f"vs_{sanitized_identifier}"
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id TEXT PRIMARY KEY,
document JSONB,
embedding vector({self.dimension}),
content_text TEXT,
tokenized_content TSVECTOR
)
"""
)
"""
)
# Create GIN index for full-text search performance
cur.execute(
f"""
CREATE INDEX IF NOT EXISTS {self.table_name}_content_gin_idx
ON {self.table_name} USING GIN(tokenized_content)
"""
)
except Exception as e:
log.exception(f"Error creating PGVectorIndex for vector_db: {self.vector_db.identifier}")
raise RuntimeError(f"Error creating PGVectorIndex for vector_db: {self.vector_db.identifier}") from e
async def add_chunks(self, chunks: list[Chunk], embeddings: NDArray):
assert len(chunks) == len(embeddings), (
@ -99,29 +142,49 @@ class PGVectorIndex(EmbeddingIndex):
values = []
for i, chunk in enumerate(chunks):
content_text = interleaved_content_as_str(chunk.content)
values.append(
(
f"{chunk.chunk_id}",
Json(chunk.model_dump()),
embeddings[i].tolist(),
content_text,
content_text, # Pass content_text twice: once for the content_text column, once for the to_tsvector() call that fills tokenized_content
)
)
query = sql.SQL(
f"""
INSERT INTO {self.table_name} (id, document, embedding)
INSERT INTO {self.table_name} (id, document, embedding, content_text, tokenized_content)
VALUES %s
ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding, document = EXCLUDED.document
ON CONFLICT (id) DO UPDATE SET
embedding = EXCLUDED.embedding,
document = EXCLUDED.document,
content_text = EXCLUDED.content_text,
tokenized_content = EXCLUDED.tokenized_content
"""
)
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
execute_values(cur, query, values, template="(%s, %s, %s::vector)")
execute_values(cur, query, values, template="(%s, %s, %s::vector, %s, to_tsvector('english', %s))")
async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse:
"""
Performs vector similarity search using the configured pgvector distance operator (COSINE by default).
Args:
embedding: The query embedding vector
k: Number of results to return
score_threshold: Minimum similarity score threshold
Returns:
QueryChunksResponse with the most similar chunks and their scores
"""
pgvector_search_function = self.get_pgvector_search_function()
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
f"""
SELECT document, embedding <-> %s::vector AS distance
SELECT document, embedding {pgvector_search_function} %s::vector AS distance
FROM {self.table_name}
ORDER BY distance
LIMIT %s
@ -147,7 +210,40 @@ class PGVectorIndex(EmbeddingIndex):
k: int,
score_threshold: float,
) -> QueryChunksResponse:
raise NotImplementedError("Keyword search is not supported in PGVector")
"""
Performs keyword-based search using PostgreSQL's full-text search with ts_rank scoring.
Args:
query_string: The text query for keyword search
k: Number of results to return
score_threshold: Minimum similarity score threshold
Returns:
QueryChunksResponse with the matching chunks and their relevance scores
"""
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
# Use plainto_tsquery to handle user input safely and ts_rank for relevance scoring
cur.execute(
f"""
SELECT document, ts_rank(tokenized_content, plainto_tsquery('english', %s)) AS score
FROM {self.table_name}
WHERE tokenized_content @@ plainto_tsquery('english', %s)
ORDER BY score DESC
LIMIT %s
""",
(query_string, query_string, k),
)
results = cur.fetchall()
chunks = []
scores = []
for doc, score in results:
if score < score_threshold:
continue
chunks.append(Chunk(**doc))
scores.append(float(score))
return QueryChunksResponse(chunks=chunks, scores=scores)
async def query_hybrid(
self,
@ -158,7 +254,59 @@ class PGVectorIndex(EmbeddingIndex):
reranker_type: str,
reranker_params: dict[str, Any] | None = None,
) -> QueryChunksResponse:
raise NotImplementedError("Hybrid search is not supported in PGVector")
"""
Hybrid search combining vector similarity and keyword search using configurable reranking.
Args:
embedding: The query embedding vector
query_string: The text query for keyword search
k: Number of results to return
score_threshold: Minimum similarity score threshold
reranker_type: Type of reranker to use ("rrf" or "weighted")
reranker_params: Parameters for the reranker
Returns:
QueryChunksResponse with combined results
"""
if reranker_params is None:
reranker_params = {}
# Get results from both search methods
vector_response = await self.query_vector(embedding, k, score_threshold)
keyword_response = await self.query_keyword(query_string, k, score_threshold)
# Convert responses to score dictionaries using chunk_id
vector_scores = {
chunk.chunk_id: score for chunk, score in zip(vector_response.chunks, vector_response.scores, strict=False)
}
keyword_scores = {
chunk.chunk_id: score
for chunk, score in zip(keyword_response.chunks, keyword_response.scores, strict=False)
}
# Combine scores using the reranking utility
combined_scores = WeightedInMemoryAggregator.combine_search_results(
vector_scores, keyword_scores, reranker_type, reranker_params
)
# heapq.nlargest is an efficient top-k selection: it only keeps the k best candidates seen so far
top_k_items = heapq.nlargest(k, combined_scores.items(), key=lambda x: x[1])
# Filter by score threshold
filtered_items = [(doc_id, score) for doc_id, score in top_k_items if score >= score_threshold]
# Create a map of chunk_id to chunk for both responses
chunk_map = {c.chunk_id: c for c in vector_response.chunks + keyword_response.chunks}
# Use the map to look up chunks by their IDs
chunks = []
scores = []
for doc_id, score in filtered_items:
if doc_id in chunk_map:
chunks.append(chunk_map[doc_id])
scores.append(score)
return QueryChunksResponse(chunks=chunks, scores=scores)
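A standalone illustration of the fuse-then-top-k step above, using reciprocal rank fusion (RRF); the provider delegates the actual combination to `WeightedInMemoryAggregator.combine_search_results`, and the `rrf_k=60` constant is the conventional RRF default, assumed here only for the sketch:

```python
import heapq


def rrf_combine(vector_scores: dict[str, float], keyword_scores: dict[str, float], rrf_k: int = 60) -> dict[str, float]:
    """Fuse two ranked result sets by reciprocal rank, ignoring raw score scales."""
    combined: dict[str, float] = {}
    for scores in (vector_scores, keyword_scores):
        ranked = sorted(scores, key=scores.get, reverse=True)
        for rank, chunk_id in enumerate(ranked, start=1):
            combined[chunk_id] = combined.get(chunk_id, 0.0) + 1.0 / (rrf_k + rank)
    return combined


vector_scores = {"c1": 0.92, "c2": 0.81, "c3": 0.40}
keyword_scores = {"c2": 7.1, "c4": 3.3}
combined = rrf_combine(vector_scores, keyword_scores)
print(heapq.nlargest(2, combined.items(), key=lambda x: x[1]))  # top-2 chunk ids by fused score
```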
async def delete(self):
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
@ -170,6 +318,25 @@ class PGVectorIndex(EmbeddingIndex):
with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(f"DELETE FROM {self.table_name} WHERE id = ANY(%s)", (chunk_ids,))
def get_pgvector_search_function(self) -> str:
return self.PGVECTOR_DISTANCE_METRIC_TO_SEARCH_FUNCTION[self.distance_metric]
def check_distance_metric_availability(self, distance_metric: str) -> None:
"""Check if the distance metric is supported by PGVector.
Args:
distance_metric: The distance metric to check
Raises:
ValueError: If the distance metric is not supported
"""
if distance_metric not in self.PGVECTOR_DISTANCE_METRIC_TO_SEARCH_FUNCTION:
supported_metrics = list(self.PGVECTOR_DISTANCE_METRIC_TO_SEARCH_FUNCTION.keys())
raise ValueError(
f"Distance metric '{distance_metric}' is not supported by PGVector. "
f"Supported metrics are: {', '.join(supported_metrics)}"
)
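A small standalone sketch of how a validated metric selects the operator used in the similarity query above (operator strings copied from the mapping at the top of the class):

```python
OPERATORS = {"L2": "<->", "L1": "<+>", "COSINE": "<=>", "INNER_PRODUCT": "<#>", "HAMMING": "<~>", "JACCARD": "<%>"}


def search_sql(metric: str, table: str) -> str:
    """Build the distance query for a supported metric; reject unknown metrics."""
    if metric not in OPERATORS:
        raise ValueError(f"Distance metric '{metric}' is not supported by PGVector. Supported metrics are: {', '.join(OPERATORS)}")
    return f"SELECT document, embedding {OPERATORS[metric]} %s::vector AS distance FROM {table} ORDER BY distance LIMIT %s"


print(search_sql("COSINE", "vs_demo_table"))
```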
class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate):
def __init__(
@ -185,8 +352,8 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
self.files_api = files_api
self.kvstore: KVStore | None = None
self.vector_db_store = None
self.openai_vector_store: dict[str, dict[str, Any]] = {}
self.metadatadata_collection_name = "openai_vector_stores_metadata"
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
self.metadata_collection_name = "openai_vector_stores_metadata"
async def initialize(self) -> None:
log.info(f"Initializing PGVector memory adapter with config: {self.config}")
@ -233,9 +400,13 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
upsert_models(self.conn, [(vector_db.identifier, vector_db)])
# Create and cache the PGVector index table for the vector DB
pgvector_index = PGVectorIndex(
vector_db=vector_db, dimension=vector_db.embedding_dimension, conn=self.conn, kvstore=self.kvstore
)
await pgvector_index.initialize()
index = VectorDBWithIndex(
vector_db,
index=PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn, kvstore=self.kvstore),
index=pgvector_index,
inference_api=self.inference_api,
)
self.cache[vector_db.identifier] = index
@ -272,8 +443,15 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco
if vector_db_id in self.cache:
return self.cache[vector_db_id]
if self.vector_db_store is None:
raise VectorStoreNotFoundError(vector_db_id)
vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
if not vector_db:
raise VectorStoreNotFoundError(vector_db_id)
index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn)
await index.initialize()
self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api)
return self.cache[vector_db_id]