From e1755d1ed2a088bc9e0b96f4e06997941d8dd826 Mon Sep 17 00:00:00 2001 From: Francisco Arceo Date: Tue, 15 Jul 2025 15:46:49 -0400 Subject: [PATCH] chore: Adding OpenAI Vector Stores Files API compatibility for PGVector (#2755) # What does this PR do? Adding OpenAI Vector Stores Files API compatibility for PGVector ## Test Plan Updated CI to include PGVector --------- Signed-off-by: Francisco Javier Arceo --- .../remote/vector_io/pgvector/pgvector.py | 117 ++++++++++++++++-- .../vector_io/test_openai_vector_stores.py | 2 +- 2 files changed, 110 insertions(+), 9 deletions(-) diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index 1bf3eedf8..7fdd8af9b 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -218,9 +218,6 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco async def register_vector_db(self, vector_db: VectorDB) -> None: # Persist vector DB metadata in the KV store assert self.kvstore is not None - key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}" - await self.kvstore.set(key=key, value=vector_db.model_dump_json()) - # Upsert model metadata in Postgres upsert_models(self.conn, [(vector_db.identifier, vector_db)]) @@ -273,16 +270,120 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") + """Save vector store file metadata to Postgres database.""" + if self.conn is None: + raise RuntimeError("PostgreSQL connection is not initialized") + try: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS openai_vector_store_files ( + store_id TEXT, + file_id TEXT, + metadata JSONB, + PRIMARY KEY (store_id, file_id) + ) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS openai_vector_store_files_contents ( + store_id TEXT, + file_id TEXT, + contents JSONB, + PRIMARY KEY (store_id, file_id) + ) + """ + ) + # Insert file metadata + files_query = sql.SQL( + """ + INSERT INTO openai_vector_store_files (store_id, file_id, metadata) + VALUES %s + ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata + """ + ) + files_values = [(store_id, file_id, Json(file_info))] + execute_values(cur, files_query, files_values, template="(%s, %s, %s)") + # Insert file contents + contents_query = sql.SQL( + """ + INSERT INTO openai_vector_store_files_contents (store_id, file_id, contents) + VALUES %s + ON CONFLICT (store_id, file_id) DO UPDATE SET contents = EXCLUDED.contents + """ + ) + contents_values = [(store_id, file_id, Json(file_contents))] + execute_values(cur, contents_query, contents_values, template="(%s, %s, %s)") + except Exception as e: + log.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}") + raise async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") + """Load vector store file metadata from Postgres database.""" + if self.conn is None: + raise RuntimeError("PostgreSQL connection is not initialized") + try: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute( + "SELECT metadata FROM openai_vector_store_files WHERE store_id = %s AND file_id = %s", + (store_id, file_id), + ) + row = cur.fetchone() + return row[0] if row and row[0] is not None else {} + except Exception as e: + log.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}") + return {} async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") + """Load vector store file contents from Postgres database.""" + if self.conn is None: + raise RuntimeError("PostgreSQL connection is not initialized") + try: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute( + "SELECT contents FROM openai_vector_store_files_contents WHERE store_id = %s AND file_id = %s", + (store_id, file_id), + ) + row = cur.fetchone() + return row[0] if row and row[0] is not None else [] + except Exception as e: + log.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}") + return [] async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") + """Update vector store file metadata in Postgres database.""" + if self.conn is None: + raise RuntimeError("PostgreSQL connection is not initialized") + try: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + query = sql.SQL( + """ + INSERT INTO openai_vector_store_files (store_id, file_id, metadata) + VALUES %s + ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata + """ + ) + values = [(store_id, file_id, Json(file_info))] + execute_values(cur, query, values, template="(%s, %s, %s)") + except Exception as e: + log.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}") + raise async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - raise NotImplementedError("OpenAI Vector Stores API is not supported in PGVector") + """Delete vector store file metadata from Postgres database.""" + if self.conn is None: + raise RuntimeError("PostgreSQL connection is not initialized") + try: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute( + "DELETE FROM openai_vector_store_files WHERE store_id = %s AND file_id = %s", + (store_id, file_id), + ) + cur.execute( + "DELETE FROM openai_vector_store_files_contents WHERE store_id = %s AND file_id = %s", + (store_id, file_id), + ) + except Exception as e: + log.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}") + raise diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 7f947be30..d7300348b 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -31,7 +31,7 @@ def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): def skip_if_provider_doesnt_support_openai_vector_store_files_api(client_with_models): vector_io_providers = [p for p in client_with_models.providers.list() if p.api == "vector_io"] for p in vector_io_providers: - if p.provider_type in ["inline::faiss", "inline::sqlite-vec", "inline::milvus"]: + if p.provider_type in ["inline::faiss", "inline::sqlite-vec", "inline::milvus", "remote::pgvector"]: return pytest.skip("OpenAI vector stores are not supported by any provider")