mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-04 04:04:14 +00:00
chore: introduce write queue for response_store (#3497)
# What does this PR do? Mirroring the same changes that was used for inference_store: https://github.com/llamastack/llama-stack/pull/3383 Will follow up with a shared internal API for managing these write queues. ## Test Plan existing tests
This commit is contained in:
parent
7c466a7ec5
commit
8ab6684a94
4 changed files with 136 additions and 7 deletions
|
@ -42,10 +42,12 @@ from llama_stack.apis.inference import (
|
|||
)
|
||||
from llama_stack.apis.tools.tools import Tool, ToolGroups, ToolInvocationResult, ToolParameter, ToolRuntime
|
||||
from llama_stack.core.access_control.access_control import default_policy
|
||||
from llama_stack.core.datatypes import ResponsesStoreConfig
|
||||
from llama_stack.providers.inline.agents.meta_reference.responses.openai_responses import (
|
||||
OpenAIResponsesImpl,
|
||||
)
|
||||
from llama_stack.providers.utils.responses.responses_store import ResponsesStore
|
||||
from llama_stack.providers.utils.sqlstore.sqlstore import SqliteSqlStoreConfig
|
||||
from tests.unit.providers.agents.meta_reference.fixtures import load_chat_completion_fixture
|
||||
|
||||
|
||||
|
@ -677,7 +679,9 @@ async def test_responses_store_list_input_items_logic():
|
|||
|
||||
# Create mock store and response store
|
||||
mock_sql_store = AsyncMock()
|
||||
responses_store = ResponsesStore(sql_store_config=None, policy=default_policy())
|
||||
responses_store = ResponsesStore(
|
||||
ResponsesStoreConfig(sql_store_config=SqliteSqlStoreConfig(db_path="mock_db_path")), policy=default_policy()
|
||||
)
|
||||
responses_store.sql_store = mock_sql_store
|
||||
|
||||
# Setup test data - multiple input items
|
||||
|
|
|
@ -67,6 +67,9 @@ async def test_responses_store_pagination_basic():
|
|||
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Test 1: First page with limit=2, descending order (default)
|
||||
result = await store.list_responses(limit=2, order=Order.desc)
|
||||
assert len(result.data) == 2
|
||||
|
@ -110,6 +113,9 @@ async def test_responses_store_pagination_ascending():
|
|||
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Test ascending order pagination
|
||||
result = await store.list_responses(limit=1, order=Order.asc)
|
||||
assert len(result.data) == 1
|
||||
|
@ -145,6 +151,9 @@ async def test_responses_store_pagination_with_model_filter():
|
|||
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Test pagination with model filter
|
||||
result = await store.list_responses(limit=1, model="model-a", order=Order.desc)
|
||||
assert len(result.data) == 1
|
||||
|
@ -192,6 +201,9 @@ async def test_responses_store_pagination_no_limit():
|
|||
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Test without limit (should use default of 50)
|
||||
result = await store.list_responses(order=Order.desc)
|
||||
assert len(result.data) == 2
|
||||
|
@ -212,6 +224,9 @@ async def test_responses_store_get_response_object():
|
|||
input_list = [create_test_response_input("Test input content", "input-test-resp")]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Retrieve the response
|
||||
retrieved = await store.get_response_object("test-resp")
|
||||
assert retrieved.id == "test-resp"
|
||||
|
@ -242,6 +257,9 @@ async def test_responses_store_input_items_pagination():
|
|||
]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Verify all items are stored correctly with explicit IDs
|
||||
all_items = await store.list_response_input_items("test-resp", order=Order.desc)
|
||||
assert len(all_items.data) == 5
|
||||
|
@ -319,6 +337,9 @@ async def test_responses_store_input_items_before_pagination():
|
|||
]
|
||||
await store.store_response_object(response, input_list)
|
||||
|
||||
# Wait for all queued writes to complete
|
||||
await store.flush()
|
||||
|
||||
# Test before pagination with descending order
|
||||
# In desc order: [Fifth, Fourth, Third, Second, First]
|
||||
# before="before-3" should return [Fifth, Fourth]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue