Mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-12-16 07:09:25 +00:00)

Merge branch 'llamastack:main' into model_unregisteration_error_message

Commit 1180626a22
103 changed files with 11265 additions and 704 deletions

@@ -8,6 +8,7 @@
import pytest

from llama_stack.apis.agents.openai_responses import (
    OpenAIResponseAnnotationFileCitation,
    OpenAIResponseInputFunctionToolCallOutput,
    OpenAIResponseInputMessageContentImage,
    OpenAIResponseInputMessageContentText,

@@ -35,6 +36,7 @@ from llama_stack.apis.inference import (
    OpenAIUserMessageParam,
)
from llama_stack.providers.inline.agents.meta_reference.responses.utils import (
    _extract_citations_from_text,
    convert_chat_choice_to_response_message,
    convert_response_content_to_chat_content,
    convert_response_input_to_chat_messages,

@@ -340,3 +342,26 @@ class TestIsFunctionToolCall:
        result = is_function_tool_call(tool_call, tools)
        assert result is False


class TestExtractCitationsFromText:
    def test_extract_citations_and_annotations(self):
        text = "Start [not-a-file]. New source <|file-abc123|>. "
        text += "Other source <|file-def456|>? Repeat source <|file-abc123|>! No citation."
        file_mapping = {"file-abc123": "doc1.pdf", "file-def456": "doc2.txt"}

        annotations, cleaned_text = _extract_citations_from_text(text, file_mapping)

        expected_annotations = [
            OpenAIResponseAnnotationFileCitation(file_id="file-abc123", filename="doc1.pdf", index=30),
            OpenAIResponseAnnotationFileCitation(file_id="file-def456", filename="doc2.txt", index=44),
            OpenAIResponseAnnotationFileCitation(file_id="file-abc123", filename="doc1.pdf", index=59),
        ]
        expected_clean_text = "Start [not-a-file]. New source. Other source? Repeat source! No citation."

        assert cleaned_text == expected_clean_text
        assert annotations == expected_annotations
        # OpenAI cites at the end of the sentence
        assert cleaned_text[expected_annotations[0].index] == "."
        assert cleaned_text[expected_annotations[1].index] == "?"
        assert cleaned_text[expected_annotations[2].index] == "!"
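
The test above pins down the contract of the helper it imports: `_extract_citations_from_text` strips inline `<|file-...|>` markers, and for each file id that appears in the mapping it records the position in the cleaned text immediately after the removed marker (typically the sentence-ending punctuation). The real implementation lives in llama_stack.providers.inline.agents.meta_reference.responses.utils; the following is only a minimal sketch consistent with the assertions above, and the function name, regex, and dict return shape are illustrative assumptions.

import re

# Illustrative sketch only -- not the implementation under test.
_CITATION_MARKER = re.compile(r"\s*<\|(file-[A-Za-z0-9_-]+)\|>")


def extract_citations_sketch(text: str, file_mapping: dict[str, str]) -> tuple[list[dict], str]:
    annotations: list[dict] = []
    cleaned_parts: list[str] = []
    cleaned_len = 0
    last_end = 0
    for match in _CITATION_MARKER.finditer(text):
        # Keep the text before the marker; the marker itself (and the space
        # preceding it) is dropped from the cleaned output.
        preceding = text[last_end : match.start()]
        cleaned_parts.append(preceding)
        cleaned_len += len(preceding)
        file_id = match.group(1)
        if file_id in file_mapping:
            # The annotation index points at the character that follows the
            # removed marker in the cleaned text (".", "?", "!" in the test).
            annotations.append({"file_id": file_id, "filename": file_mapping[file_id], "index": cleaned_len})
        last_end = match.end()
    cleaned_parts.append(text[last_end:])
    return annotations, "".join(cleaned_parts)
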
@@ -186,43 +186,3 @@ async def test_openai_chat_completion_is_async(vllm_inference_adapter):
    assert mock_create_client.call_count == 4  # no cheating
    assert total_time < (sleep_time * 2), f"Total time taken: {total_time}s exceeded expected max"


async def test_should_refresh_models():
    """
    Test the should_refresh_models method with different refresh_models configurations.

    This test verifies that:
    1. When refresh_models is True, should_refresh_models returns True regardless of api_token
    2. When refresh_models is False, should_refresh_models returns False regardless of api_token
    """

    # Test case 1: refresh_models is True, api_token is None
    config1 = VLLMInferenceAdapterConfig(url="http://test.localhost", api_token=None, refresh_models=True)
    adapter1 = VLLMInferenceAdapter(config=config1)
    result1 = await adapter1.should_refresh_models()
    assert result1 is True, "should_refresh_models should return True when refresh_models is True"

    # Test case 2: refresh_models is True, api_token is empty string
    config2 = VLLMInferenceAdapterConfig(url="http://test.localhost", api_token="", refresh_models=True)
    adapter2 = VLLMInferenceAdapter(config=config2)
    result2 = await adapter2.should_refresh_models()
    assert result2 is True, "should_refresh_models should return True when refresh_models is True"

    # Test case 3: refresh_models is True, api_token is "fake" (default)
    config3 = VLLMInferenceAdapterConfig(url="http://test.localhost", api_token="fake", refresh_models=True)
    adapter3 = VLLMInferenceAdapter(config=config3)
    result3 = await adapter3.should_refresh_models()
    assert result3 is True, "should_refresh_models should return True when refresh_models is True"

    # Test case 4: refresh_models is True, api_token is real token
    config4 = VLLMInferenceAdapterConfig(url="http://test.localhost", api_token="real-token-123", refresh_models=True)
    adapter4 = VLLMInferenceAdapter(config=config4)
    result4 = await adapter4.should_refresh_models()
    assert result4 is True, "should_refresh_models should return True when refresh_models is True"

    # Test case 5: refresh_models is False, api_token is real token
    config5 = VLLMInferenceAdapterConfig(url="http://test.localhost", api_token="real-token-456", refresh_models=False)
    adapter5 = VLLMInferenceAdapter(config=config5)
    result5 = await adapter5.should_refresh_models()
    assert result5 is False, "should_refresh_models should return False when refresh_models is False"

@@ -466,10 +466,16 @@ class TestOpenAIMixinModelRegistration:
        assert result is None

    async def test_should_refresh_models(self, mixin):
        """Test should_refresh_models method (should always return False)"""
        """Test should_refresh_models method returns config value"""
        # Default config has refresh_models=False
        result = await mixin.should_refresh_models()
        assert result is False

        config_with_refresh = RemoteInferenceProviderConfig(refresh_models=True)
        mixin_with_refresh = OpenAIMixinImpl(config=config_with_refresh)
        result_with_refresh = await mixin_with_refresh.should_refresh_models()
        assert result_with_refresh is True

    async def test_register_model_error_propagation(self, mixin, mock_client_with_exception, mock_client_context):
        """Test that errors from provider API are properly propagated during registration"""
        model = Model(
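
Taken together, the removed vLLM-specific test (the -43/+3 hunk above) and the updated mixin test pin down the new behaviour: should_refresh_models() now simply reflects the provider config's refresh_models flag and ignores api_token entirely. A minimal sketch of that behaviour; the class and field names below are stand-ins, not the actual llama-stack classes.

from dataclasses import dataclass


@dataclass
class ProviderConfigSketch:
    # Stand-in for RemoteInferenceProviderConfig / VLLMInferenceAdapterConfig.
    refresh_models: bool = False
    api_token: str | None = None


class InferenceAdapterSketch:
    def __init__(self, config: ProviderConfigSketch):
        self.config = config

    async def should_refresh_models(self) -> bool:
        # The api_token value is deliberately ignored; only the flag matters.
        return self.config.refresh_models
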
@@ -6,16 +6,22 @@
import json
import time
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, patch

import numpy as np
import pytest

from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, QueryChunksResponse
from llama_stack.apis.vector_io import (
    Chunk,
    QueryChunksResponse,
    VectorStoreChunkingStrategyAuto,
    VectorStoreFileObject,
)
from llama_stack.providers.remote.vector_io.milvus.milvus import VECTOR_DBS_PREFIX

# This test is a unit test for the inline VectoerIO providers. This should only contain
# This test is a unit test for the inline VectorIO providers. This should only contain
# tests which are specific to this class. More general (API-level) tests should be placed in
# tests/integration/vector_io/
#

@@ -25,6 +31,16 @@ from llama_stack.providers.remote.vector_io.milvus.milvus import VECTOR_DBS_PREF
# -v -s --tb=short --disable-warnings --asyncio-mode=auto


@pytest.fixture(autouse=True)
def mock_resume_file_batches(request):
    """Mock the resume functionality to prevent stale file batches from being processed during tests."""
    with patch(
        "llama_stack.providers.utils.memory.openai_vector_store_mixin.OpenAIVectorStoreMixin._resume_incomplete_batches",
        new_callable=AsyncMock,
    ):
        yield


async def test_initialize_index(vector_index):
    await vector_index.initialize()

@@ -294,3 +310,668 @@ async def test_delete_openai_vector_store_file_from_storage(vector_io_adapter, t
    assert loaded_file_info == {}
    loaded_contents = await vector_io_adapter._load_openai_vector_store_file_contents(store_id, file_id)
    assert loaded_contents == []


async def test_create_vector_store_file_batch(vector_io_adapter):
    """Test creating a file batch."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2", "file_3"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to avoid actual processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    assert batch.vector_store_id == store_id
    assert batch.status == "in_progress"
    assert batch.file_counts.total == len(file_ids)
    assert batch.file_counts.in_progress == len(file_ids)
    assert batch.id in vector_io_adapter.openai_file_batches
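
The assertions above describe the creation contract: a new batch starts in_progress, its file counts mirror the requested file ids, it is cached in openai_file_batches, and processing happens in the background (mocked out here). A method-shaped sketch of that flow follows; everything not named in the tests (the helper name, constructor arguments, the dict return, the exact expires_at wiring) is an assumption for illustration only.

import asyncio
import time
import uuid

from llama_stack.apis.common.errors import VectorStoreNotFoundError


async def create_file_batch_sketch(self, vector_store_id: str, file_ids: list[str]) -> dict:
    # Hypothetical sketch of the creation flow implied by the assertions above;
    # the real method returns a response object rather than a dict.
    if vector_store_id not in self.openai_vector_stores:
        raise VectorStoreNotFoundError(vector_store_id)  # constructor argument assumed

    batch_id = f"batch_{uuid.uuid4()}"
    now = int(time.time())
    batch_info = {
        "id": batch_id,
        "vector_store_id": vector_store_id,
        "status": "in_progress",
        "created_at": now,
        "expires_at": now + 7 * 24 * 60 * 60,  # 7-day retention implied by the expiry tests below
        "file_ids": file_ids,
        "file_counts": {"total": len(file_ids), "in_progress": len(file_ids), "completed": 0, "failed": 0},
    }
    # Cache in memory, persist, then kick off background processing.
    self.openai_file_batches[batch_id] = batch_info
    await self._save_openai_vector_store_file_batch(batch_id, batch_info)
    asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))  # argument shape assumed
    return batch_info
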
async def test_retrieve_vector_store_file_batch(vector_io_adapter):
    """Test retrieving a file batch."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()

    # Create batch first
    created_batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # Retrieve batch
    retrieved_batch = await vector_io_adapter.openai_retrieve_vector_store_file_batch(
        batch_id=created_batch.id,
        vector_store_id=store_id,
    )

    assert retrieved_batch.id == created_batch.id
    assert retrieved_batch.vector_store_id == store_id
    assert retrieved_batch.status == "in_progress"


async def test_cancel_vector_store_file_batch(vector_io_adapter):
    """Test cancelling a file batch."""
    store_id = "vs_1234"
    file_ids = ["file_1"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock both file attachment and batch processing to prevent automatic completion
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # Cancel batch
    cancelled_batch = await vector_io_adapter.openai_cancel_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
    )

    assert cancelled_batch.status == "cancelled"


async def test_list_files_in_vector_store_file_batch(vector_io_adapter):
    """Test listing files in a batch."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2"]

    # Setup vector store with files
    files = {}
    for i, file_id in enumerate(file_ids):
        files[file_id] = VectorStoreFileObject(
            id=file_id,
            object="vector_store.file",
            usage_bytes=1000,
            created_at=int(time.time()) + i,
            vector_store_id=store_id,
            status="completed",
            chunking_strategy=VectorStoreChunkingStrategyAuto(),
        )

    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": files,
        "file_ids": file_ids,
    }

    # Mock file loading
    vector_io_adapter._load_openai_vector_store_file = AsyncMock(
        side_effect=lambda vs_id, f_id: files[f_id].model_dump()
    )
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # List files
    response = await vector_io_adapter.openai_list_files_in_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
    )

    assert len(response.data) == len(file_ids)
    assert response.first_id is not None
    assert response.last_id is not None


async def test_file_batch_validation_errors(vector_io_adapter):
    """Test file batch validation errors."""
    # Test nonexistent vector store
    with pytest.raises(VectorStoreNotFoundError):
        await vector_io_adapter.openai_create_vector_store_file_batch(
            vector_store_id="nonexistent",
            file_ids=["file_1"],
        )

    # Setup store for remaining tests
    store_id = "vs_test"
    vector_io_adapter.openai_vector_stores[store_id] = {"id": store_id, "files": {}, "file_ids": []}

    # Test nonexistent batch
    with pytest.raises(ValueError, match="File batch .* not found"):
        await vector_io_adapter.openai_retrieve_vector_store_file_batch(
            batch_id="nonexistent_batch",
            vector_store_id=store_id,
        )

    # Test wrong vector store for batch
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=["file_1"],
    )

    # Create wrong_store so it exists but the batch doesn't belong to it
    wrong_store_id = "wrong_store"
    vector_io_adapter.openai_vector_stores[wrong_store_id] = {"id": wrong_store_id, "files": {}, "file_ids": []}

    with pytest.raises(ValueError, match="does not belong to vector store"):
        await vector_io_adapter.openai_retrieve_vector_store_file_batch(
            batch_id=batch.id,
            vector_store_id=wrong_store_id,
        )


async def test_file_batch_pagination(vector_io_adapter):
    """Test file batch pagination."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2", "file_3", "file_4", "file_5"]

    # Setup vector store with multiple files
    files = {}
    for i, file_id in enumerate(file_ids):
        files[file_id] = VectorStoreFileObject(
            id=file_id,
            object="vector_store.file",
            usage_bytes=1000,
            created_at=int(time.time()) + i,
            vector_store_id=store_id,
            status="completed",
            chunking_strategy=VectorStoreChunkingStrategyAuto(),
        )

    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": files,
        "file_ids": file_ids,
    }

    # Mock file loading
    vector_io_adapter._load_openai_vector_store_file = AsyncMock(
        side_effect=lambda vs_id, f_id: files[f_id].model_dump()
    )
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # Test pagination with limit
    response = await vector_io_adapter.openai_list_files_in_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
        limit=3,
    )

    assert len(response.data) == 3
    assert response.has_more is True

    # Test pagination with after cursor
    first_page = await vector_io_adapter.openai_list_files_in_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
        limit=2,
    )

    second_page = await vector_io_adapter.openai_list_files_in_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
        limit=2,
        after=first_page.last_id,
    )

    assert len(first_page.data) == 2
    assert len(second_page.data) == 2
    # Ensure no overlap between pages
    first_page_ids = {file_obj.id for file_obj in first_page.data}
    second_page_ids = {file_obj.id for file_obj in second_page.data}
    assert first_page_ids.isdisjoint(second_page_ids)
    # Verify we got all expected files across both pages (in desc order: file_5, file_4, file_3, file_2, file_1)
    all_returned_ids = first_page_ids | second_page_ids
    assert all_returned_ids == {"file_2", "file_3", "file_4", "file_5"}
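
The pagination assertions above (descending created_at order, limit, after cursor, has_more, disjoint pages) follow the usual cursor pattern. A small generic sketch of that slicing logic, independent of the adapter class and using plain dicts rather than the real response types:

def paginate_by_cursor_sketch(items: list[dict], limit: int, after: str | None = None) -> dict:
    # Items are served newest-first (sorted by created_at descending), which is
    # why the test above expects file_5..file_1 ordering.
    ordered = sorted(items, key=lambda item: item["created_at"], reverse=True)
    start = 0
    if after is not None:
        # Resume immediately after the cursor id returned by the previous page.
        start = next((i + 1 for i, item in enumerate(ordered) if item["id"] == after), 0)
    page = ordered[start : start + limit]
    return {
        "data": page,
        "first_id": page[0]["id"] if page else None,
        "last_id": page[-1]["id"] if page else None,
        "has_more": start + limit < len(ordered),
    }
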
async def test_file_batch_status_filtering(vector_io_adapter):
    """Test file batch status filtering."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2", "file_3"]

    # Setup vector store with files having different statuses
    files = {}
    statuses = ["completed", "in_progress", "completed"]
    for i, (file_id, status) in enumerate(zip(file_ids, statuses, strict=False)):
        files[file_id] = VectorStoreFileObject(
            id=file_id,
            object="vector_store.file",
            usage_bytes=1000,
            created_at=int(time.time()) + i,
            vector_store_id=store_id,
            status=status,
            chunking_strategy=VectorStoreChunkingStrategyAuto(),
        )

    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": files,
        "file_ids": file_ids,
    }

    # Mock file loading
    vector_io_adapter._load_openai_vector_store_file = AsyncMock(
        side_effect=lambda vs_id, f_id: files[f_id].model_dump()
    )
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # Test filtering by completed status
    response = await vector_io_adapter.openai_list_files_in_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
        filter="completed",
    )

    assert len(response.data) == 2  # Only 2 completed files
    for file_obj in response.data:
        assert file_obj.status == "completed"

    # Test filtering by in_progress status
    response = await vector_io_adapter.openai_list_files_in_vector_store_file_batch(
        batch_id=batch.id,
        vector_store_id=store_id,
        filter="in_progress",
    )

    assert len(response.data) == 1  # Only 1 in_progress file
    assert response.data[0].status == "in_progress"


async def test_cancel_completed_batch_fails(vector_io_adapter):
    """Test that cancelling completed batch fails."""
    store_id = "vs_1234"
    file_ids = ["file_1"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # Manually update status to completed
    batch_info = vector_io_adapter.openai_file_batches[batch.id]
    batch_info["status"] = "completed"

    # Try to cancel - should fail
    with pytest.raises(ValueError, match="Cannot cancel batch .* with status completed"):
        await vector_io_adapter.openai_cancel_vector_store_file_batch(
            batch_id=batch.id,
            vector_store_id=store_id,
        )


async def test_file_batch_persistence_across_restarts(vector_io_adapter):
    """Test that in-progress file batches are persisted and resumed after restart."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to avoid actual processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )
    batch_id = batch.id

    # Verify batch is saved to persistent storage
    assert batch_id in vector_io_adapter.openai_file_batches
    saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}"
    saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert saved_batch is not None

    # Verify the saved batch data contains all necessary information
    saved_data = json.loads(saved_batch)
    assert saved_data["id"] == batch_id
    assert saved_data["status"] == "in_progress"
    assert saved_data["file_ids"] == file_ids

    # Simulate restart - clear in-memory cache and reload from persistence
    vector_io_adapter.openai_file_batches.clear()

    # Temporarily restore the real initialize_openai_vector_stores method
    from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin

    real_method = OpenAIVectorStoreMixin.initialize_openai_vector_stores
    await real_method(vector_io_adapter)

    # Re-mock the processing method to prevent any resumed batches from processing
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Verify batch was restored
    assert batch_id in vector_io_adapter.openai_file_batches
    restored_batch = vector_io_adapter.openai_file_batches[batch_id]
    assert restored_batch["status"] == "in_progress"
    assert restored_batch["id"] == batch_id
    assert vector_io_adapter.openai_file_batches[batch_id]["file_ids"] == file_ids
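
The persistence test relies on each batch being written to the kvstore as a JSON blob under the versioned key prefix shown (openai_vector_stores_file_batches:v3::<batch_id>), and on initialize_openai_vector_stores reloading those blobs into the in-memory openai_file_batches cache at startup. A hedged, method-shaped sketch of save/load helpers along those lines; the range-scan helper name and exact kvstore signatures are assumptions beyond the get/set usage visible in the tests.

import json

FILE_BATCHES_PREFIX = "openai_vector_stores_file_batches:v3::"


async def save_file_batch_sketch(self, batch_id: str, batch_info: dict) -> None:
    # Persist the batch as JSON under the versioned key used in the tests.
    await self.kvstore.set(key=f"{FILE_BATCHES_PREFIX}{batch_id}", value=json.dumps(batch_info))


async def load_file_batches_sketch(self) -> dict[str, dict]:
    # Reload every persisted batch into the in-memory cache on startup.
    # values_in_range is assumed here as the kvstore's prefix-scan helper.
    stored = await self.kvstore.values_in_range(start_key=FILE_BATCHES_PREFIX, end_key=f"{FILE_BATCHES_PREFIX}\xff")
    return {batch["id"]: batch for batch in (json.loads(raw) for raw in stored)}
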
async def test_cancelled_batch_persists_in_storage(vector_io_adapter):
    """Test that cancelled batches persist in storage with updated status."""
    store_id = "vs_1234"
    file_ids = ["file_1", "file_2"]

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to avoid actual processing
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batch
    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )
    batch_id = batch.id

    # Verify batch is initially saved to persistent storage
    saved_batch_key = f"openai_vector_stores_file_batches:v3::{batch_id}"
    saved_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert saved_batch is not None

    # Cancel the batch
    cancelled_batch = await vector_io_adapter.openai_cancel_vector_store_file_batch(
        batch_id=batch_id,
        vector_store_id=store_id,
    )

    # Verify batch status is cancelled
    assert cancelled_batch.status == "cancelled"

    # Verify batch persists in storage with cancelled status
    updated_batch = await vector_io_adapter.kvstore.get(saved_batch_key)
    assert updated_batch is not None
    batch_data = json.loads(updated_batch)
    assert batch_data["status"] == "cancelled"

    # Batch should remain in memory cache (matches vector store pattern)
    assert batch_id in vector_io_adapter.openai_file_batches
    assert vector_io_adapter.openai_file_batches[batch_id]["status"] == "cancelled"


async def test_only_in_progress_batches_resumed(vector_io_adapter):
    """Test that only in-progress batches are resumed for processing, but all batches are persisted."""
    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock attach method and batch processing to prevent automatic completion
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create multiple batches
    batch1 = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id, file_ids=["file_1"]
    )
    batch2 = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id, file_ids=["file_2"]
    )

    # Complete one batch (should persist with completed status)
    batch1_info = vector_io_adapter.openai_file_batches[batch1.id]
    batch1_info["status"] = "completed"
    await vector_io_adapter._save_openai_vector_store_file_batch(batch1.id, batch1_info)

    # Cancel the other batch (should persist with cancelled status)
    await vector_io_adapter.openai_cancel_vector_store_file_batch(batch_id=batch2.id, vector_store_id=store_id)

    # Create a third batch that stays in progress
    batch3 = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id, file_ids=["file_3"]
    )

    # Simulate restart - clear memory and reload from persistence
    vector_io_adapter.openai_file_batches.clear()

    # Temporarily restore the real initialize_openai_vector_stores method
    from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin

    real_method = OpenAIVectorStoreMixin.initialize_openai_vector_stores
    await real_method(vector_io_adapter)

    # All batches should be restored from persistence
    assert batch1.id in vector_io_adapter.openai_file_batches  # completed, persisted
    assert batch2.id in vector_io_adapter.openai_file_batches  # cancelled, persisted
    assert batch3.id in vector_io_adapter.openai_file_batches  # in-progress, restored

    # Check their statuses
    assert vector_io_adapter.openai_file_batches[batch1.id]["status"] == "completed"
    assert vector_io_adapter.openai_file_batches[batch2.id]["status"] == "cancelled"
    assert vector_io_adapter.openai_file_batches[batch3.id]["status"] == "in_progress"

    # Resume functionality is mocked, so we're only testing persistence
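
The restart test above separates persistence (all statuses survive a reload) from resumption (only in_progress batches get their background processing restarted, via _resume_incomplete_batches, which the autouse fixture patches out). A method-shaped sketch of what that resume step might look like under those assumptions; the argument shape passed to _process_file_batch_async is assumed, since the tests only ever mock it.

import asyncio


async def resume_incomplete_batches_sketch(self) -> None:
    # Every persisted batch stays in the cache, but only in-progress ones are
    # handed back to the background processor after a restart.
    for batch_id, batch_info in self.openai_file_batches.items():
        if batch_info["status"] == "in_progress":
            asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
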
async def test_cleanup_expired_file_batches(vector_io_adapter):
    """Test that expired file batches are cleaned up properly."""
    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Mock processing to prevent automatic completion
    vector_io_adapter.openai_attach_file_to_vector_store = AsyncMock()
    vector_io_adapter._process_file_batch_async = AsyncMock()

    # Create batches with different ages
    import time

    current_time = int(time.time())

    # Create an old expired batch (10 days old)
    old_batch_info = {
        "id": "batch_old",
        "vector_store_id": store_id,
        "status": "completed",
        "created_at": current_time - (10 * 24 * 60 * 60),  # 10 days ago
        "expires_at": current_time - (3 * 24 * 60 * 60),  # Expired 3 days ago
        "file_ids": ["file_1"],
    }

    # Create a recent valid batch
    new_batch_info = {
        "id": "batch_new",
        "vector_store_id": store_id,
        "status": "completed",
        "created_at": current_time - (1 * 24 * 60 * 60),  # 1 day ago
        "expires_at": current_time + (6 * 24 * 60 * 60),  # Expires in 6 days
        "file_ids": ["file_2"],
    }

    # Store both batches in persistent storage
    await vector_io_adapter._save_openai_vector_store_file_batch("batch_old", old_batch_info)
    await vector_io_adapter._save_openai_vector_store_file_batch("batch_new", new_batch_info)

    # Add to in-memory cache
    vector_io_adapter.openai_file_batches["batch_old"] = old_batch_info
    vector_io_adapter.openai_file_batches["batch_new"] = new_batch_info

    # Verify both batches exist before cleanup
    assert "batch_old" in vector_io_adapter.openai_file_batches
    assert "batch_new" in vector_io_adapter.openai_file_batches

    # Run cleanup
    await vector_io_adapter._cleanup_expired_file_batches()

    # Verify expired batch was removed from memory
    assert "batch_old" not in vector_io_adapter.openai_file_batches
    assert "batch_new" in vector_io_adapter.openai_file_batches

    # Verify expired batch was removed from storage
    old_batch_key = "openai_vector_stores_file_batches:v3::batch_old"
    new_batch_key = "openai_vector_stores_file_batches:v3::batch_new"

    old_stored = await vector_io_adapter.kvstore.get(old_batch_key)
    new_stored = await vector_io_adapter.kvstore.get(new_batch_key)

    assert old_stored is None  # Expired batch should be deleted
    assert new_stored is not None  # Valid batch should remain
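
The cleanup test implies that _cleanup_expired_file_batches compares each batch's expires_at timestamp with the current time and removes expired entries from both the kvstore and the in-memory cache. A minimal method-shaped sketch of that loop; the kvstore delete method name is an assumption, since the tests only exercise get.

import time


async def cleanup_expired_file_batches_sketch(self) -> None:
    now = int(time.time())
    expired_ids = [
        batch_id
        for batch_id, info in self.openai_file_batches.items()
        if info.get("expires_at") is not None and info["expires_at"] < now
    ]
    for batch_id in expired_ids:
        # Drop from persistent storage first, then from the in-memory cache.
        await self.kvstore.delete(f"openai_vector_stores_file_batches:v3::{batch_id}")
        del self.openai_file_batches[batch_id]
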
async def test_expired_batch_access_error(vector_io_adapter):
    """Test that accessing expired batches returns clear error message."""
    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    # Create an expired batch
    import time

    current_time = int(time.time())

    expired_batch_info = {
        "id": "batch_expired",
        "vector_store_id": store_id,
        "status": "completed",
        "created_at": current_time - (10 * 24 * 60 * 60),  # 10 days ago
        "expires_at": current_time - (3 * 24 * 60 * 60),  # Expired 3 days ago
        "file_ids": ["file_1"],
    }

    # Add to in-memory cache (simulating it was loaded before expiration)
    vector_io_adapter.openai_file_batches["batch_expired"] = expired_batch_info

    # Try to access expired batch
    with pytest.raises(ValueError, match="File batch batch_expired has expired after 7 days from creation"):
        vector_io_adapter._get_and_validate_batch("batch_expired", store_id)
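
The expected error message ties batch expiry to a fixed 7-day window from creation. A sketch of the validation the test exercises; the error strings come from the regexes asserted in this file, while the constant name and exact wiring are assumptions.

import time

FILE_BATCH_RETENTION_DAYS = 7  # implied by the error message asserted above


def get_and_validate_batch_sketch(self, batch_id: str, vector_store_id: str) -> dict:
    batch_info = self.openai_file_batches.get(batch_id)
    if batch_info is None:
        raise ValueError(f"File batch {batch_id} not found")
    if batch_info["vector_store_id"] != vector_store_id:
        raise ValueError(f"File batch {batch_id} does not belong to vector store {vector_store_id}")
    if batch_info.get("expires_at") and batch_info["expires_at"] < int(time.time()):
        raise ValueError(f"File batch {batch_id} has expired after {FILE_BATCH_RETENTION_DAYS} days from creation")
    return batch_info
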
async def test_max_concurrent_files_per_batch(vector_io_adapter):
    """Test that file batch processing respects MAX_CONCURRENT_FILES_PER_BATCH limit."""
    import asyncio

    store_id = "vs_1234"

    # Setup vector store
    vector_io_adapter.openai_vector_stores[store_id] = {
        "id": store_id,
        "name": "Test Store",
        "files": {},
        "file_ids": [],
    }

    active_files = 0

    async def mock_attach_file_with_delay(vector_store_id: str, file_id: str, **kwargs):
        """Mock that tracks concurrency and blocks indefinitely to test concurrency limit."""
        nonlocal active_files
        active_files += 1

        # Block indefinitely to test concurrency limit
        await asyncio.sleep(float("inf"))

    # Replace the attachment method
    vector_io_adapter.openai_attach_file_to_vector_store = mock_attach_file_with_delay

    # Create a batch with more files than the concurrency limit
    file_ids = [f"file_{i}" for i in range(8)]  # 8 files, but limit should be 5

    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
        vector_store_id=store_id,
        file_ids=file_ids,
    )

    # Give time for the semaphore logic to start processing files
    await asyncio.sleep(0.2)

    # Verify that only MAX_CONCURRENT_FILES_PER_BATCH files are processing concurrently
    # The semaphore in _process_files_with_concurrency should limit this
    from llama_stack.providers.utils.memory.openai_vector_store_mixin import MAX_CONCURRENT_FILES_PER_BATCH

    assert active_files == MAX_CONCURRENT_FILES_PER_BATCH, (
        f"Expected {MAX_CONCURRENT_FILES_PER_BATCH} active files, got {active_files}"
    )

    # Verify batch is in progress
    assert batch.status == "in_progress"
    assert batch.file_counts.total == 8
    assert batch.file_counts.in_progress == 8
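
The final test pins the concurrency ceiling to MAX_CONCURRENT_FILES_PER_BATCH (5, per the inline comment). The usual way to enforce such a ceiling is an asyncio.Semaphore wrapped around each per-file task, roughly as sketched below; the function name and anything beyond the attach call used in the test are assumptions.

import asyncio

MAX_CONCURRENT_FILES_PER_BATCH = 5  # value implied by the test's inline comment


async def process_files_with_concurrency_sketch(self, vector_store_id: str, file_ids: list[str]) -> None:
    # At most MAX_CONCURRENT_FILES_PER_BATCH attach calls run at any moment;
    # the rest wait on the semaphore, which is exactly what the test observes.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_FILES_PER_BATCH)

    async def attach_one(file_id: str) -> None:
        async with semaphore:
            await self.openai_attach_file_to_vector_store(vector_store_id=vector_store_id, file_id=file_id)

    await asyncio.gather(*(attach_one(file_id) for file_id in file_ids))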