mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-10-03 19:57:35 +00:00)

persist file batches and clean up after 7 days

parent 943255697e · commit 9d2d8ab61c
3 changed files with 459 additions and 49 deletions
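
In short: vector store file batch metadata now survives restarts. Each batch is written to the KV store under its own key prefix with a seven-day expiry stamped at creation time, in-progress batches are resumed on startup, and expired entries are swept in the background. A minimal standalone sketch of the expiry arithmetic used in the diff, assuming only the standard library (the helper names here are illustrative, not part of the mixin's API):

    import time

    FILE_BATCH_TTL_SECONDS = 7 * 24 * 60 * 60  # seven days, matching expires_at in the diff

    def batch_timestamps() -> tuple[int, int]:
        """Return (created_at, expires_at) the way the new code stamps a batch."""
        created_at = int(time.time())
        return created_at, created_at + FILE_BATCH_TTL_SECONDS

    def is_expired(expires_at: int | None) -> bool:
        """Mirror the read-only expiry check: no recorded expiry means never expire."""
        return bool(expires_at) and int(time.time()) > expires_at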

@@ -56,6 +56,7 @@ VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:{VERSION}::"
OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX = f"openai_vector_stores_file_batches:{VERSION}::"


class OpenAIVectorStoreMixin(ABC):
@@ -160,9 +161,72 @@ class OpenAIVectorStoreMixin(ABC):
        for idx in range(len(raw_items)):
            await self.kvstore.delete(f"{contents_prefix}{idx}")

    async def _save_openai_vector_store_file_batch(self, batch_id: str, batch_info: dict[str, Any]) -> None:
        """Save file batch metadata to persistent storage."""
        assert self.kvstore
        key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}"
        await self.kvstore.set(key=key, value=json.dumps(batch_info))
        # update in-memory cache
        self.openai_file_batches[batch_id] = batch_info

    async def _load_openai_vector_store_file_batches(self) -> dict[str, dict[str, Any]]:
        """Load all file batch metadata from persistent storage."""
        assert self.kvstore
        start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX
        end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff"
        stored_data = await self.kvstore.values_in_range(start_key, end_key)

        batches: dict[str, dict[str, Any]] = {}
        for item in stored_data:
            info = json.loads(item)
            batches[info["id"]] = info
        return batches

    async def _delete_openai_vector_store_file_batch(self, batch_id: str) -> None:
        """Delete file batch metadata from persistent storage and in-memory cache."""
        assert self.kvstore
        key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}"
        await self.kvstore.delete(key)
        # remove from in-memory cache
        self.openai_file_batches.pop(batch_id, None)

    async def _cleanup_expired_file_batches(self) -> None:
        """Clean up expired file batches from persistent storage."""
        assert self.kvstore
        start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX
        end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff"
        stored_data = await self.kvstore.values_in_range(start_key, end_key)

        current_time = int(time.time())
        expired_count = 0

        for item in stored_data:
            info = json.loads(item)
            expires_at = info.get("expires_at")
            if expires_at and current_time > expires_at:
                logger.info(f"Cleaning up expired file batch: {info['id']}")
                await self.kvstore.delete(f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{info['id']}")
                # Remove from in-memory cache if present
                self.openai_file_batches.pop(info["id"], None)
                expired_count += 1

        if expired_count > 0:
            logger.info(f"Cleaned up {expired_count} expired file batches")

    async def _resume_incomplete_batches(self) -> None:
        """Resume processing of incomplete file batches after server restart."""
        for batch_id, batch_info in self.openai_file_batches.items():
            if batch_info["status"] == "in_progress":
                logger.info(f"Resuming incomplete file batch: {batch_id}")
                # Restart the background processing task
                asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))

    async def initialize_openai_vector_stores(self) -> None:
        """Load existing OpenAI vector stores into the in-memory cache."""
        """Load existing OpenAI vector stores and file batches into the in-memory cache."""
        self.openai_vector_stores = await self._load_openai_vector_stores()
        self.openai_file_batches = await self._load_openai_vector_store_file_batches()
        # Resume any incomplete file batches
        await self._resume_incomplete_batches()

    @abstractmethod
    async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
@@ -835,6 +899,8 @@ class OpenAIVectorStoreMixin(ABC):

        created_at = int(time.time())
        batch_id = f"batch_{uuid.uuid4()}"
        # File batches expire after 7 days
        expires_at = created_at + (7 * 24 * 60 * 60)

        # Initialize batch file counts - all files start as in_progress
        file_counts = VectorStoreFileCounts(
@@ -854,60 +920,71 @@ class OpenAIVectorStoreMixin(ABC):
            file_counts=file_counts,
        )

        # Store batch object and file_ids in memory
        self.openai_file_batches[batch_id] = {
            "batch_object": batch_object,
        batch_info = {
            **batch_object.model_dump(),
            "file_ids": file_ids,
            "attributes": attributes,
            "chunking_strategy": chunking_strategy.model_dump(),
            "expires_at": expires_at,
        }
        await self._save_openai_vector_store_file_batch(batch_id, batch_info)

        # Start background processing of files
        asyncio.create_task(self._process_file_batch_async(batch_id, file_ids, attributes, chunking_strategy))
        asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))

        # Start background cleanup of expired batches (non-blocking)
        asyncio.create_task(self._cleanup_expired_file_batches())

        return batch_object

    async def _process_file_batch_async(
        self,
        batch_id: str,
        file_ids: list[str],
        attributes: dict[str, Any] | None,
        chunking_strategy: VectorStoreChunkingStrategy | None,
        batch_info: dict[str, Any],
    ) -> None:
        """Process files in a batch asynchronously in the background."""
        batch_info = self.openai_file_batches[batch_id]
        batch_object = batch_info["batch_object"]
        vector_store_id = batch_object.vector_store_id
        file_ids = batch_info["file_ids"]
        attributes = batch_info["attributes"]
        chunking_strategy = batch_info["chunking_strategy"]
        vector_store_id = batch_info["vector_store_id"]

        for file_id in file_ids:
            try:
                # Process each file
                chunking_strategy_obj = (
                    VectorStoreChunkingStrategyStatic(**chunking_strategy)
                    if chunking_strategy.get("type") == "static"
                    else VectorStoreChunkingStrategyAuto(**chunking_strategy)
                )
                await self.openai_attach_file_to_vector_store(
                    vector_store_id=vector_store_id,
                    file_id=file_id,
                    attributes=attributes,
                    chunking_strategy=chunking_strategy,
                    chunking_strategy=chunking_strategy_obj,
                )

                # Update counts atomically
                batch_object.file_counts.completed += 1
                batch_object.file_counts.in_progress -= 1
                batch_info["file_counts"]["completed"] += 1
                batch_info["file_counts"]["in_progress"] -= 1

            except Exception as e:
                logger.error(f"Failed to process file {file_id} in batch {batch_id}: {e}")
                batch_object.file_counts.failed += 1
                batch_object.file_counts.in_progress -= 1
                batch_info["file_counts"]["failed"] += 1
                batch_info["file_counts"]["in_progress"] -= 1

        # Update final status when all files are processed
        if batch_object.file_counts.failed == 0:
            batch_object.status = "completed"
        elif batch_object.file_counts.completed == 0:
            batch_object.status = "failed"
        if batch_info["file_counts"]["failed"] == 0:
            batch_info["status"] = "completed"
        elif batch_info["file_counts"]["completed"] == 0:
            batch_info["status"] = "failed"
        else:
            batch_object.status = "completed" # Partial success counts as completed
            batch_info["status"] = "completed" # Partial success counts as completed

        logger.info(f"File batch {batch_id} processing completed with status: {batch_object.status}")
        # Save final batch status to persistent storage (keep completed batches like vector stores)
        await self._save_openai_vector_store_file_batch(batch_id, batch_info)

    def _get_and_validate_batch(
        self, batch_id: str, vector_store_id: str
    ) -> tuple[dict[str, Any], VectorStoreFileBatchObject]:
        logger.info(f"File batch {batch_id} processing completed with status: {batch_info['status']}")

    def _get_and_validate_batch(self, batch_id: str, vector_store_id: str) -> dict[str, Any]:
        """Get and validate batch exists and belongs to vector store."""
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)
@@ -916,12 +993,18 @@ class OpenAIVectorStoreMixin(ABC):
            raise ValueError(f"File batch {batch_id} not found")

        batch_info = self.openai_file_batches[batch_id]
        batch_object = batch_info["batch_object"]

        if batch_object.vector_store_id != vector_store_id:
        # Check if batch has expired (read-only check)
        expires_at = batch_info.get("expires_at")
        if expires_at:
            current_time = int(time.time())
            if current_time > expires_at:
                raise ValueError(f"File batch {batch_id} has expired after 7 days from creation")

        if batch_info["vector_store_id"] != vector_store_id:
            raise ValueError(f"File batch {batch_id} does not belong to vector store {vector_store_id}")

        return batch_info, batch_object
        return batch_info

    def _paginate_objects(
        self,
@@ -965,8 +1048,9 @@ class OpenAIVectorStoreMixin(ABC):
        vector_store_id: str,
    ) -> VectorStoreFileBatchObject:
        """Retrieve a vector store file batch."""
        _, batch_object = self._get_and_validate_batch(batch_id, vector_store_id)
        return batch_object
        batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
        # Convert dict back to Pydantic model for API response
        return VectorStoreFileBatchObject(**batch_info)

    async def openai_list_files_in_vector_store_file_batch(
        self,
@@ -979,7 +1063,7 @@ class OpenAIVectorStoreMixin(ABC):
        order: str | None = "desc",
    ) -> VectorStoreFilesListInBatchResponse:
        """Returns a list of vector store files in a batch."""
        batch_info, _ = self._get_and_validate_batch(batch_id, vector_store_id)
        batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
        batch_file_ids = batch_info["file_ids"]

        # Load file objects for files in this batch
@@ -1019,24 +1103,19 @@ class OpenAIVectorStoreMixin(ABC):
        vector_store_id: str,
    ) -> VectorStoreFileBatchObject:
        """Cancel a vector store file batch."""
        batch_info, batch_object = self._get_and_validate_batch(batch_id, vector_store_id)
        batch_info = self._get_and_validate_batch(batch_id, vector_store_id)

        # Only allow cancellation if batch is in progress
        if batch_object.status not in ["in_progress"]:
            raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_object.status}")
        if batch_info["status"] not in ["in_progress"]:
            raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_info['status']}")

        # Create updated batch object with cancelled status
        updated_batch = VectorStoreFileBatchObject(
            id=batch_object.id,
            object=batch_object.object,
            created_at=batch_object.created_at,
            vector_store_id=batch_object.vector_store_id,
            status="cancelled",
            file_counts=batch_object.file_counts,
        )
        # Update batch with cancelled status
        batch_info["status"] = "cancelled"

        # Update the stored batch info
        batch_info["batch_object"] = updated_batch
        self.openai_file_batches[batch_id] = batch_info
        # Save cancelled batch status to persistent storage (keep cancelled batches like vector stores)
        await self._save_openai_vector_store_file_batch(batch_id, batch_info)

        # Create updated batch object for API response
        updated_batch = VectorStoreFileBatchObject(**batch_info)

        return updated_batch
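
For orientation before the test diffs: the persistence layer above is just JSON blobs in a KV store, keyed by a versioned prefix plus the batch id, and loaded back with a bounded range scan (the "\xff" suffix sorts after any real key, so it caps the prefix scan). A condensed sketch of that pattern, assuming a kvstore exposing the same get/set/delete/values_in_range calls the diff uses; the "v3" version string is taken from the tests below, everything else is illustrative:

    import json
    from typing import Any, Protocol

    class KVStore(Protocol):
        async def set(self, key: str, value: str) -> None: ...
        async def get(self, key: str) -> str | None: ...
        async def delete(self, key: str) -> None: ...
        async def values_in_range(self, start_key: str, end_key: str) -> list[str]: ...

    PREFIX = "openai_vector_stores_file_batches:v3::"  # assumed VERSION, per the tests

    async def save_batch(kv: KVStore, batch_id: str, info: dict[str, Any]) -> None:
        # One JSON document per batch, addressable by id.
        await kv.set(f"{PREFIX}{batch_id}", json.dumps(info))

    async def load_all_batches(kv: KVStore) -> dict[str, dict[str, Any]]:
        # Scan every key under the prefix and index the decoded blobs by batch id.
        raw = await kv.values_in_range(PREFIX, f"{PREFIX}\xff")
        return {info["id"]: info for info in map(json.loads, raw)}

Loading everything up front is what lets initialize_openai_vector_stores repopulate the in-memory cache and then resume any batch still marked in_progress.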

@@ -18,6 +18,13 @@ from llama_stack.log import get_logger
logger = get_logger(name=__name__, category="vector_io")


@pytest.fixture(autouse=True)
def rate_limit_between_tests():
    """Add 10 second delay between integration tests to prevent rate limiting."""
    yield  # Run the test first
    time.sleep(10)  # Delay after each test


def skip_if_provider_doesnt_support_openai_vector_stores(client_with_models):
    vector_io_providers = [p for p in client_with_models.providers.list() if p.api == "vector_io"]
    for p in vector_io_providers:

@@ -315,8 +315,9 @@ async def test_create_vector_store_file_batch(vector_io_adapter):
        "file_ids": [],
    }

    # Mock attach method to avoid actual processing
    # Mock attach method and batch processing to avoid actual processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
@@ -375,7 +376,9 @@ async def test_cancel_vector_store_file_batch(vector_io_adapter):
        "file_ids": [],
    }

    # Mock both file attachment and batch processing to prevent automatic completion
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
@@ -633,7 +636,7 @@ async def test_cancel_completed_batch_fails(vector_io_adapter):

    # Manually update status to completed
    batch_info = vector_io_adapter.openai_file_batches[batch.id]
    batch_info["batch_object"].status = "completed"
    batch_info["status"] = "completed"

    # Try to cancel - should fail
    with pytest.raises(ValueError, match="Cannot cancel batch .* with status completed"):
@@ -641,3 +644,324 @@ async def test_cancel_completed_batch_fails(vector_io_adapter):
            batch_id=batch.id,
            vector_store_id=store_id,
        )


async def test_file_batch_persistence_across_restarts(vector_io_adapter):
    """Test that in-progress file batches are persisted and resumed after restart."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to avoid actual processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )
    batch_id = batch.id

    # Verify batch is saved to persistent storage
    assert batch_id in vector_io_adapter.openai_file_batches
    saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}"
    saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert saved_batch is not None

    # Verify the saved batch data contains all necessary information
    saved_data = json.loads(saved_batch)
    assert saved_data["id"] == batch_id
    assert saved_data["status"] == "in_progress"
    assert saved_data["file_ids"] == file_ids

    # Simulate restart - clear in-memory cache and reload
    vector_io_adapter.openai_file_batches.clear()

    # Temporarily restore the real initialize_openai_vector_stores method
    from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin

    real_method = OpenAIVectorStoreMixin.initialize_openai_vector_stores
    await real_method(vector_io_adapter)

    # Re-mock the processing method to prevent any resumed batches from processing
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Verify batch was restored
    assert batch_id in vector_io_adapter.openai_file_batches
    restored_batch = vector_io_adapter.openai_file_batches[batch_id]
    assert restored_batch["status"] == "in_progress"
    assert restored_batch["id"] == batch_id
    assert vector_io_adapter.openai_file_batches[batch_id]["file_ids"] == file_ids


async def test_completed_batch_cleanup_from_persistence(vector_io_adapter):
    """Test that completed batches are removed from persistent storage."""
    store_id = "vs_1234"
    file_ids = ["file_1"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock successful file processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )
    batch_id = batch.id

    # Verify batch is initially saved to persistent storage
    saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}"
    saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert saved_batch is not None

    # Simulate batch completion by calling the processing method
    batch_info = vector_io_adapter.openai_file_batches[batch_id]

    # Mark as completed and process
    batch_info["file_counts"]["completed"] = len(file_ids)
    batch_info["file_counts"]["in_progress"] = 0
    batch_info["status"] = "completed"

    # Manually call the cleanup (this normally happens in _process_file_batch_async)
    await vector_io_adapter._delete_openai_vector_store_file_batch(batch_id)

    # Verify batch was removed from persistent storage
    cleaned_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert cleaned_batch is None

    # Batch should be removed from memory as well (matches vector store pattern)
    assert batch_id not in vector_io_adapter.openai_file_batches


async def test_cancelled_batch_persists_in_storage(vector_io_adapter):
    """Test that cancelled batches persist in storage with updated status."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to avoid actual processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )
    batch_id = batch.id

    # Verify batch is initially saved to persistent storage
    saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}"
    saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert saved_batch is not None

    # Cancel the batch
    cancelled_batch = await vector_io_adapter.openai_cancel_vector_store_file_batch(
        batch_id=batch_id,
        vector_store_id=store_id,
    )

    # Verify batch status is cancelled
    assert cancelled_batch.status == "cancelled"

    # Verify batch persists in storage with cancelled status
    updated_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert updated_batch is not None
    batch_data = json.loads(updated_batch)
    assert batch_data["status"] == "cancelled"

    # Batch should remain in memory cache (matches vector store pattern)
    assert batch_id in vector_io_adapter.openai_file_batches
    assert vector_io_adapter.openai_file_batches[batch_id]["status"] == "cancelled"


async def test_only_in_progress_batches_resumed(vector_io_adapter):
    """Test that only in-progress batches are resumed for processing, but all batches are persisted."""
    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to prevent automatic completion
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create multiple batches
    batch1 = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id, file_ids=["file_1"]
    )
    batch2 = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id, file_ids=["file_2"]
    )

    # Complete one batch (should persist with completed status)
    batch1_info = vector_io_adapter.openai_file_batches[batch1.id]
    batch1_info["status"] = "completed"
    await vector_io_adapter._save_openai_vector_store_file_batch(batch1.id, batch1_info)

    # Cancel the other batch (should persist with cancelled status)
    await vector_io_adapter.openai_cancel_vector_store_file_batch(batch_id=batch2.id, vector_store_id=store_id)

    # Create a third batch that stays in progress
    batch3 = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id, file_ids=["file_3"]
    )

    # Simulate restart - first clear memory, then reload from persistence
    vector_io_adapter.openai_file_batches.clear()

    # Mock the processing method BEFORE calling initialize to capture the resume calls
    mock_process = AsyncMock()
    vector_io_adapter._process_file_batch_async = mock_process

    # Temporarily restore the real initialize_openai_vector_stores method
    from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin

    real_method = OpenAIVectorStoreMixin.initialize_openai_vector_stores
    await real_method(vector_io_adapter)

    # All batches should be restored from persistence
    assert batch1.id in vector_io_adapter.openai_file_batches  # completed, persisted
    assert batch2.id in vector_io_adapter.openai_file_batches  # cancelled, persisted
    assert batch3.id in vector_io_adapter.openai_file_batches  # in-progress, restored

    # Check their statuses
    assert vector_io_adapter.openai_file_batches[batch1.id]["status"] == "completed"
    assert vector_io_adapter.openai_file_batches[batch2.id]["status"] == "cancelled"
    assert vector_io_adapter.openai_file_batches[batch3.id]["status"] == "in_progress"

    # But only in-progress batches should have processing resumed (check mock was called)
    mock_process.assert_called_once_with(batch3.id, vector_io_adapter.openai_file_batches[batch3.id])


async def test_cleanup_expired_file_batches(vector_io_adapter):
    """Test that expired file batches are cleaned up properly."""
    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock processing to prevent automatic completion
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batches with different ages
    import time

    current_time = int(time.time())

    # Create an old expired batch (10 days old)
    old_batch_info = {
        "id": "batch_old",
        "vector_store_id": store_id,
        "status": "completed",
        "created_at": current_time - (10 * 24 * 60 * 60),  # 10 days ago
        "expires_at": current_time - (3 * 24 * 60 * 60),  # Expired 3 days ago
        "file_ids": ["file_1"],
    }

    # Create a recent valid batch
    new_batch_info = {
        "id": "batch_new",
        "vector_store_id": store_id,
        "status": "completed",
        "created_at": current_time - (1 * 24 * 60 * 60),  # 1 day ago
        "expires_at": current_time + (6 * 24 * 60 * 60),  # Expires in 6 days
        "file_ids": ["file_2"],
    }

    # Store both batches in persistent storage
    await vector_io_adapter._save_openai_vector_store_file_batch("batch_old", old_batch_info)
    await vector_io_adapter._save_openai_vector_store_file_batch("batch_new", new_batch_info)

    # Add to in-memory cache
    vector_io_adapter.openai_file_batches["batch_old"] = old_batch_info
    vector_io_adapter.openai_file_batches["batch_new"] = new_batch_info

    # Verify both batches exist before cleanup
    assert "batch_old" in vector_io_adapter.openai_file_batches
    assert "batch_new" in vector_io_adapter.openai_file_batches

    # Run cleanup
    await vector_io_adapter._cleanup_expired_file_batches()

    # Verify expired batch was removed from memory
    assert "batch_old" not in vector_io_adapter.openai_file_batches
    assert "batch_new" in vector_io_adapter.openai_file_batches

    # Verify expired batch was removed from storage
    old_batch_key = "openai_vector_stores_file_batches:v3::batch_old"
    new_batch_key = "openai_vector_stores_file_batches:v3::batch_new"

    old_stored = await vector_io_adapter.kvstore.get(old_batch_key)
    new_stored = await vector_io_adapter.kvstore.get(new_batch_key)

    assert old_stored is None  # Expired batch should be deleted
    assert new_stored is not None  # Valid batch should remain


async def test_expired_batch_access_error(vector_io_adapter):
    """Test that accessing expired batches returns clear error message."""
    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Create an expired batch
    import time

    current_time = int(time.time())

    expired_batch_info = {
        "id": "batch_expired",
        "vector_store_id": store_id,
        "status": "completed",
        "created_at": current_time - (10 * 24 * 60 * 60),  # 10 days ago
        "expires_at": current_time - (3 * 24 * 60 * 60),  # Expired 3 days ago
        "file_ids": ["file_1"],
    }

    # Add to in-memory cache (simulating it was loaded before expiration)
    vector_io_adapter.openai_file_batches["batch_expired"] = expired_batch_info

    # Try to access expired batch
    with pytest.raises(ValueError, match="File batch batch_expired has expired after 7 days from creation"):
        vector_io_adapter._get_and_validate_batch("batch_expired", store_id)