mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-22 16:23:08 +00:00
**This PR changes configurations in a backward incompatible way.** Run configs today repeat full SQLite/Postgres snippets everywhere a store is needed, which means duplicated credentials, extra connection pools, and lots of drift between files. This PR introduces named storage backends so the stack and providers can share a single catalog and reference those backends by name. ## Key Changes - Add `storage.backends` to `StackRunConfig`, register each KV/SQL backend once at startup, and validate that references point to the right family. - Move server stores under `storage.stores` with lightweight references (backend + namespace/table) instead of full configs. - Update every provider/config/doc to use the new reference style; docs/codegen now surface the simplified YAML. ## Migration Before: ```yaml metadata_store: type: sqlite db_path: ~/.llama/distributions/foo/registry.db inference_store: type: postgres host: ${env.POSTGRES_HOST} port: ${env.POSTGRES_PORT} db: ${env.POSTGRES_DB} user: ${env.POSTGRES_USER} password: ${env.POSTGRES_PASSWORD} conversations_store: type: postgres host: ${env.POSTGRES_HOST} port: ${env.POSTGRES_PORT} db: ${env.POSTGRES_DB} user: ${env.POSTGRES_USER} password: ${env.POSTGRES_PASSWORD} ``` After: ```yaml storage: backends: kv_default: type: kv_sqlite db_path: ~/.llama/distributions/foo/kvstore.db sql_default: type: sql_postgres host: ${env.POSTGRES_HOST} port: ${env.POSTGRES_PORT} db: ${env.POSTGRES_DB} user: ${env.POSTGRES_USER} password: ${env.POSTGRES_PASSWORD} stores: metadata: backend: kv_default namespace: registry inference: backend: sql_default table_name: inference_store max_write_queue_size: 10000 num_writers: 4 conversations: backend: sql_default table_name: openai_conversations ``` Provider configs follow the same pattern—for example, a Chroma vector adapter switches from: ```yaml providers: vector_io: - provider_id: chromadb provider_type: remote::chromadb config: url: ${env.CHROMADB_URL} kvstore: type: sqlite db_path: ~/.llama/distributions/foo/chroma.db ``` to: ```yaml providers: vector_io: - provider_id: chromadb provider_type: remote::chromadb config: url: ${env.CHROMADB_URL} persistence: backend: kv_default namespace: vector_io::chroma_remote ``` Once the backends are declared, everything else just points at them, so rotating credentials or swapping to Postgres happens in one place and the stack reuses a single connection pool.
403 lines
17 KiB
Python
403 lines
17 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under the terms described in the LICENSE file in
|
|
# the root directory of this source tree.
|
|
|
|
import time
|
|
from tempfile import TemporaryDirectory
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from llama_stack.apis.agents import Order
|
|
from llama_stack.apis.agents.openai_responses import (
|
|
OpenAIResponseInput,
|
|
OpenAIResponseObject,
|
|
)
|
|
from llama_stack.apis.inference import OpenAIMessageParam, OpenAIUserMessageParam
|
|
from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqliteSqlStoreConfig
|
|
from llama_stack.providers.utils.responses.responses_store import ResponsesStore
|
|
from llama_stack.providers.utils.sqlstore.sqlstore import register_sqlstore_backends
|
|
|
|
|
|
def build_store(db_path: str, policy: list | None = None) -> ResponsesStore:
|
|
backend_name = f"sql_responses_{uuid4().hex}"
|
|
register_sqlstore_backends({backend_name: SqliteSqlStoreConfig(db_path=db_path)})
|
|
return ResponsesStore(
|
|
ResponsesStoreReference(backend=backend_name, table_name="responses"),
|
|
policy=policy or [],
|
|
)
|
|
|
|
|
|
def create_test_response_object(
|
|
response_id: str, created_timestamp: int, model: str = "test-model"
|
|
) -> OpenAIResponseObject:
|
|
"""Helper to create a test response object."""
|
|
return OpenAIResponseObject(
|
|
id=response_id,
|
|
created_at=created_timestamp,
|
|
model=model,
|
|
object="response",
|
|
output=[], # Required field
|
|
status="completed", # Required field
|
|
)
|
|
|
|
|
|
def create_test_response_input(content: str, input_id: str) -> OpenAIResponseInput:
|
|
"""Helper to create a test response input."""
|
|
from llama_stack.apis.agents.openai_responses import OpenAIResponseMessage
|
|
|
|
return OpenAIResponseMessage(
|
|
id=input_id,
|
|
content=content,
|
|
role="user",
|
|
type="message",
|
|
)
|
|
|
|
|
|
def create_test_messages(content: str) -> list[OpenAIMessageParam]:
|
|
"""Helper to create test messages for chat completion."""
|
|
return [OpenAIUserMessageParam(content=content)]
|
|
|
|
|
|
async def test_responses_store_pagination_basic():
|
|
"""Test basic pagination functionality for responses store."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Create test data with different timestamps
|
|
base_time = int(time.time())
|
|
test_data = [
|
|
("zebra-resp", base_time + 1),
|
|
("apple-resp", base_time + 2),
|
|
("moon-resp", base_time + 3),
|
|
("banana-resp", base_time + 4),
|
|
("car-resp", base_time + 5),
|
|
]
|
|
|
|
# Store test responses
|
|
for response_id, timestamp in test_data:
|
|
response = create_test_response_object(response_id, timestamp)
|
|
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
|
messages = create_test_messages(f"Input for {response_id}")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Test 1: First page with limit=2, descending order (default)
|
|
result = await store.list_responses(limit=2, order=Order.desc)
|
|
assert len(result.data) == 2
|
|
assert result.data[0].id == "car-resp" # Most recent first
|
|
assert result.data[1].id == "banana-resp"
|
|
assert result.has_more is True
|
|
assert result.last_id == "banana-resp"
|
|
|
|
# Test 2: Second page using 'after' parameter
|
|
result2 = await store.list_responses(after="banana-resp", limit=2, order=Order.desc)
|
|
assert len(result2.data) == 2
|
|
assert result2.data[0].id == "moon-resp"
|
|
assert result2.data[1].id == "apple-resp"
|
|
assert result2.has_more is True
|
|
|
|
# Test 3: Final page
|
|
result3 = await store.list_responses(after="apple-resp", limit=2, order=Order.desc)
|
|
assert len(result3.data) == 1
|
|
assert result3.data[0].id == "zebra-resp"
|
|
assert result3.has_more is False
|
|
|
|
|
|
async def test_responses_store_pagination_ascending():
|
|
"""Test pagination with ascending order."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Create test data
|
|
base_time = int(time.time())
|
|
test_data = [
|
|
("delta-resp", base_time + 1),
|
|
("charlie-resp", base_time + 2),
|
|
("alpha-resp", base_time + 3),
|
|
]
|
|
|
|
# Store test responses
|
|
for response_id, timestamp in test_data:
|
|
response = create_test_response_object(response_id, timestamp)
|
|
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
|
messages = create_test_messages(f"Input for {response_id}")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Test ascending order pagination
|
|
result = await store.list_responses(limit=1, order=Order.asc)
|
|
assert len(result.data) == 1
|
|
assert result.data[0].id == "delta-resp" # Oldest first
|
|
assert result.has_more is True
|
|
|
|
# Second page with ascending order
|
|
result2 = await store.list_responses(after="delta-resp", limit=1, order=Order.asc)
|
|
assert len(result2.data) == 1
|
|
assert result2.data[0].id == "charlie-resp"
|
|
assert result2.has_more is True
|
|
|
|
|
|
async def test_responses_store_pagination_with_model_filter():
|
|
"""Test pagination combined with model filtering."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Create test data with different models
|
|
base_time = int(time.time())
|
|
test_data = [
|
|
("xyz-resp", base_time + 1, "model-a"),
|
|
("def-resp", base_time + 2, "model-b"),
|
|
("pqr-resp", base_time + 3, "model-a"),
|
|
("abc-resp", base_time + 4, "model-b"),
|
|
]
|
|
|
|
# Store test responses
|
|
for response_id, timestamp, model in test_data:
|
|
response = create_test_response_object(response_id, timestamp, model)
|
|
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
|
messages = create_test_messages(f"Input for {response_id}")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Test pagination with model filter
|
|
result = await store.list_responses(limit=1, model="model-a", order=Order.desc)
|
|
assert len(result.data) == 1
|
|
assert result.data[0].id == "pqr-resp" # Most recent model-a
|
|
assert result.data[0].model == "model-a"
|
|
assert result.has_more is True
|
|
|
|
# Second page with model filter
|
|
result2 = await store.list_responses(after="pqr-resp", limit=1, model="model-a", order=Order.desc)
|
|
assert len(result2.data) == 1
|
|
assert result2.data[0].id == "xyz-resp"
|
|
assert result2.data[0].model == "model-a"
|
|
assert result2.has_more is False
|
|
|
|
|
|
async def test_responses_store_pagination_invalid_after():
|
|
"""Test error handling for invalid 'after' parameter."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Try to paginate with non-existent ID
|
|
with pytest.raises(ValueError, match="Record with id.*'non-existent' not found in table 'openai_responses'"):
|
|
await store.list_responses(after="non-existent", limit=2)
|
|
|
|
|
|
async def test_responses_store_pagination_no_limit():
|
|
"""Test pagination behavior when no limit is specified."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Create test data
|
|
base_time = int(time.time())
|
|
test_data = [
|
|
("omega-resp", base_time + 1),
|
|
("beta-resp", base_time + 2),
|
|
]
|
|
|
|
# Store test responses
|
|
for response_id, timestamp in test_data:
|
|
response = create_test_response_object(response_id, timestamp)
|
|
input_list = [create_test_response_input(f"Input for {response_id}", f"input-{response_id}")]
|
|
messages = create_test_messages(f"Input for {response_id}")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Test without limit (should use default of 50)
|
|
result = await store.list_responses(order=Order.desc)
|
|
assert len(result.data) == 2
|
|
assert result.data[0].id == "beta-resp" # Most recent first
|
|
assert result.data[1].id == "omega-resp"
|
|
assert result.has_more is False
|
|
|
|
|
|
async def test_responses_store_get_response_object():
|
|
"""Test retrieving a single response object."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Store a test response
|
|
response = create_test_response_object("test-resp", int(time.time()))
|
|
input_list = [create_test_response_input("Test input content", "input-test-resp")]
|
|
messages = create_test_messages("Test input content")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Retrieve the response
|
|
retrieved = await store.get_response_object("test-resp")
|
|
assert retrieved.id == "test-resp"
|
|
assert retrieved.model == "test-model"
|
|
assert len(retrieved.input) == 1
|
|
assert retrieved.input[0].content == "Test input content"
|
|
|
|
# Test error for non-existent response
|
|
with pytest.raises(ValueError, match="Response with id non-existent not found"):
|
|
await store.get_response_object("non-existent")
|
|
|
|
|
|
async def test_responses_store_input_items_pagination():
|
|
"""Test pagination functionality for input items."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Store a test response with many inputs with explicit IDs
|
|
response = create_test_response_object("test-resp", int(time.time()))
|
|
input_list = [
|
|
create_test_response_input("First input", "input-1"),
|
|
create_test_response_input("Second input", "input-2"),
|
|
create_test_response_input("Third input", "input-3"),
|
|
create_test_response_input("Fourth input", "input-4"),
|
|
create_test_response_input("Fifth input", "input-5"),
|
|
]
|
|
messages = create_test_messages("First input")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Verify all items are stored correctly with explicit IDs
|
|
all_items = await store.list_response_input_items("test-resp", order=Order.desc)
|
|
assert len(all_items.data) == 5
|
|
|
|
# In desc order: [Fifth, Fourth, Third, Second, First]
|
|
assert all_items.data[0].content == "Fifth input"
|
|
assert all_items.data[0].id == "input-5"
|
|
assert all_items.data[1].content == "Fourth input"
|
|
assert all_items.data[1].id == "input-4"
|
|
assert all_items.data[2].content == "Third input"
|
|
assert all_items.data[2].id == "input-3"
|
|
assert all_items.data[3].content == "Second input"
|
|
assert all_items.data[3].id == "input-2"
|
|
assert all_items.data[4].content == "First input"
|
|
assert all_items.data[4].id == "input-1"
|
|
|
|
# Test basic pagination with after parameter using actual IDs
|
|
result = await store.list_response_input_items("test-resp", limit=2, order=Order.desc)
|
|
assert len(result.data) == 2
|
|
assert result.data[0].content == "Fifth input" # Most recent first (reversed order)
|
|
assert result.data[1].content == "Fourth input"
|
|
|
|
# Test pagination using after with actual ID
|
|
result2 = await store.list_response_input_items("test-resp", after="input-5", limit=2, order=Order.desc)
|
|
assert len(result2.data) == 2
|
|
assert result2.data[0].content == "Fourth input" # Next item after Fifth
|
|
assert result2.data[1].content == "Third input"
|
|
|
|
# Test final page
|
|
result3 = await store.list_response_input_items("test-resp", after="input-3", limit=2, order=Order.desc)
|
|
assert len(result3.data) == 2
|
|
assert result3.data[0].content == "Second input"
|
|
assert result3.data[1].content == "First input"
|
|
|
|
# Test ascending order pagination
|
|
result_asc = await store.list_response_input_items("test-resp", limit=2, order=Order.asc)
|
|
assert len(result_asc.data) == 2
|
|
assert result_asc.data[0].content == "First input" # Oldest first
|
|
assert result_asc.data[1].content == "Second input"
|
|
|
|
# Test pagination with ascending order
|
|
result_asc2 = await store.list_response_input_items("test-resp", after="input-1", limit=2, order=Order.asc)
|
|
assert len(result_asc2.data) == 2
|
|
assert result_asc2.data[0].content == "Second input"
|
|
assert result_asc2.data[1].content == "Third input"
|
|
|
|
# Test error for non-existent after ID
|
|
with pytest.raises(ValueError, match="Input item with id 'non-existent' not found for response 'test-resp'"):
|
|
await store.list_response_input_items("test-resp", after="non-existent")
|
|
|
|
# Test error for unsupported features
|
|
with pytest.raises(NotImplementedError, match="Include is not supported yet"):
|
|
await store.list_response_input_items("test-resp", include=["some-field"])
|
|
|
|
# Test error for mutually exclusive parameters
|
|
with pytest.raises(ValueError, match="Cannot specify both 'before' and 'after' parameters"):
|
|
await store.list_response_input_items("test-resp", before="some-id", after="other-id")
|
|
|
|
|
|
async def test_responses_store_input_items_before_pagination():
|
|
"""Test before pagination functionality for input items."""
|
|
with TemporaryDirectory() as tmp_dir:
|
|
db_path = tmp_dir + "/test.db"
|
|
store = build_store(db_path)
|
|
await store.initialize()
|
|
|
|
# Store a test response with many inputs with explicit IDs
|
|
response = create_test_response_object("test-resp-before", int(time.time()))
|
|
input_list = [
|
|
create_test_response_input("First input", "before-1"),
|
|
create_test_response_input("Second input", "before-2"),
|
|
create_test_response_input("Third input", "before-3"),
|
|
create_test_response_input("Fourth input", "before-4"),
|
|
create_test_response_input("Fifth input", "before-5"),
|
|
]
|
|
messages = create_test_messages("First input")
|
|
await store.store_response_object(response, input_list, messages)
|
|
|
|
# Wait for all queued writes to complete
|
|
await store.flush()
|
|
|
|
# Test before pagination with descending order
|
|
# In desc order: [Fifth, Fourth, Third, Second, First]
|
|
# before="before-3" should return [Fifth, Fourth]
|
|
result = await store.list_response_input_items("test-resp-before", before="before-3", order=Order.desc)
|
|
assert len(result.data) == 2
|
|
assert result.data[0].content == "Fifth input"
|
|
assert result.data[1].content == "Fourth input"
|
|
|
|
# Test before pagination with limit
|
|
result2 = await store.list_response_input_items(
|
|
"test-resp-before", before="before-2", limit=3, order=Order.desc
|
|
)
|
|
assert len(result2.data) == 3
|
|
assert result2.data[0].content == "Fifth input"
|
|
assert result2.data[1].content == "Fourth input"
|
|
assert result2.data[2].content == "Third input"
|
|
|
|
# Test before pagination with ascending order
|
|
# In asc order: [First, Second, Third, Fourth, Fifth]
|
|
# before="before-4" should return [First, Second, Third]
|
|
result3 = await store.list_response_input_items("test-resp-before", before="before-4", order=Order.asc)
|
|
assert len(result3.data) == 3
|
|
assert result3.data[0].content == "First input"
|
|
assert result3.data[1].content == "Second input"
|
|
assert result3.data[2].content == "Third input"
|
|
|
|
# Test before with limit in ascending order
|
|
result4 = await store.list_response_input_items("test-resp-before", before="before-5", limit=2, order=Order.asc)
|
|
assert len(result4.data) == 2
|
|
assert result4.data[0].content == "First input"
|
|
assert result4.data[1].content == "Second input"
|
|
|
|
# Test error for non-existent before ID
|
|
with pytest.raises(
|
|
ValueError, match="Input item with id 'non-existent' not found for response 'test-resp-before'"
|
|
):
|
|
await store.list_response_input_items("test-resp-before", before="non-existent")
|