diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py index d5fde8595..db269dce0 100644 --- a/llama_stack/providers/inline/vector_io/faiss/faiss.py +++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py @@ -207,6 +207,7 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr self.kvstore: KVStore | None = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 async def initialize(self) -> None: self.kvstore = await kvstore_impl(self.config.kvstore) diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 53573e4aa..c27344f95 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -416,6 +416,7 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc self.cache: dict[str, VectorDBWithIndex] = {} self.openai_vector_stores: dict[str, dict[str, Any]] = {} self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 self.kvstore: KVStore | None = None async def initialize(self) -> None: diff --git a/llama_stack/providers/remote/vector_io/chroma/chroma.py b/llama_stack/providers/remote/vector_io/chroma/chroma.py index 6d85b9d16..b64d088b0 100644 --- a/llama_stack/providers/remote/vector_io/chroma/chroma.py +++ b/llama_stack/providers/remote/vector_io/chroma/chroma.py @@ -167,6 +167,7 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.client = chromadb.PersistentClient(path=self.config.db_path) self.openai_vector_stores = await self._load_openai_vector_stores() self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 async def shutdown(self) -> None: pass diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index f4f7ad8e4..087b5804c 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -318,6 +318,7 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.vector_db_store = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 self.metadata_collection_name = "openai_vector_stores_metadata" async def initialize(self) -> None: diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index 58dbf3618..48fa3d842 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -354,6 +354,7 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco self.vector_db_store = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 self.metadata_collection_name = "openai_vector_stores_metadata" async def initialize(self) -> None: diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 142143128..e28a0342e 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -171,6 +171,7 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.kvstore: KVStore | None = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 self._qdrant_lock = asyncio.Lock() async def initialize(self) -> None: diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index a8c374190..8ff63fcd5 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -293,6 +293,7 @@ class WeaviateVectorIOAdapter( self.vector_db_store = None self.openai_vector_stores: dict[str, dict[str, Any]] = {} self.openai_file_batches: dict[str, dict[str, Any]] = {} + self._last_file_batch_cleanup_time = 0 self.metadata_collection_name = "openai_vector_stores_metadata" def _get_client(self) -> weaviate.WeaviateClient: diff --git a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index 97155f4df..9f23fc49f 100644 --- a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -12,6 +12,8 @@ import uuid from abc import ABC, abstractmethod from typing import Any +from pydantic import TypeAdapter + from llama_stack.apis.common.errors import VectorStoreNotFoundError from llama_stack.apis.files import Files, OpenAIFileObject from llama_stack.apis.vector_dbs import VectorDB @@ -50,6 +52,7 @@ logger = get_logger(name=__name__, category="providers::utils") # Constants for OpenAI vector stores CHUNK_MULTIPLIER = 5 +FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60 # 1 day in seconds VERSION = "v3" VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::" @@ -73,7 +76,7 @@ class OpenAIVectorStoreMixin(ABC): # KV store for persisting OpenAI vector store metadata kvstore: KVStore | None # Track last cleanup time to throttle cleanup operations - _last_cleanup_time: int + _last_file_batch_cleanup_time: int async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: """Save vector store metadata to persistent storage.""" @@ -215,16 +218,6 @@ class OpenAIVectorStoreMixin(ABC): if expired_count > 0: logger.info(f"Cleaned up {expired_count} expired file batches") - async def _cleanup_expired_file_batches_if_needed(self) -> None: - """Run cleanup if enough time has passed since the last cleanup.""" - current_time = int(time.time()) - cleanup_interval = 24 * 60 * 60 # 1 day in seconds - - if current_time - self._last_cleanup_time >= cleanup_interval: - logger.info("Running throttled cleanup of expired file batches") - await self._cleanup_expired_file_batches() - self._last_cleanup_time = current_time - async def _resume_incomplete_batches(self) -> None: """Resume processing of incomplete file batches after server restart.""" for batch_id, batch_info in self.openai_file_batches.items(): @@ -238,7 +231,7 @@ class OpenAIVectorStoreMixin(ABC): self.openai_vector_stores = await self._load_openai_vector_stores() self.openai_file_batches = await self._load_openai_vector_store_file_batches() await self._resume_incomplete_batches() - self._last_cleanup_time = 0 + self._last_file_batch_cleanup_time = 0 @abstractmethod async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: @@ -945,7 +938,11 @@ class OpenAIVectorStoreMixin(ABC): asyncio.create_task(self._process_file_batch_async(batch_id, batch_info)) # Run cleanup if needed (throttled to once every 1 day) - asyncio.create_task(self._cleanup_expired_file_batches_if_needed()) + current_time = int(time.time()) + if current_time - self._last_file_batch_cleanup_time >= FILE_BATCH_CLEANUP_INTERVAL_SECONDS: + logger.info("Running throttled cleanup of expired file batches") + asyncio.create_task(self._cleanup_expired_file_batches()) + self._last_file_batch_cleanup_time = current_time return batch_object @@ -962,11 +959,10 @@ class OpenAIVectorStoreMixin(ABC): for file_id in file_ids: try: - chunking_strategy_obj = ( - VectorStoreChunkingStrategyStatic(**chunking_strategy) - if chunking_strategy.get("type") == "static" - else VectorStoreChunkingStrategyAuto(**chunking_strategy) + chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter( + VectorStoreChunkingStrategy ) + chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy) await self.openai_attach_file_to_vector_store( vector_store_id=vector_store_id, file_id=file_id,