diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py index 2a1370c56..a57b4a4ee 100644 --- a/llama_stack/providers/inline/vector_io/faiss/faiss.py +++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py @@ -260,48 +260,3 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr raise ValueError(f"Vector DB {vector_db_id} not found") return await index.query_chunks(query, params) - - async def _save_openai_vector_store_file( - self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] - ) -> None: - """Save vector store file data to kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" - await self.kvstore.set(key=key, value=json.dumps(file_info)) - content_key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" - await self.kvstore.set(key=content_key, value=json.dumps(file_contents)) - - async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - """Load vector store file metadata from kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" - stored_data = await self.kvstore.get(key) - return json.loads(stored_data) if stored_data else {} - - async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - """Load vector store file contents from kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}" - stored_data = await self.kvstore.get(key) - return json.loads(stored_data) if stored_data else [] - - async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - """Update vector store file metadata in kvstore.""" - assert self.kvstore is not None - key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" - await self.kvstore.set(key=key, value=json.dumps(file_info)) - - async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - """Delete vector store data from kvstore.""" - assert self.kvstore is not None - - keys_to_delete = [ - f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}", - f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}", - ] - for key in keys_to_delete: - try: - await self.kvstore.delete(key) - except Exception as e: - logger.warning(f"Failed to delete key {key}: {e}") - continue diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 060b5b15c..f2598cc7c 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -5,7 +5,6 @@ # the root directory of this source tree. import asyncio -import json import logging import re import sqlite3 @@ -506,140 +505,6 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc await self.cache[vector_db_id].index.delete() del self.cache[vector_db_id] - async def _save_openai_vector_store_file( - self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] - ) -> None: - """Save vector store file metadata to SQLite database.""" - - def _create_or_store(): - connection = _create_sqlite_connection(self.config.db_path) - cur = connection.cursor() - try: - # Create a table to persist OpenAI vector store files. - cur.execute(""" - CREATE TABLE IF NOT EXISTS openai_vector_store_files ( - store_id TEXT, - file_id TEXT, - metadata TEXT, - PRIMARY KEY (store_id, file_id) - ); - """) - cur.execute(""" - CREATE TABLE IF NOT EXISTS openai_vector_store_files_contents ( - store_id TEXT, - file_id TEXT, - contents TEXT, - PRIMARY KEY (store_id, file_id) - ); - """) - connection.commit() - cur.execute( - "INSERT OR REPLACE INTO openai_vector_store_files (store_id, file_id, metadata) VALUES (?, ?, ?)", - (store_id, file_id, json.dumps(file_info)), - ) - cur.execute( - "INSERT OR REPLACE INTO openai_vector_store_files_contents (store_id, file_id, contents) VALUES (?, ?, ?)", - (store_id, file_id, json.dumps(file_contents)), - ) - connection.commit() - except Exception as e: - logger.error(f"Error saving openai vector store file {store_id} {file_id}: {e}") - raise - finally: - cur.close() - connection.close() - - try: - await asyncio.to_thread(_create_or_store) - except Exception as e: - logger.error(f"Error saving openai vector store file {store_id} {file_id}: {e}") - raise - - async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - """Load vector store file metadata from SQLite database.""" - - def _load(): - connection = _create_sqlite_connection(self.config.db_path) - cur = connection.cursor() - try: - cur.execute( - "SELECT metadata FROM openai_vector_store_files WHERE store_id = ? AND file_id = ?", - (store_id, file_id), - ) - row = cur.fetchone() - if row is None: - return None - (metadata,) = row - return metadata - finally: - cur.close() - connection.close() - - stored_data = await asyncio.to_thread(_load) - return json.loads(stored_data) if stored_data else {} - - async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - """Load vector store file contents from SQLite database.""" - - def _load(): - connection = _create_sqlite_connection(self.config.db_path) - cur = connection.cursor() - try: - cur.execute( - "SELECT contents FROM openai_vector_store_files_contents WHERE store_id = ? AND file_id = ?", - (store_id, file_id), - ) - row = cur.fetchone() - if row is None: - return None - (contents,) = row - return contents - finally: - cur.close() - connection.close() - - stored_contents = await asyncio.to_thread(_load) - return json.loads(stored_contents) if stored_contents else [] - - async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - """Update vector store file metadata in SQLite database.""" - - def _update(): - connection = _create_sqlite_connection(self.config.db_path) - cur = connection.cursor() - try: - cur.execute( - "UPDATE openai_vector_store_files SET metadata = ? WHERE store_id = ? AND file_id = ?", - (json.dumps(file_info), store_id, file_id), - ) - connection.commit() - finally: - cur.close() - connection.close() - - await asyncio.to_thread(_update) - - async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - """Delete vector store file metadata from SQLite database.""" - - def _delete(): - connection = _create_sqlite_connection(self.config.db_path) - cur = connection.cursor() - try: - cur.execute( - "DELETE FROM openai_vector_store_files WHERE store_id = ? AND file_id = ?", (store_id, file_id) - ) - cur.execute( - "DELETE FROM openai_vector_store_files_contents WHERE store_id = ? AND file_id = ?", - (store_id, file_id), - ) - connection.commit() - finally: - cur.close() - connection.close() - - await asyncio.to_thread(_delete) - async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None: index = await self._get_and_cache_vector_db_index(vector_db_id) if not index: diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index f301942cb..dc4852821 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -5,7 +5,6 @@ # the root directory of this source tree. import asyncio -import json import logging import os import re @@ -370,186 +369,3 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP ) return await index.query_chunks(query, params) - - async def _save_openai_vector_store_file( - self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] - ) -> None: - """Save vector store file metadata to Milvus database.""" - if store_id not in self.openai_vector_stores: - store_info = await self._load_openai_vector_stores(store_id) - if not store_info: - logger.error(f"OpenAI vector store {store_id} not found") - raise ValueError(f"No vector store found with id {store_id}") - - try: - if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"): - file_schema = MilvusClient.create_schema( - auto_id=False, - enable_dynamic_field=True, - description="Metadata for OpenAI vector store files", - ) - file_schema.add_field( - field_name="store_file_id", datatype=DataType.VARCHAR, is_primary=True, max_length=512 - ) - file_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512) - file_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512) - file_schema.add_field(field_name="file_info", datatype=DataType.VARCHAR, max_length=65535) - - await asyncio.to_thread( - self.client.create_collection, - collection_name="openai_vector_store_files", - schema=file_schema, - ) - - if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"): - content_schema = MilvusClient.create_schema( - auto_id=False, - enable_dynamic_field=True, - description="Contents for OpenAI vector store files", - ) - content_schema.add_field( - field_name="chunk_id", datatype=DataType.VARCHAR, is_primary=True, max_length=1024 - ) - content_schema.add_field(field_name="store_file_id", datatype=DataType.VARCHAR, max_length=1024) - content_schema.add_field(field_name="store_id", datatype=DataType.VARCHAR, max_length=512) - content_schema.add_field(field_name="file_id", datatype=DataType.VARCHAR, max_length=512) - content_schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535) - - await asyncio.to_thread( - self.client.create_collection, - collection_name="openai_vector_store_files_contents", - schema=content_schema, - ) - - file_data = [ - { - "store_file_id": f"{store_id}_{file_id}", - "store_id": store_id, - "file_id": file_id, - "file_info": json.dumps(file_info), - } - ] - await asyncio.to_thread( - self.client.upsert, - collection_name="openai_vector_store_files", - data=file_data, - ) - - # Save file contents - contents_data = [ - { - "chunk_id": content.get("chunk_metadata").get("chunk_id"), - "store_file_id": f"{store_id}_{file_id}", - "store_id": store_id, - "file_id": file_id, - "content": json.dumps(content), - } - for content in file_contents - ] - await asyncio.to_thread( - self.client.upsert, - collection_name="openai_vector_store_files_contents", - data=contents_data, - ) - - except Exception as e: - logger.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}") - - async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - """Load vector store file metadata from Milvus database.""" - try: - if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"): - return {} - - query_filter = f"store_file_id == '{store_id}_{file_id}'" - results = await asyncio.to_thread( - self.client.query, - collection_name="openai_vector_store_files", - filter=query_filter, - output_fields=["file_info"], - ) - - if results: - try: - return json.loads(results[0]["file_info"]) - except json.JSONDecodeError as e: - logger.error(f"Failed to decode file_info for store {store_id}, file {file_id}: {e}") - return {} - return {} - except Exception as e: - logger.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}") - return {} - - async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - """Update vector store file metadata in Milvus database.""" - try: - if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"): - return - - file_data = [ - { - "store_file_id": f"{store_id}_{file_id}", - "store_id": store_id, - "file_id": file_id, - "file_info": json.dumps(file_info), - } - ] - await asyncio.to_thread( - self.client.upsert, - collection_name="openai_vector_store_files", - data=file_data, - ) - except Exception as e: - logger.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}") - raise - - async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - """Load vector store file contents from Milvus database.""" - try: - if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"): - return [] - - query_filter = ( - f"store_id == '{store_id}' AND file_id == '{file_id}' AND store_file_id == '{store_id}_{file_id}'" - ) - results = await asyncio.to_thread( - self.client.query, - collection_name="openai_vector_store_files_contents", - filter=query_filter, - output_fields=["chunk_id", "store_id", "file_id", "content"], - ) - - contents = [] - for result in results: - try: - content = json.loads(result["content"]) - contents.append(content) - except json.JSONDecodeError as e: - logger.error(f"Failed to decode content for store {store_id}, file {file_id}: {e}") - return contents - except Exception as e: - logger.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}") - return [] - - async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - """Delete vector store file metadata from Milvus database.""" - try: - if not await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files"): - return - - query_filter = f"store_file_id in ['{store_id}_{file_id}']" - await asyncio.to_thread( - self.client.delete, - collection_name="openai_vector_store_files", - filter=query_filter, - ) - if await asyncio.to_thread(self.client.has_collection, "openai_vector_store_files_contents"): - await asyncio.to_thread( - self.client.delete, - collection_name="openai_vector_store_files_contents", - filter=query_filter, - ) - - except Exception as e: - logger.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}") - raise diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index 7fdd8af9b..3aeb3f30d 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -265,125 +265,3 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco index = PGVectorIndex(vector_db, vector_db.embedding_dimension, self.conn) self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api) return self.cache[vector_db_id] - - # OpenAI Vector Stores File operations are not supported in PGVector - async def _save_openai_vector_store_file( - self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] - ) -> None: - """Save vector store file metadata to Postgres database.""" - if self.conn is None: - raise RuntimeError("PostgreSQL connection is not initialized") - try: - with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute( - """ - CREATE TABLE IF NOT EXISTS openai_vector_store_files ( - store_id TEXT, - file_id TEXT, - metadata JSONB, - PRIMARY KEY (store_id, file_id) - ) - """ - ) - cur.execute( - """ - CREATE TABLE IF NOT EXISTS openai_vector_store_files_contents ( - store_id TEXT, - file_id TEXT, - contents JSONB, - PRIMARY KEY (store_id, file_id) - ) - """ - ) - # Insert file metadata - files_query = sql.SQL( - """ - INSERT INTO openai_vector_store_files (store_id, file_id, metadata) - VALUES %s - ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata - """ - ) - files_values = [(store_id, file_id, Json(file_info))] - execute_values(cur, files_query, files_values, template="(%s, %s, %s)") - # Insert file contents - contents_query = sql.SQL( - """ - INSERT INTO openai_vector_store_files_contents (store_id, file_id, contents) - VALUES %s - ON CONFLICT (store_id, file_id) DO UPDATE SET contents = EXCLUDED.contents - """ - ) - contents_values = [(store_id, file_id, Json(file_contents))] - execute_values(cur, contents_query, contents_values, template="(%s, %s, %s)") - except Exception as e: - log.error(f"Error saving openai vector store file {file_id} for store {store_id}: {e}") - raise - - async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: - """Load vector store file metadata from Postgres database.""" - if self.conn is None: - raise RuntimeError("PostgreSQL connection is not initialized") - try: - with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute( - "SELECT metadata FROM openai_vector_store_files WHERE store_id = %s AND file_id = %s", - (store_id, file_id), - ) - row = cur.fetchone() - return row[0] if row and row[0] is not None else {} - except Exception as e: - log.error(f"Error loading openai vector store file {file_id} for store {store_id}: {e}") - return {} - - async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: - """Load vector store file contents from Postgres database.""" - if self.conn is None: - raise RuntimeError("PostgreSQL connection is not initialized") - try: - with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute( - "SELECT contents FROM openai_vector_store_files_contents WHERE store_id = %s AND file_id = %s", - (store_id, file_id), - ) - row = cur.fetchone() - return row[0] if row and row[0] is not None else [] - except Exception as e: - log.error(f"Error loading openai vector store file contents for {file_id} in store {store_id}: {e}") - return [] - - async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: - """Update vector store file metadata in Postgres database.""" - if self.conn is None: - raise RuntimeError("PostgreSQL connection is not initialized") - try: - with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - query = sql.SQL( - """ - INSERT INTO openai_vector_store_files (store_id, file_id, metadata) - VALUES %s - ON CONFLICT (store_id, file_id) DO UPDATE SET metadata = EXCLUDED.metadata - """ - ) - values = [(store_id, file_id, Json(file_info))] - execute_values(cur, query, values, template="(%s, %s, %s)") - except Exception as e: - log.error(f"Error updating openai vector store file {file_id} for store {store_id}: {e}") - raise - - async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: - """Delete vector store file metadata from Postgres database.""" - if self.conn is None: - raise RuntimeError("PostgreSQL connection is not initialized") - try: - with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute( - "DELETE FROM openai_vector_store_files WHERE store_id = %s AND file_id = %s", - (store_id, file_id), - ) - cur.execute( - "DELETE FROM openai_vector_store_files_contents WHERE store_id = %s AND file_id = %s", - (store_id, file_id), - ) - except Exception as e: - log.error(f"Error deleting openai vector store file {file_id} for store {store_id}: {e}") - raise diff --git a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index c11de396b..f178e9299 100644 --- a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -66,7 +66,7 @@ class OpenAIVectorStoreMixin(ABC): async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: """Save vector store metadata to persistent storage.""" - assert self.kvstore is not None + assert self.kvstore key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" await self.kvstore.set(key=key, value=json.dumps(store_info)) # update in-memory cache @@ -74,7 +74,7 @@ class OpenAIVectorStoreMixin(ABC): async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]: """Load all vector store metadata from persistent storage.""" - assert self.kvstore is not None + assert self.kvstore start_key = OPENAI_VECTOR_STORES_PREFIX end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff" stored_data = await self.kvstore.values_in_range(start_key, end_key) @@ -87,7 +87,7 @@ class OpenAIVectorStoreMixin(ABC): async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: """Update vector store metadata in persistent storage.""" - assert self.kvstore is not None + assert self.kvstore key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" await self.kvstore.set(key=key, value=json.dumps(store_info)) # update in-memory cache @@ -95,38 +95,62 @@ class OpenAIVectorStoreMixin(ABC): async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None: """Delete vector store metadata from persistent storage.""" - assert self.kvstore is not None + assert self.kvstore key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}" await self.kvstore.delete(key) # remove from in-memory cache self.openai_vector_stores.pop(store_id, None) - @abstractmethod async def _save_openai_vector_store_file( self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]] ) -> None: """Save vector store file metadata to persistent storage.""" - pass + assert self.kvstore + meta_key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=meta_key, value=json.dumps(file_info)) + contents_prefix = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}:" + for idx, chunk in enumerate(file_contents): + await self.kvstore.set(key=f"{contents_prefix}{idx}", value=json.dumps(chunk)) - @abstractmethod async def _load_openai_vector_store_file(self, store_id: str, file_id: str) -> dict[str, Any]: """Load vector store file metadata from persistent storage.""" - pass + assert self.kvstore + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + stored_data = await self.kvstore.get(key) + return json.loads(stored_data) if stored_data else {} - @abstractmethod async def _load_openai_vector_store_file_contents(self, store_id: str, file_id: str) -> list[dict[str, Any]]: """Load vector store file contents from persistent storage.""" - pass + assert self.kvstore + prefix = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}:" + end_key = f"{prefix}\xff" + raw_items = await self.kvstore.values_in_range(prefix, end_key) + return [json.loads(item) for item in raw_items] - @abstractmethod async def _update_openai_vector_store_file(self, store_id: str, file_id: str, file_info: dict[str, Any]) -> None: """Update vector store file metadata in persistent storage.""" - pass + assert self.kvstore + key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.set(key=key, value=json.dumps(file_info)) - @abstractmethod async def _delete_openai_vector_store_file_from_storage(self, store_id: str, file_id: str) -> None: """Delete vector store file metadata from persistent storage.""" - pass + assert self.kvstore + + meta_key = f"{OPENAI_VECTOR_STORES_FILES_PREFIX}{store_id}:{file_id}" + await self.kvstore.delete(meta_key) + + contents_prefix = f"{OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX}{store_id}:{file_id}:" + end_key = f"{contents_prefix}\xff" + # load all stored chunk values (values_in_range is implemented by all backends) + raw_items = await self.kvstore.values_in_range(contents_prefix, end_key) + # delete each chunk by its index suffix + for idx in range(len(raw_items)): + await self.kvstore.delete(f"{contents_prefix}{idx}") + + async def initialize_openai_vector_stores(self) -> None: + """Load existing OpenAI vector stores into the in-memory cache.""" + self.openai_vector_stores = await self._load_openai_vector_stores() @abstractmethod async def register_vector_db(self, vector_db: VectorDB) -> None: @@ -138,10 +162,6 @@ class OpenAIVectorStoreMixin(ABC): """Unregister a vector database (provider-specific implementation).""" pass - async def initialize_openai_vector_stores(self) -> None: - """Load existing OpenAI vector stores into the in-memory cache.""" - self.openai_vector_stores = await self._load_openai_vector_stores() - @abstractmethod async def insert_chunks( self,