From 9d2d8ab61c707ccce03d9b1e015b056c0a41dddc Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Wed, 1 Oct 2025 10:38:23 -0700 Subject: [PATCH] persist file batches and clean up after 7 days --- .../utils/memory/openai_vector_store_mixin.py | 173 ++++++--- .../vector_io/test_openai_vector_stores.py | 7 + .../test_vector_io_openai_vector_stores.py | 328 +++++++++++++++++- 3 files changed, 459 insertions(+), 49 deletions(-) diff --git a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index b77b2cbda..a24fb07f7 100644 --- a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -56,6 +56,7 @@ VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::" OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::" OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::" OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:{VERSION}::" +OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX = f"openai_vector_stores_file_batches:{VERSION}::" class OpenAIVectorStoreMixin(ABC): @@ -160,9 +161,72 @@ class OpenAIVectorStoreMixin(ABC): for idx in range(len(raw_items)): await self.kvstore.delete(f"{contents_prefix}{idx}") + async def _save_openai_vector_store_file_batch(self, batch_id: str, batch_info: dict[str, Any]) -> None: + """Save file batch metadata to persistent storage.""" + assert self.kvstore + key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}" + await self.kvstore.set(key=key, value=json.dumps(batch_info)) + # update in-memory cache + self.openai_file_batches[batch_id] = batch_info + + async def _load_openai_vector_store_file_batches(self) -> dict[str, dict[str, Any]]: + """Load all file batch metadata from persistent storage.""" + assert self.kvstore + start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX + end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff" + stored_data = await self.kvstore.values_in_range(start_key, end_key) + + batches: dict[str, dict[str, Any]] = {} + for item in stored_data: + info = json.loads(item) + batches[info["id"]] = info + return batches + + async def _delete_openai_vector_store_file_batch(self, batch_id: str) -> None: + """Delete file batch metadata from persistent storage and in-memory cache.""" + assert self.kvstore + key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}" + await self.kvstore.delete(key) + # remove from in-memory cache + self.openai_file_batches.pop(batch_id, None) + + async def _cleanup_expired_file_batches(self) -> None: + """Clean up expired file batches from persistent storage.""" + assert self.kvstore + start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX + end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff" + stored_data = await self.kvstore.values_in_range(start_key, end_key) + + current_time = int(time.time()) + expired_count = 0 + + for item in stored_data: + info = json.loads(item) + expires_at = info.get("expires_at") + if expires_at and current_time > expires_at: + logger.info(f"Cleaning up expired file batch: {info['id']}") + await self.kvstore.delete(f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{info['id']}") + # Remove from in-memory cache if present + self.openai_file_batches.pop(info["id"], None) + expired_count += 1 + + if expired_count > 0: + logger.info(f"Cleaned up {expired_count} expired file batches") + + async def _resume_incomplete_batches(self) -> None: + """Resume processing of incomplete file batches after server restart.""" + for batch_id, batch_info in self.openai_file_batches.items(): + if batch_info["status"] == "in_progress": + logger.info(f"Resuming incomplete file batch: {batch_id}") + # Restart the background processing task + asyncio.create_task(self._process_file_batch_async(batch_id, batch_info)) + async def initialize_openai_vector_stores(self) -> None: - """Load existing OpenAI vector stores into the in-memory cache.""" + """Load existing OpenAI vector stores and file batches into the in-memory cache.""" self.openai_vector_stores = await self._load_openai_vector_stores() + self.openai_file_batches = await self._load_openai_vector_store_file_batches() + # Resume any incomplete file batches + await self._resume_incomplete_batches() @abstractmethod async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: @@ -835,6 +899,8 @@ class OpenAIVectorStoreMixin(ABC): created_at = int(time.time()) batch_id = f"batch_{uuid.uuid4()}" + # File batches expire after 7 days + expires_at = created_at + (7 * 24 * 60 * 60) # Initialize batch file counts - all files start as in_progress file_counts = VectorStoreFileCounts( @@ -854,60 +920,71 @@ class OpenAIVectorStoreMixin(ABC): file_counts=file_counts, ) - # Store batch object and file_ids in memory - self.openai_file_batches[batch_id] = { - "batch_object": batch_object, + batch_info = { + **batch_object.model_dump(), "file_ids": file_ids, + "attributes": attributes, + "chunking_strategy": chunking_strategy.model_dump(), + "expires_at": expires_at, } + await self._save_openai_vector_store_file_batch(batch_id, batch_info) # Start background processing of files - asyncio.create_task(self._process_file_batch_async(batch_id, file_ids, attributes, chunking_strategy)) + asyncio.create_task(self._process_file_batch_async(batch_id, batch_info)) + + # Start background cleanup of expired batches (non-blocking) + asyncio.create_task(self._cleanup_expired_file_batches()) return batch_object async def _process_file_batch_async( self, batch_id: str, - file_ids: list[str], - attributes: dict[str, Any] | None, - chunking_strategy: VectorStoreChunkingStrategy | None, + batch_info: dict[str, Any], ) -> None: """Process files in a batch asynchronously in the background.""" - batch_info = self.openai_file_batches[batch_id] - batch_object = batch_info["batch_object"] - vector_store_id = batch_object.vector_store_id + file_ids = batch_info["file_ids"] + attributes = batch_info["attributes"] + chunking_strategy = batch_info["chunking_strategy"] + vector_store_id = batch_info["vector_store_id"] + for file_id in file_ids: try: - # Process each file + chunking_strategy_obj = ( + VectorStoreChunkingStrategyStatic(**chunking_strategy) + if chunking_strategy.get("type") == "static" + else VectorStoreChunkingStrategyAuto(**chunking_strategy) + ) await self.openai_attach_file_to_vector_store( vector_store_id=vector_store_id, file_id=file_id, attributes=attributes, - chunking_strategy=chunking_strategy, + chunking_strategy=chunking_strategy_obj, ) # Update counts atomically - batch_object.file_counts.completed += 1 - batch_object.file_counts.in_progress -= 1 + batch_info["file_counts"]["completed"] += 1 + batch_info["file_counts"]["in_progress"] -= 1 except Exception as e: logger.error(f"Failed to process file {file_id} in batch {batch_id}: {e}") - batch_object.file_counts.failed += 1 - batch_object.file_counts.in_progress -= 1 + batch_info["file_counts"]["failed"] += 1 + batch_info["file_counts"]["in_progress"] -= 1 # Update final status when all files are processed - if batch_object.file_counts.failed == 0: - batch_object.status = "completed" - elif batch_object.file_counts.completed == 0: - batch_object.status = "failed" + if batch_info["file_counts"]["failed"] == 0: + batch_info["status"] = "completed" + elif batch_info["file_counts"]["completed"] == 0: + batch_info["status"] = "failed" else: - batch_object.status = "completed" # Partial success counts as completed + batch_info["status"] = "completed" # Partial success counts as completed - logger.info(f"File batch {batch_id} processing completed with status: {batch_object.status}") + # Save final batch status to persistent storage (keep completed batches like vector stores) + await self._save_openai_vector_store_file_batch(batch_id, batch_info) - def _get_and_validate_batch( - self, batch_id: str, vector_store_id: str - ) -> tuple[dict[str, Any], VectorStoreFileBatchObject]: + logger.info(f"File batch {batch_id} processing completed with status: {batch_info['status']}") + + def _get_and_validate_batch(self, batch_id: str, vector_store_id: str) -> dict[str, Any]: """Get and validate batch exists and belongs to vector store.""" if vector_store_id not in self.openai_vector_stores: raise VectorStoreNotFoundError(vector_store_id) @@ -916,12 +993,18 @@ class OpenAIVectorStoreMixin(ABC): raise ValueError(f"File batch {batch_id} not found") batch_info = self.openai_file_batches[batch_id] - batch_object = batch_info["batch_object"] - if batch_object.vector_store_id != vector_store_id: + # Check if batch has expired (read-only check) + expires_at = batch_info.get("expires_at") + if expires_at: + current_time = int(time.time()) + if current_time > expires_at: + raise ValueError(f"File batch {batch_id} has expired after 7 days from creation") + + if batch_info["vector_store_id"] != vector_store_id: raise ValueError(f"File batch {batch_id} does not belong to vector store {vector_store_id}") - return batch_info, batch_object + return batch_info def _paginate_objects( self, @@ -965,8 +1048,9 @@ class OpenAIVectorStoreMixin(ABC): vector_store_id: str, ) -> VectorStoreFileBatchObject: """Retrieve a vector store file batch.""" - _, batch_object = self._get_and_validate_batch(batch_id, vector_store_id) - return batch_object + batch_info = self._get_and_validate_batch(batch_id, vector_store_id) + # Convert dict back to Pydantic model for API response + return VectorStoreFileBatchObject(**batch_info) async def openai_list_files_in_vector_store_file_batch( self, @@ -979,7 +1063,7 @@ class OpenAIVectorStoreMixin(ABC): order: str | None = "desc", ) -> VectorStoreFilesListInBatchResponse: """Returns a list of vector store files in a batch.""" - batch_info, _ = self._get_and_validate_batch(batch_id, vector_store_id) + batch_info = self._get_and_validate_batch(batch_id, vector_store_id) batch_file_ids = batch_info["file_ids"] # Load file objects for files in this batch @@ -1019,24 +1103,19 @@ class OpenAIVectorStoreMixin(ABC): vector_store_id: str, ) -> VectorStoreFileBatchObject: """Cancel a vector store file batch.""" - batch_info, batch_object = self._get_and_validate_batch(batch_id, vector_store_id) + batch_info = self._get_and_validate_batch(batch_id, vector_store_id) # Only allow cancellation if batch is in progress - if batch_object.status not in ["in_progress"]: - raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_object.status}") + if batch_info["status"] not in ["in_progress"]: + raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_info['status']}") - # Create updated batch object with cancelled status - updated_batch = VectorStoreFileBatchObject( - id=batch_object.id, - object=batch_object.object, - created_at=batch_object.created_at, - vector_store_id=batch_object.vector_store_id, - status="cancelled", - file_counts=batch_object.file_counts, - ) + # Update batch with cancelled status + batch_info["status"] = "cancelled" - # Update the stored batch info - batch_info["batch_object"] = updated_batch - self.openai_file_batches[batch_id] = batch_info + # Save cancelled batch status to persistent storage (keep cancelled batches like vector stores) + await self._save_openai_vector_store_file_batch(batch_id, batch_info) + + # Create updated batch object for API response + updated_batch = VectorStoreFileBatchObject(**batch_info) return updated_batch diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index c0615be58..8df8a6d2a 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -18,6 +18,13 @@ from llama_stack.log import get_logger logger = get_logger(name=__name__, category="vector_io") +@pytest.fixture(autouse=True) +def rate_limit_between_tests(): + """Add 10 second delay between integration tests to prevent rate limiting.""" + yield # Run the test first + time.sleep(10) # Delay after each test + + def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models): vector_io_providers = [p for p in client_with_models.providers.list() if p.api == "vector_io"] for p in vector_io_providers: diff --git a/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py b/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py index b044c3d52..118a7b0d2 100644 --- a/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py +++ b/tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py @@ -315,8 +315,9 @@ async def test_create_vector_store_file_batch(vector_io_adapter): "file_ids": [], } - # Mock attach method to avoid actual processing + # Mock attach method and batch processing to avoid actual processing vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + vector_io_adapter._process_file_batch_async = AsyncMock() batch = await vector_io_adapter.openai_create_vector_store_file_batch( vector_store_id=store_id, @@ -375,7 +376,9 @@ async def test_cancel_vector_store_file_batch(vector_io_adapter): "file_ids": [], } + # Mock both file attachment and batch processing to prevent automatic completion vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + vector_io_adapter._process_file_batch_async = AsyncMock() # Create batch batch = await vector_io_adapter.openai_create_vector_store_file_batch( @@ -633,7 +636,7 @@ async def test_cancel_completed_batch_fails(vector_io_adapter): # Manually update status to completed batch_info = vector_io_adapter.openai_file_batches[batch.id] - batch_info["batch_object"].status = "completed" + batch_info["status"] = "completed" # Try to cancel - should fail with pytest.raises(ValueError, match="Cannot cancel batch .* with status completed"): @@ -641,3 +644,324 @@ async def test_cancel_completed_batch_fails(vector_io_adapter): batch_id=batch.id, vector_store_id=store_id, ) + + +async def test_file_batch_persistence_across_restarts(vector_io_adapter): + """Test that in-progress file batches are persisted and resumed after restart.""" + store_id = "vs_1234" + file_ids = ["file_1", "file_2"] + + # Setup vector store + vector_io_adapter.openai_vector_stores[store_id] = { + "id": store_id, + "name": "Test Store", + "files": {}, + "file_ids": [], + } + + # Mock attach method and batch processing to avoid actual processing + vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + vector_io_adapter._process_file_batch_async = AsyncMock() + + # Create batch + batch = await vector_io_adapter.openai_create_vector_store_file_batch( + vector_store_id=store_id, + file_ids=file_ids, + ) + batch_id = batch.id + + # Verify batch is saved to persistent storage + assert batch_id in vector_io_adapter.openai_file_batches + saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}" + saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key) + assert saved_batch is not None + + # Verify the saved batch data contains all necessary information + saved_data = json.loads(saved_batch) + assert saved_data["id"] == batch_id + assert saved_data["status"] == "in_progress" + assert saved_data["file_ids"] == file_ids + + # Simulate restart - clear in-memory cache and reload + vector_io_adapter.openai_file_batches.clear() + + # Temporarily restore the real initialize_openai_vector_stores method + from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin + + real_method = OpenAIVectorStoreMixin.initialize_openai_vector_stores + await real_method(vector_io_adapter) + + # Re-mock the processing method to prevent any resumed batches from processing + vector_io_adapter._process_file_batch_async = AsyncMock() + + # Verify batch was restored + assert batch_id in vector_io_adapter.openai_file_batches + restored_batch = vector_io_adapter.openai_file_batches[batch_id] + assert restored_batch["status"] == "in_progress" + assert restored_batch["id"] == batch_id + assert vector_io_adapter.openai_file_batches[batch_id]["file_ids"] == file_ids + + +async def test_completed_batch_cleanup_from_persistence(vector_io_adapter): + """Test that completed batches are removed from persistent storage.""" + store_id = "vs_1234" + file_ids = ["file_1"] + + # Setup vector store + vector_io_adapter.openai_vector_stores[store_id] = { + "id": store_id, + "name": "Test Store", + "files": {}, + "file_ids": [], + } + + # Mock successful file processing + vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + + # Create batch + batch = await vector_io_adapter.openai_create_vector_store_file_batch( + vector_store_id=store_id, + file_ids=file_ids, + ) + batch_id = batch.id + + # Verify batch is initially saved to persistent storage + saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}" + saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key) + assert saved_batch is not None + + # Simulate batch completion by calling the processing method + batch_info = vector_io_adapter.openai_file_batches[batch_id] + + # Mark as completed and process + batch_info["file_counts"]["completed"] = len(file_ids) + batch_info["file_counts"]["in_progress"] = 0 + batch_info["status"] = "completed" + + # Manually call the cleanup (this normally happens in _process_file_batch_async) + await vector_io_adapter._delete_openai_vector_store_file_batch(batch_id) + + # Verify batch was removed from persistent storage + cleaned_batch = await vector_io_adapter.kvstore.get(saved_batch_key) + assert cleaned_batch is None + + # Batch should be removed from memory as well (matches vector store pattern) + assert batch_id not in vector_io_adapter.openai_file_batches + + +async def test_cancelled_batch_persists_in_storage(vector_io_adapter): + """Test that cancelled batches persist in storage with updated status.""" + store_id = "vs_1234" + file_ids = ["file_1", "file_2"] + + # Setup vector store + vector_io_adapter.openai_vector_stores[store_id] = { + "id": store_id, + "name": "Test Store", + "files": {}, + "file_ids": [], + } + + # Mock attach method and batch processing to avoid actual processing + vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + vector_io_adapter._process_file_batch_async = AsyncMock() + + # Create batch + batch = await vector_io_adapter.openai_create_vector_store_file_batch( + vector_store_id=store_id, + file_ids=file_ids, + ) + batch_id = batch.id + + # Verify batch is initially saved to persistent storage + saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}" + saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key) + assert saved_batch is not None + + # Cancel the batch + cancelled_batch = await vector_io_adapter.openai_cancel_vector_store_file_batch( + batch_id=batch_id, + vector_store_id=store_id, + ) + + # Verify batch status is cancelled + assert cancelled_batch.status == "cancelled" + + # Verify batch persists in storage with cancelled status + updated_batch = await vector_io_adapter.kvstore.get(saved_batch_key) + assert updated_batch is not None + batch_data = json.loads(updated_batch) + assert batch_data["status"] == "cancelled" + + # Batch should remain in memory cache (matches vector store pattern) + assert batch_id in vector_io_adapter.openai_file_batches + assert vector_io_adapter.openai_file_batches[batch_id]["status"] == "cancelled" + + +async def test_only_in_progress_batches_resumed(vector_io_adapter): + """Test that only in-progress batches are resumed for processing, but all batches are persisted.""" + store_id = "vs_1234" + + # Setup vector store + vector_io_adapter.openai_vector_stores[store_id] = { + "id": store_id, + "name": "Test Store", + "files": {}, + "file_ids": [], + } + + # Mock attach method and batch processing to prevent automatic completion + vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + vector_io_adapter._process_file_batch_async = AsyncMock() + + # Create multiple batches + batch1 = await vector_io_adapter.openai_create_vector_store_file_batch( + vector_store_id=store_id, file_ids=["file_1"] + ) + batch2 = await vector_io_adapter.openai_create_vector_store_file_batch( + vector_store_id=store_id, file_ids=["file_2"] + ) + + # Complete one batch (should persist with completed status) + batch1_info = vector_io_adapter.openai_file_batches[batch1.id] + batch1_info["status"] = "completed" + await vector_io_adapter._save_openai_vector_store_file_batch(batch1.id, batch1_info) + + # Cancel the other batch (should persist with cancelled status) + await vector_io_adapter.openai_cancel_vector_store_file_batch(batch_id=batch2.id, vector_store_id=store_id) + + # Create a third batch that stays in progress + batch3 = await vector_io_adapter.openai_create_vector_store_file_batch( + vector_store_id=store_id, file_ids=["file_3"] + ) + + # Simulate restart - first clear memory, then reload from persistence + vector_io_adapter.openai_file_batches.clear() + + # Mock the processing method BEFORE calling initialize to capture the resume calls + mock_process = AsyncMock() + vector_io_adapter._process_file_batch_async = mock_process + + # Temporarily restore the real initialize_openai_vector_stores method + from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin + + real_method = OpenAIVectorStoreMixin.initialize_openai_vector_stores + await real_method(vector_io_adapter) + + # All batches should be restored from persistence + assert batch1.id in vector_io_adapter.openai_file_batches # completed, persisted + assert batch2.id in vector_io_adapter.openai_file_batches # cancelled, persisted + assert batch3.id in vector_io_adapter.openai_file_batches # in-progress, restored + + # Check their statuses + assert vector_io_adapter.openai_file_batches[batch1.id]["status"] == "completed" + assert vector_io_adapter.openai_file_batches[batch2.id]["status"] == "cancelled" + assert vector_io_adapter.openai_file_batches[batch3.id]["status"] == "in_progress" + + # But only in-progress batches should have processing resumed (check mock was called) + mock_process.assert_called_once_with(batch3.id, vector_io_adapter.openai_file_batches[batch3.id]) + + +async def test_cleanup_expired_file_batches(vector_io_adapter): + """Test that expired file batches are cleaned up properly.""" + store_id = "vs_1234" + + # Setup vector store + vector_io_adapter.openai_vector_stores[store_id] = { + "id": store_id, + "name": "Test Store", + "files": {}, + "file_ids": [], + } + + # Mock processing to prevent automatic completion + vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock() + vector_io_adapter._process_file_batch_async = AsyncMock() + + # Create batches with different ages + import time + + current_time = int(time.time()) + + # Create an old expired batch (10 days old) + old_batch_info = { + "id": "batch_old", + "vector_store_id": store_id, + "status": "completed", + "created_at": current_time - (10 * 24 * 60 * 60), # 10 days ago + "expires_at": current_time - (3 * 24 * 60 * 60), # Expired 3 days ago + "file_ids": ["file_1"], + } + + # Create a recent valid batch + new_batch_info = { + "id": "batch_new", + "vector_store_id": store_id, + "status": "completed", + "created_at": current_time - (1 * 24 * 60 * 60), # 1 day ago + "expires_at": current_time + (6 * 24 * 60 * 60), # Expires in 6 days + "file_ids": ["file_2"], + } + + # Store both batches in persistent storage + await vector_io_adapter._save_openai_vector_store_file_batch("batch_old", old_batch_info) + await vector_io_adapter._save_openai_vector_store_file_batch("batch_new", new_batch_info) + + # Add to in-memory cache + vector_io_adapter.openai_file_batches["batch_old"] = old_batch_info + vector_io_adapter.openai_file_batches["batch_new"] = new_batch_info + + # Verify both batches exist before cleanup + assert "batch_old" in vector_io_adapter.openai_file_batches + assert "batch_new" in vector_io_adapter.openai_file_batches + + # Run cleanup + await vector_io_adapter._cleanup_expired_file_batches() + + # Verify expired batch was removed from memory + assert "batch_old" not in vector_io_adapter.openai_file_batches + assert "batch_new" in vector_io_adapter.openai_file_batches + + # Verify expired batch was removed from storage + old_batch_key = "openai_vector_stores_file_batches:v3::batch_old" + new_batch_key = "openai_vector_stores_file_batches:v3::batch_new" + + old_stored = await vector_io_adapter.kvstore.get(old_batch_key) + new_stored = await vector_io_adapter.kvstore.get(new_batch_key) + + assert old_stored is None # Expired batch should be deleted + assert new_stored is not None # Valid batch should remain + + +async def test_expired_batch_access_error(vector_io_adapter): + """Test that accessing expired batches returns clear error message.""" + store_id = "vs_1234" + + # Setup vector store + vector_io_adapter.openai_vector_stores[store_id] = { + "id": store_id, + "name": "Test Store", + "files": {}, + "file_ids": [], + } + + # Create an expired batch + import time + + current_time = int(time.time()) + + expired_batch_info = { + "id": "batch_expired", + "vector_store_id": store_id, + "status": "completed", + "created_at": current_time - (10 * 24 * 60 * 60), # 10 days ago + "expires_at": current_time - (3 * 24 * 60 * 60), # Expired 3 days ago + "file_ids": ["file_1"], + } + + # Add to in-memory cache (simulating it was loaded before expiration) + vector_io_adapter.openai_file_batches["batch_expired"] = expired_batch_info + + # Try to access expired batch + with pytest.raises(ValueError, match="File batch batch_expired has expired after 7 days from creation"): + vector_io_adapter._get_and_validate_batch("batch_expired", store_id)