fix: harden storage semantics (#4118)

Hardens the storage layer by guaranteeing immediate durability for
responses and keeping background writers alive. Three related fixes:

* Responses to the OpenAI-compatible API are now written directly to
Postgres/SQLite within the request instead of detouring through an async
queue that might never drain. This restores the expected
read-after-write behavior and removes the "response not found" races
reported by users.

* The access-control shim was stamping owner_principal/access_attributes
as SQL NULL, which the Postgres public-access filter treats as non-public
rows. It now uses the empty-string/JSON-null pattern (sketched below), so
conversations and responses stored without an authenticated user stay
queryable, matching SQLite.

* The inference-store write queue remains for batching, but its worker
tasks now start lazily on the running event loop (sketched below), so
server startup no longer cancels them and writes keep flowing when the
stack is launched via `llama stack run`.
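
The lazy start-up pattern behind the third fix, condensed into a runnable sketch. The `LazyQueueStore` name and the `print` stand-in for the database write are illustrative only; the real logic lives in `InferenceStore` in the diff below.

```python
import asyncio
from typing import Any


class LazyQueueStore:
    """Sketch of a write queue whose workers bind to whichever loop serves requests."""

    def __init__(self, num_writers: int = 4, max_queue_size: int = 10000) -> None:
        self._num_writers = num_writers
        self._max_queue_size = max_queue_size
        self._queue: asyncio.Queue[Any] | None = None
        self._worker_tasks: list[asyncio.Task[Any]] = []

    async def _ensure_workers_started(self) -> None:
        # Create the queue and workers on first use, on the running loop,
        # so server startup cannot cancel them before they ever run.
        if self._queue is None:
            self._queue = asyncio.Queue(maxsize=self._max_queue_size)
        if not self._worker_tasks:
            loop = asyncio.get_running_loop()
            for _ in range(self._num_writers):
                self._worker_tasks.append(loop.create_task(self._worker_loop()))

    async def write(self, item: Any) -> None:
        await self._ensure_workers_started()
        assert self._queue is not None
        await self._queue.put(item)

    async def _worker_loop(self) -> None:
        assert self._queue is not None
        while True:
            item = await self._queue.get()
            try:
                print("persisting", item)  # stand-in for the actual DB write
            finally:
                self._queue.task_done()


async def _demo() -> None:
    store = LazyQueueStore(num_writers=2)
    await store.write({"id": "chatcmpl-123"})
    await store._queue.join()  # wait for the background write to finish


if __name__ == "__main__":
    asyncio.run(_demo())
```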

Closes #4115 
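
For the second fix, the public-row convention condensed into a sketch. The helper names `mark_public` and `public_access_conditions` are illustrative; the actual logic is in the `AuthorizedSqlStore` diff below.

```python
# Rows written without an authenticated user must satisfy the public-access
# filter on read; SQL NULL in owner_principal does not, which was the bug.
def mark_public(row: dict) -> dict:
    row["owner_principal"] = ""      # empty string, never SQL NULL
    row["access_attributes"] = None  # serialized by the SQL layer as JSON null ('null')
    return row


def public_access_conditions(database_type: str) -> list[str]:
    # Mirrors _get_public_access_conditions() in the diff: accept both legacy
    # SQL NULL rows and new JSON-null rows as "public".
    conditions = ["owner_principal = ''"]
    if database_type == "postgres":
        conditions.append("(access_attributes IS NULL OR access_attributes::text = 'null')")
    else:  # sqlite
        conditions.append("(access_attributes IS NULL OR access_attributes = 'null')")
    return conditions


print(mark_public({"id": "resp_123"}))
print(public_access_conditions("postgres"))
```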

### Test Plan

Added a matrix entry to test our "base" suite against Postgres as the store.
Ashwin Bharambe 2025-11-12 10:35:39 -08:00 committed by GitHub
parent 356f37b1ba
commit 492f79ca9b
13 changed files with 516 additions and 211 deletions

View file

@@ -39,6 +39,32 @@ runs:
       if: ${{ inputs.setup == 'vllm' && inputs.inference-mode == 'record' }}
       uses: ./.github/actions/setup-vllm
 
+    - name: Start Postgres service
+      if: ${{ contains(inputs.setup, 'postgres') }}
+      shell: bash
+      run: |
+        sudo docker rm -f postgres-ci || true
+        sudo docker run -d --name postgres-ci \
+          -e POSTGRES_USER=llamastack \
+          -e POSTGRES_PASSWORD=llamastack \
+          -e POSTGRES_DB=llamastack \
+          -p 5432:5432 \
+          postgres:16
+        echo "Waiting for Postgres to become ready..."
+        for i in {1..30}; do
+          if sudo docker exec postgres-ci pg_isready -U llamastack -d llamastack >/dev/null 2>&1; then
+            echo "Postgres is ready"
+            break
+          fi
+          if [ "$i" -eq 30 ]; then
+            echo "Postgres failed to start in time"
+            sudo docker logs postgres-ci || true
+            exit 1
+          fi
+          sleep 2
+        done
+
     - name: Build Llama Stack
       shell: bash
       run: |

View file

@@ -66,12 +66,12 @@ jobs:
   run-replay-mode-tests:
     needs: generate-matrix
     runs-on: ubuntu-latest
-    name: ${{ format('Integration Tests ({0}, {1}, {2}, client={3}, {4})', matrix.client-type, matrix.config.setup, matrix.python-version, matrix.client-version, matrix.config.suite) }}
+    name: ${{ format('Integration Tests ({0}, {1}, {2}, client={3}, {4})', matrix.client, matrix.config.setup, matrix.python-version, matrix.client-version, matrix.config.suite) }}
     strategy:
       fail-fast: false
       matrix:
-        client-type: [library, docker, server]
+        client: [library, docker, server]
         # Use Python 3.13 only on nightly schedule (daily latest client test), otherwise use 3.12
         python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", "3.13"]') || fromJSON('["3.12"]') }}
         client-version: ${{ (github.event.schedule == '0 0 * * *' || github.event.inputs.test-all-client-versions == 'true') && fromJSON('["published", "latest"]') || fromJSON('["latest"]') }}
@@ -84,6 +84,7 @@ jobs:
         uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
       - name: Setup test environment
+        if: ${{ matrix.config.allowed_clients == null || contains(matrix.config.allowed_clients, matrix.client) }}
         uses: ./.github/actions/setup-test-environment
         with:
           python-version: ${{ matrix.python-version }}
@@ -93,11 +94,16 @@ jobs:
           inference-mode: 'replay'
       - name: Run tests
+        if: ${{ matrix.config.allowed_clients == null || contains(matrix.config.allowed_clients, matrix.client) }}
        uses: ./.github/actions/run-and-record-tests
        env:
          OPENAI_API_KEY: dummy
        with:
-          stack-config: ${{ matrix.client-type == 'library' && 'ci-tests' || matrix.client-type == 'server' && 'server:ci-tests' || 'docker:ci-tests' }}
+          stack-config: >-
+            ${{ matrix.config.stack_config
+            || (matrix.client == 'library' && 'ci-tests')
+            || (matrix.client == 'server' && 'server:ci-tests')
+            || 'docker:ci-tests' }}
          setup: ${{ matrix.config.setup }}
          inference-mode: 'replay'
          suite: ${{ matrix.config.suite }}

View file

@@ -13,6 +13,5 @@ from ..starter.starter import get_distribution_template as get_starter_distribut
 
 def get_distribution_template() -> DistributionTemplate:
     template = get_starter_distribution_template(name="ci-tests")
     template.description = "CI tests for Llama Stack"
-    template.run_configs.pop("run-with-postgres-store.yaml", None)
     return template

View file

@@ -0,0 +1,293 @@
version: 2
image_name: ci-tests
apis:
- agents
- batches
- datasetio
- eval
- files
- inference
- post_training
- safety
- scoring
- tool_runtime
- vector_io
providers:
inference:
- provider_id: ${env.CEREBRAS_API_KEY:+cerebras}
provider_type: remote::cerebras
config:
base_url: https://api.cerebras.ai
api_key: ${env.CEREBRAS_API_KEY:=}
- provider_id: ${env.OLLAMA_URL:+ollama}
provider_type: remote::ollama
config:
url: ${env.OLLAMA_URL:=http://localhost:11434}
- provider_id: ${env.VLLM_URL:+vllm}
provider_type: remote::vllm
config:
url: ${env.VLLM_URL:=}
max_tokens: ${env.VLLM_MAX_TOKENS:=4096}
api_token: ${env.VLLM_API_TOKEN:=fake}
tls_verify: ${env.VLLM_TLS_VERIFY:=true}
- provider_id: ${env.TGI_URL:+tgi}
provider_type: remote::tgi
config:
url: ${env.TGI_URL:=}
- provider_id: fireworks
provider_type: remote::fireworks
config:
url: https://api.fireworks.ai/inference/v1
api_key: ${env.FIREWORKS_API_KEY:=}
- provider_id: together
provider_type: remote::together
config:
url: https://api.together.xyz/v1
api_key: ${env.TOGETHER_API_KEY:=}
- provider_id: bedrock
provider_type: remote::bedrock
config:
api_key: ${env.AWS_BEDROCK_API_KEY:=}
region_name: ${env.AWS_DEFAULT_REGION:=us-east-2}
- provider_id: ${env.NVIDIA_API_KEY:+nvidia}
provider_type: remote::nvidia
config:
url: ${env.NVIDIA_BASE_URL:=https://integrate.api.nvidia.com}
api_key: ${env.NVIDIA_API_KEY:=}
append_api_version: ${env.NVIDIA_APPEND_API_VERSION:=True}
- provider_id: openai
provider_type: remote::openai
config:
api_key: ${env.OPENAI_API_KEY:=}
base_url: ${env.OPENAI_BASE_URL:=https://api.openai.com/v1}
- provider_id: anthropic
provider_type: remote::anthropic
config:
api_key: ${env.ANTHROPIC_API_KEY:=}
- provider_id: gemini
provider_type: remote::gemini
config:
api_key: ${env.GEMINI_API_KEY:=}
- provider_id: ${env.VERTEX_AI_PROJECT:+vertexai}
provider_type: remote::vertexai
config:
project: ${env.VERTEX_AI_PROJECT:=}
location: ${env.VERTEX_AI_LOCATION:=us-central1}
- provider_id: groq
provider_type: remote::groq
config:
url: https://api.groq.com
api_key: ${env.GROQ_API_KEY:=}
- provider_id: sambanova
provider_type: remote::sambanova
config:
url: https://api.sambanova.ai/v1
api_key: ${env.SAMBANOVA_API_KEY:=}
- provider_id: ${env.AZURE_API_KEY:+azure}
provider_type: remote::azure
config:
api_key: ${env.AZURE_API_KEY:=}
api_base: ${env.AZURE_API_BASE:=}
api_version: ${env.AZURE_API_VERSION:=}
api_type: ${env.AZURE_API_TYPE:=}
- provider_id: sentence-transformers
provider_type: inline::sentence-transformers
vector_io:
- provider_id: faiss
provider_type: inline::faiss
config:
persistence:
namespace: vector_io::faiss
backend: kv_default
- provider_id: sqlite-vec
provider_type: inline::sqlite-vec
config:
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/sqlite_vec.db
persistence:
namespace: vector_io::sqlite_vec
backend: kv_default
- provider_id: ${env.MILVUS_URL:+milvus}
provider_type: inline::milvus
config:
db_path: ${env.MILVUS_DB_PATH:=~/.llama/distributions/ci-tests}/milvus.db
persistence:
namespace: vector_io::milvus
backend: kv_default
- provider_id: ${env.CHROMADB_URL:+chromadb}
provider_type: remote::chromadb
config:
url: ${env.CHROMADB_URL:=}
persistence:
namespace: vector_io::chroma_remote
backend: kv_default
- provider_id: ${env.PGVECTOR_DB:+pgvector}
provider_type: remote::pgvector
config:
host: ${env.PGVECTOR_HOST:=localhost}
port: ${env.PGVECTOR_PORT:=5432}
db: ${env.PGVECTOR_DB:=}
user: ${env.PGVECTOR_USER:=}
password: ${env.PGVECTOR_PASSWORD:=}
persistence:
namespace: vector_io::pgvector
backend: kv_default
- provider_id: ${env.QDRANT_URL:+qdrant}
provider_type: remote::qdrant
config:
api_key: ${env.QDRANT_API_KEY:=}
persistence:
namespace: vector_io::qdrant_remote
backend: kv_default
- provider_id: ${env.WEAVIATE_CLUSTER_URL:+weaviate}
provider_type: remote::weaviate
config:
weaviate_api_key: null
weaviate_cluster_url: ${env.WEAVIATE_CLUSTER_URL:=localhost:8080}
persistence:
namespace: vector_io::weaviate
backend: kv_default
files:
- provider_id: meta-reference-files
provider_type: inline::localfs
config:
storage_dir: ${env.FILES_STORAGE_DIR:=~/.llama/distributions/ci-tests/files}
metadata_store:
table_name: files_metadata
backend: sql_default
safety:
- provider_id: llama-guard
provider_type: inline::llama-guard
config:
excluded_categories: []
- provider_id: code-scanner
provider_type: inline::code-scanner
agents:
- provider_id: meta-reference
provider_type: inline::meta-reference
config:
persistence:
agent_state:
namespace: agents
backend: kv_default
responses:
table_name: responses
backend: sql_default
max_write_queue_size: 10000
num_writers: 4
post_training:
- provider_id: torchtune-cpu
provider_type: inline::torchtune-cpu
config:
checkpoint_format: meta
eval:
- provider_id: meta-reference
provider_type: inline::meta-reference
config:
kvstore:
namespace: eval
backend: kv_default
datasetio:
- provider_id: huggingface
provider_type: remote::huggingface
config:
kvstore:
namespace: datasetio::huggingface
backend: kv_default
- provider_id: localfs
provider_type: inline::localfs
config:
kvstore:
namespace: datasetio::localfs
backend: kv_default
scoring:
- provider_id: basic
provider_type: inline::basic
- provider_id: llm-as-judge
provider_type: inline::llm-as-judge
- provider_id: braintrust
provider_type: inline::braintrust
config:
openai_api_key: ${env.OPENAI_API_KEY:=}
tool_runtime:
- provider_id: brave-search
provider_type: remote::brave-search
config:
api_key: ${env.BRAVE_SEARCH_API_KEY:=}
max_results: 3
- provider_id: tavily-search
provider_type: remote::tavily-search
config:
api_key: ${env.TAVILY_SEARCH_API_KEY:=}
max_results: 3
- provider_id: rag-runtime
provider_type: inline::rag-runtime
- provider_id: model-context-protocol
provider_type: remote::model-context-protocol
batches:
- provider_id: reference
provider_type: inline::reference
config:
kvstore:
namespace: batches
backend: kv_default
storage:
backends:
kv_default:
type: kv_postgres
host: ${env.POSTGRES_HOST:=localhost}
port: ${env.POSTGRES_PORT:=5432}
db: ${env.POSTGRES_DB:=llamastack}
user: ${env.POSTGRES_USER:=llamastack}
password: ${env.POSTGRES_PASSWORD:=llamastack}
table_name: ${env.POSTGRES_TABLE_NAME:=llamastack_kvstore}
sql_default:
type: sql_postgres
host: ${env.POSTGRES_HOST:=localhost}
port: ${env.POSTGRES_PORT:=5432}
db: ${env.POSTGRES_DB:=llamastack}
user: ${env.POSTGRES_USER:=llamastack}
password: ${env.POSTGRES_PASSWORD:=llamastack}
stores:
metadata:
namespace: registry
backend: kv_default
inference:
table_name: inference_store
backend: sql_default
max_write_queue_size: 10000
num_writers: 4
conversations:
table_name: openai_conversations
backend: sql_default
prompts:
namespace: prompts
backend: kv_default
registered_resources:
models: []
shields:
- shield_id: llama-guard
provider_id: ${env.SAFETY_MODEL:+llama-guard}
provider_shield_id: ${env.SAFETY_MODEL:=}
- shield_id: code-scanner
provider_id: ${env.CODE_SCANNER_MODEL:+code-scanner}
provider_shield_id: ${env.CODE_SCANNER_MODEL:=}
vector_dbs: []
datasets: []
scoring_fns: []
benchmarks: []
tool_groups:
- toolgroup_id: builtin::websearch
provider_id: tavily-search
- toolgroup_id: builtin::rag
provider_id: rag-runtime
server:
port: 8321
telemetry:
enabled: true
vector_stores:
default_provider_id: faiss
default_embedding_model:
provider_id: sentence-transformers
model_id: nomic-ai/nomic-embed-text-v1.5
safety:
default_shield_id: llama-guard

View file

@@ -165,20 +165,15 @@ providers:
   - provider_id: meta-reference
     provider_type: inline::meta-reference
     config:
-      persistence_store:
-        type: sql_postgres
-        host: ${env.POSTGRES_HOST:=localhost}
-        port: ${env.POSTGRES_PORT:=5432}
-        db: ${env.POSTGRES_DB:=llamastack}
-        user: ${env.POSTGRES_USER:=llamastack}
-        password: ${env.POSTGRES_PASSWORD:=llamastack}
-      responses_store:
-        type: sql_postgres
-        host: ${env.POSTGRES_HOST:=localhost}
-        port: ${env.POSTGRES_PORT:=5432}
-        db: ${env.POSTGRES_DB:=llamastack}
-        user: ${env.POSTGRES_USER:=llamastack}
-        password: ${env.POSTGRES_PASSWORD:=llamastack}
+      persistence:
+        agent_state:
+          namespace: agents
+          backend: kv_default
+        responses:
+          table_name: responses
+          backend: sql_default
+          max_write_queue_size: 10000
+          num_writers: 4
   post_training:
   - provider_id: huggingface-gpu
     provider_type: inline::huggingface-gpu
@@ -237,10 +232,10 @@ providers:
     config:
       kvstore:
         namespace: batches
-        backend: kv_postgres
+        backend: kv_default
 storage:
   backends:
-    kv_postgres:
+    kv_default:
       type: kv_postgres
       host: ${env.POSTGRES_HOST:=localhost}
       port: ${env.POSTGRES_PORT:=5432}
@@ -248,7 +243,7 @@ storage:
       user: ${env.POSTGRES_USER:=llamastack}
       password: ${env.POSTGRES_PASSWORD:=llamastack}
       table_name: ${env.POSTGRES_TABLE_NAME:=llamastack_kvstore}
-    sql_postgres:
+    sql_default:
       type: sql_postgres
       host: ${env.POSTGRES_HOST:=localhost}
       port: ${env.POSTGRES_PORT:=5432}
@@ -258,27 +253,44 @@ storage:
   stores:
     metadata:
       namespace: registry
-      backend: kv_postgres
+      backend: kv_default
     inference:
       table_name: inference_store
-      backend: sql_postgres
+      backend: sql_default
      max_write_queue_size: 10000
      num_writers: 4
     conversations:
       table_name: openai_conversations
-      backend: sql_postgres
+      backend: sql_default
     prompts:
       namespace: prompts
-      backend: kv_postgres
+      backend: kv_default
 registered_resources:
   models: []
-  shields: []
+  shields:
+  - shield_id: llama-guard
+    provider_id: ${env.SAFETY_MODEL:+llama-guard}
+    provider_shield_id: ${env.SAFETY_MODEL:=}
+  - shield_id: code-scanner
+    provider_id: ${env.CODE_SCANNER_MODEL:+code-scanner}
+    provider_shield_id: ${env.CODE_SCANNER_MODEL:=}
   vector_dbs: []
   datasets: []
   scoring_fns: []
   benchmarks: []
-  tool_groups: []
+  tool_groups:
+  - toolgroup_id: builtin::websearch
+    provider_id: tavily-search
+  - toolgroup_id: builtin::rag
+    provider_id: rag-runtime
 server:
   port: 8321
 telemetry:
   enabled: true
+vector_stores:
+  default_provider_id: faiss
+  default_embedding_model:
+    provider_id: sentence-transformers
+    model_id: nomic-ai/nomic-embed-text-v1.5
+safety:
+  default_shield_id: llama-guard

View file

@@ -165,20 +165,15 @@ providers:
   - provider_id: meta-reference
     provider_type: inline::meta-reference
     config:
-      persistence_store:
-        type: sql_postgres
-        host: ${env.POSTGRES_HOST:=localhost}
-        port: ${env.POSTGRES_PORT:=5432}
-        db: ${env.POSTGRES_DB:=llamastack}
-        user: ${env.POSTGRES_USER:=llamastack}
-        password: ${env.POSTGRES_PASSWORD:=llamastack}
-      responses_store:
-        type: sql_postgres
-        host: ${env.POSTGRES_HOST:=localhost}
-        port: ${env.POSTGRES_PORT:=5432}
-        db: ${env.POSTGRES_DB:=llamastack}
-        user: ${env.POSTGRES_USER:=llamastack}
-        password: ${env.POSTGRES_PASSWORD:=llamastack}
+      persistence:
+        agent_state:
+          namespace: agents
+          backend: kv_default
+        responses:
+          table_name: responses
+          backend: sql_default
+          max_write_queue_size: 10000
+          num_writers: 4
   post_training:
   - provider_id: torchtune-cpu
     provider_type: inline::torchtune-cpu
@@ -234,10 +229,10 @@ providers:
     config:
      kvstore:
        namespace: batches
-        backend: kv_postgres
+        backend: kv_default
 storage:
   backends:
-    kv_postgres:
+    kv_default:
       type: kv_postgres
       host: ${env.POSTGRES_HOST:=localhost}
       port: ${env.POSTGRES_PORT:=5432}
@@ -245,7 +240,7 @@ storage:
       user: ${env.POSTGRES_USER:=llamastack}
       password: ${env.POSTGRES_PASSWORD:=llamastack}
       table_name: ${env.POSTGRES_TABLE_NAME:=llamastack_kvstore}
-    sql_postgres:
+    sql_default:
       type: sql_postgres
       host: ${env.POSTGRES_HOST:=localhost}
       port: ${env.POSTGRES_PORT:=5432}
@@ -255,27 +250,44 @@ storage:
   stores:
     metadata:
       namespace: registry
-      backend: kv_postgres
+      backend: kv_default
     inference:
       table_name: inference_store
-      backend: sql_postgres
+      backend: sql_default
      max_write_queue_size: 10000
      num_writers: 4
     conversations:
       table_name: openai_conversations
-      backend: sql_postgres
+      backend: sql_default
     prompts:
       namespace: prompts
-      backend: kv_postgres
+      backend: kv_default
 registered_resources:
   models: []
-  shields: []
+  shields:
+  - shield_id: llama-guard
+    provider_id: ${env.SAFETY_MODEL:+llama-guard}
+    provider_shield_id: ${env.SAFETY_MODEL:=}
+  - shield_id: code-scanner
+    provider_id: ${env.CODE_SCANNER_MODEL:+code-scanner}
+    provider_shield_id: ${env.CODE_SCANNER_MODEL:=}
   vector_dbs: []
   datasets: []
   scoring_fns: []
   benchmarks: []
-  tool_groups: []
+  tool_groups:
+  - toolgroup_id: builtin::websearch
+    provider_id: tavily-search
+  - toolgroup_id: builtin::rag
+    provider_id: rag-runtime
 server:
   port: 8321
 telemetry:
   enabled: true
+vector_stores:
+  default_provider_id: faiss
+  default_embedding_model:
+    provider_id: sentence-transformers
+    model_id: nomic-ai/nomic-embed-text-v1.5
+safety:
+  default_shield_id: llama-guard

View file

@@ -17,11 +17,6 @@ from llama_stack.core.datatypes import (
     ToolGroupInput,
     VectorStoresConfig,
 )
-from llama_stack.core.storage.datatypes import (
-    InferenceStoreReference,
-    KVStoreReference,
-    SqlStoreReference,
-)
 from llama_stack.core.utils.dynamic import instantiate_class_type
 from llama_stack.distributions.template import DistributionTemplate, RunConfigSettings
 from llama_stack.providers.datatypes import RemoteProviderSpec
@@ -154,10 +149,11 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate:
             BuildProvider(provider_type="inline::reference"),
         ],
     }
+    files_config = LocalfsFilesImplConfig.sample_run_config(f"~/.llama/distributions/{name}")
     files_provider = Provider(
         provider_id="meta-reference-files",
         provider_type="inline::localfs",
-        config=LocalfsFilesImplConfig.sample_run_config(f"~/.llama/distributions/{name}"),
+        config=files_config,
     )
     embedding_provider = Provider(
         provider_id="sentence-transformers",
@@ -187,7 +183,8 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate:
             provider_shield_id="${env.CODE_SCANNER_MODEL:=}",
         ),
     ]
-    postgres_config = PostgresSqlStoreConfig.sample_run_config()
+    postgres_sql_config = PostgresSqlStoreConfig.sample_run_config()
+    postgres_kv_config = PostgresKVStoreConfig.sample_run_config()
     default_overrides = {
         "inference": remote_inference_providers + [embedding_provider],
         "vector_io": [
@@ -244,6 +241,33 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate:
         "files": [files_provider],
     }
 
+    base_run_settings = RunConfigSettings(
+        provider_overrides=default_overrides,
+        default_models=[],
+        default_tool_groups=default_tool_groups,
+        default_shields=default_shields,
+        vector_stores_config=VectorStoresConfig(
+            default_provider_id="faiss",
+            default_embedding_model=QualifiedModel(
+                provider_id="sentence-transformers",
+                model_id="nomic-ai/nomic-embed-text-v1.5",
+            ),
+        ),
+        safety_config=SafetyConfig(
+            default_shield_id="llama-guard",
+        ),
+    )
+
+    postgres_run_settings = base_run_settings.model_copy(
+        update={
+            "storage_backends": {
+                "kv_default": postgres_kv_config,
+                "sql_default": postgres_sql_config,
+            }
+        },
+        deep=True,
+    )
+
     return DistributionTemplate(
         name=name,
         distro_type="self_hosted",
@@ -253,71 +277,8 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate:
         providers=providers,
         additional_pip_packages=list(set(PostgresSqlStoreConfig.pip_packages() + PostgresKVStoreConfig.pip_packages())),
         run_configs={
-            "run.yaml": RunConfigSettings(
-                provider_overrides=default_overrides,
-                default_models=[],
-                default_tool_groups=default_tool_groups,
-                default_shields=default_shields,
-                vector_stores_config=VectorStoresConfig(
-                    default_provider_id="faiss",
-                    default_embedding_model=QualifiedModel(
-                        provider_id="sentence-transformers",
-                        model_id="nomic-ai/nomic-embed-text-v1.5",
-                    ),
-                ),
-                safety_config=SafetyConfig(
-                    default_shield_id="llama-guard",
-                ),
-            ),
-            "run-with-postgres-store.yaml": RunConfigSettings(
-                provider_overrides={
-                    **default_overrides,
-                    "agents": [
-                        Provider(
-                            provider_id="meta-reference",
-                            provider_type="inline::meta-reference",
-                            config=dict(
-                                persistence_store=postgres_config,
-                                responses_store=postgres_config,
-                            ),
-                        )
-                    ],
-                    "batches": [
-                        Provider(
-                            provider_id="reference",
-                            provider_type="inline::reference",
-                            config=dict(
-                                kvstore=KVStoreReference(
-                                    backend="kv_postgres",
-                                    namespace="batches",
-                                ).model_dump(exclude_none=True),
-                            ),
-                        )
-                    ],
-                },
-                storage_backends={
-                    "kv_postgres": PostgresKVStoreConfig.sample_run_config(),
-                    "sql_postgres": postgres_config,
-                },
-                storage_stores={
-                    "metadata": KVStoreReference(
-                        backend="kv_postgres",
-                        namespace="registry",
-                    ).model_dump(exclude_none=True),
-                    "inference": InferenceStoreReference(
-                        backend="sql_postgres",
-                        table_name="inference_store",
-                    ).model_dump(exclude_none=True),
-                    "conversations": SqlStoreReference(
-                        backend="sql_postgres",
-                        table_name="openai_conversations",
-                    ).model_dump(exclude_none=True),
-                    "prompts": KVStoreReference(
-                        backend="kv_postgres",
-                        namespace="prompts",
-                    ).model_dump(exclude_none=True),
-                },
-            ),
+            "run.yaml": base_run_settings,
+            "run-with-postgres-store.yaml": postgres_run_settings,
         },
         run_config_env_vars={
             "LLAMA_STACK_PORT": (

View file

@@ -66,14 +66,6 @@ class InferenceStore:
             },
         )
 
-        if self.enable_write_queue:
-            self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
-            for _ in range(self._num_writers):
-                self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
-            logger.debug(
-                f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
-            )
-
     async def shutdown(self) -> None:
         if not self._worker_tasks:
             return
@@ -94,10 +86,29 @@
         if self.enable_write_queue and self._queue is not None:
             await self._queue.join()
 
+    async def _ensure_workers_started(self) -> None:
+        """Ensure the async write queue workers run on the current loop."""
+        if not self.enable_write_queue:
+            return
+        if self._queue is None:
+            self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
+            logger.debug(
+                f"Inference store write queue created with max size {self._max_write_queue_size} "
+                f"and {self._num_writers} writers"
+            )
+        if not self._worker_tasks:
+            loop = asyncio.get_running_loop()
+            for _ in range(self._num_writers):
+                task = loop.create_task(self._worker_loop())
+                self._worker_tasks.append(task)
+
     async def store_chat_completion(
         self, chat_completion: OpenAIChatCompletion, input_messages: list[OpenAIMessageParam]
     ) -> None:
         if self.enable_write_queue:
+            await self._ensure_workers_started()
             if self._queue is None:
                 raise ValueError("Inference store is not initialized")
             try:

View file

@@ -3,8 +3,6 @@
 #
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
-import asyncio
-from typing import Any
 
 from llama_stack.apis.agents import (
     Order,
@@ -19,12 +17,12 @@ from llama_stack.apis.agents.openai_responses import (
 )
 from llama_stack.apis.inference import OpenAIMessageParam
 from llama_stack.core.datatypes import AccessRule
-from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType
+from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference
 from llama_stack.log import get_logger
 
 from ..sqlstore.api import ColumnDefinition, ColumnType
 from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
-from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
+from ..sqlstore.sqlstore import sqlstore_impl
 
 logger = get_logger(name=__name__, category="openai_responses")
@@ -55,28 +53,12 @@
         self.policy = policy
         self.sql_store = None
-        self.enable_write_queue = True
-
-        # Async write queue and worker control
-        self._queue: (
-            asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None
-        ) = None
-        self._worker_tasks: list[asyncio.Task[Any]] = []
-        self._max_write_queue_size: int = self.reference.max_write_queue_size
-        self._num_writers: int = max(1, self.reference.num_writers)
 
     async def initialize(self):
         """Create the necessary tables if they don't exist."""
         base_store = sqlstore_impl(self.reference)
         self.sql_store = AuthorizedSqlStore(base_store, self.policy)
-
-        # Disable write queue for SQLite since WAL mode handles concurrency
-        # Keep it enabled for other backends (like Postgres) for performance
-        backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
-        if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE:
-            self.enable_write_queue = False
-            logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)")
 
         await self.sql_store.create_table(
             "openai_responses",
             {
@@ -95,33 +77,12 @@
             },
         )
 
-        if self.enable_write_queue:
-            self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
-            for _ in range(self._num_writers):
-                self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
-            logger.debug(
-                f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
-            )
-
     async def shutdown(self) -> None:
-        if not self._worker_tasks:
-            return
-        if self._queue is not None:
-            await self._queue.join()
-        for t in self._worker_tasks:
-            if not t.done():
-                t.cancel()
-        for t in self._worker_tasks:
-            try:
-                await t
-            except asyncio.CancelledError:
-                pass
-        self._worker_tasks.clear()
+        return
 
     async def flush(self) -> None:
-        """Wait for all queued writes to complete. Useful for testing."""
-        if self.enable_write_queue and self._queue is not None:
-            await self._queue.join()
+        """Maintained for compatibility; no-op now that writes are synchronous."""
+        return
 
     async def store_response_object(
         self,
@@ -129,31 +90,7 @@
         input: list[OpenAIResponseInput],
         messages: list[OpenAIMessageParam],
     ) -> None:
-        if self.enable_write_queue:
-            if self._queue is None:
-                raise ValueError("Responses store is not initialized")
-            try:
-                self._queue.put_nowait((response_object, input, messages))
-            except asyncio.QueueFull:
-                logger.warning(f"Write queue full; adding response id={getattr(response_object, 'id', '<unknown>')}")
-                await self._queue.put((response_object, input, messages))
-            else:
-                await self._write_response_object(response_object, input, messages)
-
-    async def _worker_loop(self) -> None:
-        assert self._queue is not None
-        while True:
-            try:
-                item = await self._queue.get()
-            except asyncio.CancelledError:
-                break
-            response_object, input, messages = item
-            try:
-                await self._write_response_object(response_object, input, messages)
-            except Exception as e:  # noqa: BLE001
-                logger.error(f"Error writing response object: {e}")
-            finally:
-                self._queue.task_done()
+        await self._write_response_object(response_object, input, messages)
 
     async def _write_response_object(
         self,

View file

@@ -45,8 +45,13 @@ def _enhance_item_with_access_control(item: Mapping[str, Any], current_user: Use
         enhanced["owner_principal"] = current_user.principal
         enhanced["access_attributes"] = current_user.attributes
     else:
-        enhanced["owner_principal"] = None
-        enhanced["access_attributes"] = None
+        # IMPORTANT: Use empty string and null value (not None) to match public access filter
+        # The public access filter in _get_public_access_conditions() expects:
+        #   - owner_principal = '' (empty string)
+        #   - access_attributes = null (JSON null, which serializes to the string 'null')
+        # Setting them to None (SQL NULL) will cause rows to be filtered out on read.
+        enhanced["owner_principal"] = ""
+        enhanced["access_attributes"] = None  # Pydantic/JSON will serialize this as JSON null
     return enhanced
@@ -188,8 +193,9 @@
             enhanced_data["owner_principal"] = current_user.principal
             enhanced_data["access_attributes"] = current_user.attributes
         else:
-            enhanced_data["owner_principal"] = None
-            enhanced_data["access_attributes"] = None
+            # IMPORTANT: Use empty string for owner_principal to match public access filter
+            enhanced_data["owner_principal"] = ""
+            enhanced_data["access_attributes"] = None  # Will serialize as JSON null
 
         await self.sql_store.update(table, enhanced_data, where)
@@ -245,14 +251,24 @@
             raise ValueError(f"Unsupported database type: {self.database_type}")
 
     def _get_public_access_conditions(self) -> list[str]:
-        """Get the SQL conditions for public access."""
-        # Public records are records that have no owner_principal or access_attributes
+        """Get the SQL conditions for public access.
+
+        Public records are those with:
+        - owner_principal = '' (empty string)
+        - access_attributes is either SQL NULL or JSON null
+
+        Note: Different databases serialize None differently:
+        - SQLite: None -> JSON null (text = 'null')
+        - Postgres: None -> SQL NULL (IS NULL)
+        """
         conditions = ["owner_principal = ''"]
         if self.database_type == StorageBackendType.SQL_POSTGRES.value:
-            # Postgres stores JSON null as 'null'
-            conditions.append("access_attributes::text = 'null'")
+            # Accept both SQL NULL and JSON null for Postgres compatibility
+            # This handles both old rows (SQL NULL) and new rows (JSON null)
+            conditions.append("(access_attributes IS NULL OR access_attributes::text = 'null')")
         elif self.database_type == StorageBackendType.SQL_SQLITE.value:
-            conditions.append("access_attributes = 'null'")
+            # SQLite serializes None as JSON null
+            conditions.append("(access_attributes IS NULL OR access_attributes = 'null')")
         else:
             raise ValueError(f"Unsupported database type: {self.database_type}")
         return conditions

View file

@@ -1,6 +1,7 @@
 {
   "default": [
     {"suite": "base", "setup": "ollama"},
+    {"suite": "base", "setup": "ollama-postgres", "allowed_clients": ["server"], "stack_config": "server:ci-tests::run-with-postgres-store.yaml"},
     {"suite": "vision", "setup": "ollama-vision"},
     {"suite": "responses", "setup": "gpt"},
     {"suite": "base-vllm-subset", "setup": "vllm"}

View file

@@ -233,10 +233,21 @@ def instantiate_llama_stack_client(session):
         raise ValueError("You must specify either --stack-config or LLAMA_STACK_CONFIG")
 
     # Handle server:<config_name> format or server:<config_name>:<port>
+    # Also handles server:<distro>::<run_file.yaml> format
     if config.startswith("server:"):
-        parts = config.split(":")
-        config_name = parts[1]
-        port = int(parts[2]) if len(parts) > 2 else int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT))
+        # Strip the "server:" prefix first
+        config_part = config[7:]  # len("server:") == 7
+
+        # Check for :: (distro::runfile format)
+        if "::" in config_part:
+            config_name = config_part
+            port = int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT))
+        else:
+            # Single colon format: either <name> or <name>:<port>
+            parts = config_part.split(":")
+            config_name = parts[0]
+            port = int(parts[1]) if len(parts) > 1 else int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT))
+
         base_url = f"http://localhost:{port}"
         force_restart = os.environ.get("LLAMA_STACK_TEST_FORCE_SERVER_RESTART") == "1"

View file

@@ -71,6 +71,26 @@ SETUP_DEFINITIONS: dict[str, Setup] = {
             "embedding_model": "ollama/nomic-embed-text:v1.5",
         },
     ),
+    "ollama-postgres": Setup(
+        name="ollama-postgres",
+        description="Server-mode tests with Postgres-backed persistence",
+        env={
+            "OLLAMA_URL": "http://0.0.0.0:11434",
+            "SAFETY_MODEL": "ollama/llama-guard3:1b",
+            "POSTGRES_HOST": "127.0.0.1",
+            "POSTGRES_PORT": "5432",
+            "POSTGRES_DB": "llamastack",
+            "POSTGRES_USER": "llamastack",
+            "POSTGRES_PASSWORD": "llamastack",
+            "LLAMA_STACK_LOGGING": "openai_responses=info",
+        },
+        defaults={
+            "text_model": "ollama/llama3.2:3b-instruct-fp16",
+            "embedding_model": "sentence-transformers/nomic-embed-text-v1.5",
+            "safety_model": "ollama/llama-guard3:1b",
+            "safety_shield": "llama-guard",
+        },
+    ),
     "vllm": Setup(
         name="vllm",
         description="vLLM provider with a text model",