From f097da7a312a5e42f9d303993d2f059837e76b04 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 15 Jul 2025 11:34:31 -0400 Subject: [PATCH] updating to use execute_values Signed-off-by: Francisco Javier Arceo --- .../remote/vector_io/pgvector/pgvector.py | 51 +++++++++++-------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index cd610a3ef..19ec9b03b 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -217,10 +217,6 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco async def register_vector_db(self, vector_db: VectorDB) -> None: # Persist vector DB metadata in the KV store - assert self.kvstore is not None - key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}" - await self.kvstore.set(key=key, value=vector_db.model_dump_json()) - # Upsert model metadata in Postgres upsert_models(self.conn, [(vector_db.identifier, vector_db)]) @@ -277,7 +273,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco if self.conn is None: raise RuntimeError("PostgreSQL connection is not initialized") try: - with self.conn.cursor() as cur: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS openai_vector_store_files ( @@ -298,16 +294,26 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco ) """ ) - cur.execute( - "INSERT INTO openai_vector_store_files (store_id, file_id, metadata) VALUES (%s, %s, %s)" - " ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata", - (store_id, file_id, Json(file_info)), + # Insert file metadata + files_query = sql.SQL( + """ + INSERT INTO openai_vector_store_files (store_id, file_id, metadata) + VALUES %s + ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata + """ ) - cur.execute( - "INSERT INTO openai_vector_store_files_contents (store_id, file_id, contents) VALUES (%s, %s, %s)" - " ON CONFLICT (store_id, file_id) DO UPDATE SET contents = EXCLUDED.contents", - (store_id, file_id, Json(file_contents)), + files_values = [(store_id, file_id, Json(file_info))] + execute_values(cur, files_query, files_values, template="(%s, %s, %s)") + # Insert file contents + contents_query = sql.SQL( + """ + INSERT INTO openai_vector_store_files_contents (store_id, file_id, contents) + VALUES %s + ON CONFLICT (store_id, file_id) DO UPDATE SET contents = EXCLUDED.contents + """ ) + contents_values = [(store_id, file_id, Json(file_contents))] + execute_values(cur, contents_query, contents_values, template="(%s, %s, %s)") except Exception as e: log.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}") raise @@ -317,7 +323,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco if self.conn is None: raise RuntimeError("PostgreSQL connection is not initialized") try: - with self.conn.cursor() as cur: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( "SELECT metadata FROM openai_vector_store_files WHERE store_id = %s AND file_id = %s", (store_id, file_id), @@ -333,7 +339,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco if self.conn is None: raise RuntimeError("PostgreSQL connection is not initialized") try: - with self.conn.cursor() as cur: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( "SELECT contents FROM openai_vector_store_files_contents WHERE store_id = %s AND file_id = %s", (store_id, file_id), @@ -349,11 +355,16 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco if self.conn is None: raise RuntimeError("PostgreSQL connection is not initialized") try: - with self.conn.cursor() as cur: - cur.execute( - "UPDATE openai_vector_store_files SET metadata = %s WHERE store_id = %s AND file_id = %s", - (Json(file_info), store_id, file_id), + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + query = sql.SQL( + """ + INSERT INTO openai_vector_store_files (store_id, file_id, metadata) + VALUES %s + ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata + """ ) + values = [(store_id, file_id, Json(file_info))] + execute_values(cur, query, values, template="(%s, %s, %s)") except Exception as e: log.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}") raise @@ -363,7 +374,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco if self.conn is None: raise RuntimeError("PostgreSQL connection is not initialized") try: - with self.conn.cursor() as cur: + with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( "DELETE FROM openai_vector_store_files WHERE store_id = %s AND file_id = %s", (store_id, file_id),