From 492f79ca9b2c4ac5c77346fc91ec8a9811dec342 Mon Sep 17 00:00:00 2001 From: Ashwin Bharambe Date: Wed, 12 Nov 2025 10:35:39 -0800 Subject: [PATCH] fix: harden storage semantics (#4118) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes issues in the storage system by guaranteeing immediate durability for responses and ensuring background writers stay alive. Three related fixes: * Responses to the OpenAI-compatible API now write directly to Postgres/SQLite inside the request instead of detouring through an async queue that might never drain; this restores the expected read-after-write behavior and removes the "response not found" races reported by users. * The access-control shim was stamping owner_principal/access_attributes as SQL NULL, which Postgres interprets as non-public rows; fixing it to use the empty-string/JSON-null pattern means conversations and responses stored without an authenticated user stay queryable (matching SQLite). * The inference-store queue remains for batching, but its worker tasks now start lazily on the live event loop so server startup doesn't cancel them—writes keep flowing even when the stack is launched via llama stack run. Closes #4115 ### Test Plan Added a matrix entry to test our "base" suite against Postgres as the store. --- .../actions/setup-test-environment/action.yml | 26 ++ .github/workflows/integration-tests.yml | 12 +- .../distributions/ci-tests/ci_tests.py | 1 - .../ci-tests/run-with-postgres-store.yaml | 293 ++++++++++++++++++ .../starter-gpu/run-with-postgres-store.yaml | 58 ++-- .../starter/run-with-postgres-store.yaml | 58 ++-- .../distributions/starter/starter.py | 105 ++----- .../utils/inference/inference_store.py | 27 +- .../utils/responses/responses_store.py | 75 +---- .../utils/sqlstore/authorized_sqlstore.py | 34 +- tests/integration/ci_matrix.json | 1 + tests/integration/fixtures/common.py | 17 +- tests/integration/suites.py | 20 ++ 13 files changed, 516 insertions(+), 211 deletions(-) create mode 100644 src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml diff --git a/.github/actions/setup-test-environment/action.yml b/.github/actions/setup-test-environment/action.yml index 7b306fef5..1c9d019cc 100644 --- a/.github/actions/setup-test-environment/action.yml +++ b/.github/actions/setup-test-environment/action.yml @@ -39,6 +39,32 @@ runs: if: ${{ inputs.setup == 'vllm' && inputs.inference-mode == 'record' }} uses: ./.github/actions/setup-vllm + - name: Start Postgres service + if: ${{ contains(inputs.setup, 'postgres') }} + shell: bash + run: | + sudo docker rm -f postgres-ci || true + sudo docker run -d --name postgres-ci \ + -e POSTGRES_USER=llamastack \ + -e POSTGRES_PASSWORD=llamastack \ + -e POSTGRES_DB=llamastack \ + -p 5432:5432 \ + postgres:16 + + echo "Waiting for Postgres to become ready..." + for i in {1..30}; do + if sudo docker exec postgres-ci pg_isready -U llamastack -d llamastack >/dev/null 2>&1; then + echo "Postgres is ready" + break + fi + if [ "$i" -eq 30 ]; then + echo "Postgres failed to start in time" + sudo docker logs postgres-ci || true + exit 1 + fi + sleep 2 + done + - name: Build Llama Stack shell: bash run: | diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 2c797e906..71c7933b4 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -66,12 +66,12 @@ jobs: run-replay-mode-tests: needs: generate-matrix runs-on: ubuntu-latest - name: ${{ format('Integration Tests ({0}, {1}, {2}, client={3}, {4})', matrix.client-type, matrix.config.setup, matrix.python-version, matrix.client-version, matrix.config.suite) }} + name: ${{ format('Integration Tests ({0}, {1}, {2}, client={3}, {4})', matrix.client, matrix.config.setup, matrix.python-version, matrix.client-version, matrix.config.suite) }} strategy: fail-fast: false matrix: - client-type: [library, docker, server] + client: [library, docker, server] # Use Python 3.13 only on nightly schedule (daily latest client test), otherwise use 3.12 python-version: ${{ github.event.schedule == '0 0 * * *' && fromJSON('["3.12", "3.13"]') || fromJSON('["3.12"]') }} client-version: ${{ (github.event.schedule == '0 0 * * *' || github.event.inputs.test-all-client-versions == 'true') && fromJSON('["published", "latest"]') || fromJSON('["latest"]') }} @@ -84,6 +84,7 @@ jobs: uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 - name: Setup test environment + if: ${{ matrix.config.allowed_clients == null || contains(matrix.config.allowed_clients, matrix.client) }} uses: ./.github/actions/setup-test-environment with: python-version: ${{ matrix.python-version }} @@ -93,11 +94,16 @@ jobs: inference-mode: 'replay' - name: Run tests + if: ${{ matrix.config.allowed_clients == null || contains(matrix.config.allowed_clients, matrix.client) }} uses: ./.github/actions/run-and-record-tests env: OPENAI_API_KEY: dummy with: - stack-config: ${{ matrix.client-type == 'library' && 'ci-tests' || matrix.client-type == 'server' && 'server:ci-tests' || 'docker:ci-tests' }} + stack-config: >- + ${{ matrix.config.stack_config + || (matrix.client == 'library' && 'ci-tests') + || (matrix.client == 'server' && 'server:ci-tests') + || 'docker:ci-tests' }} setup: ${{ matrix.config.setup }} inference-mode: 'replay' suite: ${{ matrix.config.suite }} diff --git a/src/llama_stack/distributions/ci-tests/ci_tests.py b/src/llama_stack/distributions/ci-tests/ci_tests.py index c06b1b98d..ab102f5f3 100644 --- a/src/llama_stack/distributions/ci-tests/ci_tests.py +++ b/src/llama_stack/distributions/ci-tests/ci_tests.py @@ -13,6 +13,5 @@ from ..starter.starter import get_distribution_template as get_starter_distribut def get_distribution_template() -> DistributionTemplate: template = get_starter_distribution_template(name="ci-tests") template.description = "CI tests for Llama Stack" - template.run_configs.pop("run-with-postgres-store.yaml", None) return template diff --git a/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml b/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml new file mode 100644 index 000000000..5384b58fe --- /dev/null +++ b/src/llama_stack/distributions/ci-tests/run-with-postgres-store.yaml @@ -0,0 +1,293 @@ +version: 2 +image_name: ci-tests +apis: +- agents +- batches +- datasetio +- eval +- files +- inference +- post_training +- safety +- scoring +- tool_runtime +- vector_io +providers: + inference: + - provider_id: ${env.CEREBRAS_API_KEY:+cerebras} + provider_type: remote::cerebras + config: + base_url: https://api.cerebras.ai + api_key: ${env.CEREBRAS_API_KEY:=} + - provider_id: ${env.OLLAMA_URL:+ollama} + provider_type: remote::ollama + config: + url: ${env.OLLAMA_URL:=http://localhost:11434} + - provider_id: ${env.VLLM_URL:+vllm} + provider_type: remote::vllm + config: + url: ${env.VLLM_URL:=} + max_tokens: ${env.VLLM_MAX_TOKENS:=4096} + api_token: ${env.VLLM_API_TOKEN:=fake} + tls_verify: ${env.VLLM_TLS_VERIFY:=true} + - provider_id: ${env.TGI_URL:+tgi} + provider_type: remote::tgi + config: + url: ${env.TGI_URL:=} + - provider_id: fireworks + provider_type: remote::fireworks + config: + url: https://api.fireworks.ai/inference/v1 + api_key: ${env.FIREWORKS_API_KEY:=} + - provider_id: together + provider_type: remote::together + config: + url: https://api.together.xyz/v1 + api_key: ${env.TOGETHER_API_KEY:=} + - provider_id: bedrock + provider_type: remote::bedrock + config: + api_key: ${env.AWS_BEDROCK_API_KEY:=} + region_name: ${env.AWS_DEFAULT_REGION:=us-east-2} + - provider_id: ${env.NVIDIA_API_KEY:+nvidia} + provider_type: remote::nvidia + config: + url: ${env.NVIDIA_BASE_URL:=https://integrate.api.nvidia.com} + api_key: ${env.NVIDIA_API_KEY:=} + append_api_version: ${env.NVIDIA_APPEND_API_VERSION:=True} + - provider_id: openai + provider_type: remote::openai + config: + api_key: ${env.OPENAI_API_KEY:=} + base_url: ${env.OPENAI_BASE_URL:=https://api.openai.com/v1} + - provider_id: anthropic + provider_type: remote::anthropic + config: + api_key: ${env.ANTHROPIC_API_KEY:=} + - provider_id: gemini + provider_type: remote::gemini + config: + api_key: ${env.GEMINI_API_KEY:=} + - provider_id: ${env.VERTEX_AI_PROJECT:+vertexai} + provider_type: remote::vertexai + config: + project: ${env.VERTEX_AI_PROJECT:=} + location: ${env.VERTEX_AI_LOCATION:=us-central1} + - provider_id: groq + provider_type: remote::groq + config: + url: https://api.groq.com + api_key: ${env.GROQ_API_KEY:=} + - provider_id: sambanova + provider_type: remote::sambanova + config: + url: https://api.sambanova.ai/v1 + api_key: ${env.SAMBANOVA_API_KEY:=} + - provider_id: ${env.AZURE_API_KEY:+azure} + provider_type: remote::azure + config: + api_key: ${env.AZURE_API_KEY:=} + api_base: ${env.AZURE_API_BASE:=} + api_version: ${env.AZURE_API_VERSION:=} + api_type: ${env.AZURE_API_TYPE:=} + - provider_id: sentence-transformers + provider_type: inline::sentence-transformers + vector_io: + - provider_id: faiss + provider_type: inline::faiss + config: + persistence: + namespace: vector_io::faiss + backend: kv_default + - provider_id: sqlite-vec + provider_type: inline::sqlite-vec + config: + db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/sqlite_vec.db + persistence: + namespace: vector_io::sqlite_vec + backend: kv_default + - provider_id: ${env.MILVUS_URL:+milvus} + provider_type: inline::milvus + config: + db_path: ${env.MILVUS_DB_PATH:=~/.llama/distributions/ci-tests}/milvus.db + persistence: + namespace: vector_io::milvus + backend: kv_default + - provider_id: ${env.CHROMADB_URL:+chromadb} + provider_type: remote::chromadb + config: + url: ${env.CHROMADB_URL:=} + persistence: + namespace: vector_io::chroma_remote + backend: kv_default + - provider_id: ${env.PGVECTOR_DB:+pgvector} + provider_type: remote::pgvector + config: + host: ${env.PGVECTOR_HOST:=localhost} + port: ${env.PGVECTOR_PORT:=5432} + db: ${env.PGVECTOR_DB:=} + user: ${env.PGVECTOR_USER:=} + password: ${env.PGVECTOR_PASSWORD:=} + persistence: + namespace: vector_io::pgvector + backend: kv_default + - provider_id: ${env.QDRANT_URL:+qdrant} + provider_type: remote::qdrant + config: + api_key: ${env.QDRANT_API_KEY:=} + persistence: + namespace: vector_io::qdrant_remote + backend: kv_default + - provider_id: ${env.WEAVIATE_CLUSTER_URL:+weaviate} + provider_type: remote::weaviate + config: + weaviate_api_key: null + weaviate_cluster_url: ${env.WEAVIATE_CLUSTER_URL:=localhost:8080} + persistence: + namespace: vector_io::weaviate + backend: kv_default + files: + - provider_id: meta-reference-files + provider_type: inline::localfs + config: + storage_dir: ${env.FILES_STORAGE_DIR:=~/.llama/distributions/ci-tests/files} + metadata_store: + table_name: files_metadata + backend: sql_default + safety: + - provider_id: llama-guard + provider_type: inline::llama-guard + config: + excluded_categories: [] + - provider_id: code-scanner + provider_type: inline::code-scanner + agents: + - provider_id: meta-reference + provider_type: inline::meta-reference + config: + persistence: + agent_state: + namespace: agents + backend: kv_default + responses: + table_name: responses + backend: sql_default + max_write_queue_size: 10000 + num_writers: 4 + post_training: + - provider_id: torchtune-cpu + provider_type: inline::torchtune-cpu + config: + checkpoint_format: meta + eval: + - provider_id: meta-reference + provider_type: inline::meta-reference + config: + kvstore: + namespace: eval + backend: kv_default + datasetio: + - provider_id: huggingface + provider_type: remote::huggingface + config: + kvstore: + namespace: datasetio::huggingface + backend: kv_default + - provider_id: localfs + provider_type: inline::localfs + config: + kvstore: + namespace: datasetio::localfs + backend: kv_default + scoring: + - provider_id: basic + provider_type: inline::basic + - provider_id: llm-as-judge + provider_type: inline::llm-as-judge + - provider_id: braintrust + provider_type: inline::braintrust + config: + openai_api_key: ${env.OPENAI_API_KEY:=} + tool_runtime: + - provider_id: brave-search + provider_type: remote::brave-search + config: + api_key: ${env.BRAVE_SEARCH_API_KEY:=} + max_results: 3 + - provider_id: tavily-search + provider_type: remote::tavily-search + config: + api_key: ${env.TAVILY_SEARCH_API_KEY:=} + max_results: 3 + - provider_id: rag-runtime + provider_type: inline::rag-runtime + - provider_id: model-context-protocol + provider_type: remote::model-context-protocol + batches: + - provider_id: reference + provider_type: inline::reference + config: + kvstore: + namespace: batches + backend: kv_default +storage: + backends: + kv_default: + type: kv_postgres + host: ${env.POSTGRES_HOST:=localhost} + port: ${env.POSTGRES_PORT:=5432} + db: ${env.POSTGRES_DB:=llamastack} + user: ${env.POSTGRES_USER:=llamastack} + password: ${env.POSTGRES_PASSWORD:=llamastack} + table_name: ${env.POSTGRES_TABLE_NAME:=llamastack_kvstore} + sql_default: + type: sql_postgres + host: ${env.POSTGRES_HOST:=localhost} + port: ${env.POSTGRES_PORT:=5432} + db: ${env.POSTGRES_DB:=llamastack} + user: ${env.POSTGRES_USER:=llamastack} + password: ${env.POSTGRES_PASSWORD:=llamastack} + stores: + metadata: + namespace: registry + backend: kv_default + inference: + table_name: inference_store + backend: sql_default + max_write_queue_size: 10000 + num_writers: 4 + conversations: + table_name: openai_conversations + backend: sql_default + prompts: + namespace: prompts + backend: kv_default +registered_resources: + models: [] + shields: + - shield_id: llama-guard + provider_id: ${env.SAFETY_MODEL:+llama-guard} + provider_shield_id: ${env.SAFETY_MODEL:=} + - shield_id: code-scanner + provider_id: ${env.CODE_SCANNER_MODEL:+code-scanner} + provider_shield_id: ${env.CODE_SCANNER_MODEL:=} + vector_dbs: [] + datasets: [] + scoring_fns: [] + benchmarks: [] + tool_groups: + - toolgroup_id: builtin::websearch + provider_id: tavily-search + - toolgroup_id: builtin::rag + provider_id: rag-runtime +server: + port: 8321 +telemetry: + enabled: true +vector_stores: + default_provider_id: faiss + default_embedding_model: + provider_id: sentence-transformers + model_id: nomic-ai/nomic-embed-text-v1.5 +safety: + default_shield_id: llama-guard diff --git a/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml index 1920ebd9d..e29ada6f4 100644 --- a/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter-gpu/run-with-postgres-store.yaml @@ -165,20 +165,15 @@ providers: - provider_id: meta-reference provider_type: inline::meta-reference config: - persistence_store: - type: sql_postgres - host: ${env.POSTGRES_HOST:=localhost} - port: ${env.POSTGRES_PORT:=5432} - db: ${env.POSTGRES_DB:=llamastack} - user: ${env.POSTGRES_USER:=llamastack} - password: ${env.POSTGRES_PASSWORD:=llamastack} - responses_store: - type: sql_postgres - host: ${env.POSTGRES_HOST:=localhost} - port: ${env.POSTGRES_PORT:=5432} - db: ${env.POSTGRES_DB:=llamastack} - user: ${env.POSTGRES_USER:=llamastack} - password: ${env.POSTGRES_PASSWORD:=llamastack} + persistence: + agent_state: + namespace: agents + backend: kv_default + responses: + table_name: responses + backend: sql_default + max_write_queue_size: 10000 + num_writers: 4 post_training: - provider_id: huggingface-gpu provider_type: inline::huggingface-gpu @@ -237,10 +232,10 @@ providers: config: kvstore: namespace: batches - backend: kv_postgres + backend: kv_default storage: backends: - kv_postgres: + kv_default: type: kv_postgres host: ${env.POSTGRES_HOST:=localhost} port: ${env.POSTGRES_PORT:=5432} @@ -248,7 +243,7 @@ storage: user: ${env.POSTGRES_USER:=llamastack} password: ${env.POSTGRES_PASSWORD:=llamastack} table_name: ${env.POSTGRES_TABLE_NAME:=llamastack_kvstore} - sql_postgres: + sql_default: type: sql_postgres host: ${env.POSTGRES_HOST:=localhost} port: ${env.POSTGRES_PORT:=5432} @@ -258,27 +253,44 @@ storage: stores: metadata: namespace: registry - backend: kv_postgres + backend: kv_default inference: table_name: inference_store - backend: sql_postgres + backend: sql_default max_write_queue_size: 10000 num_writers: 4 conversations: table_name: openai_conversations - backend: sql_postgres + backend: sql_default prompts: namespace: prompts - backend: kv_postgres + backend: kv_default registered_resources: models: [] - shields: [] + shields: + - shield_id: llama-guard + provider_id: ${env.SAFETY_MODEL:+llama-guard} + provider_shield_id: ${env.SAFETY_MODEL:=} + - shield_id: code-scanner + provider_id: ${env.CODE_SCANNER_MODEL:+code-scanner} + provider_shield_id: ${env.CODE_SCANNER_MODEL:=} vector_dbs: [] datasets: [] scoring_fns: [] benchmarks: [] - tool_groups: [] + tool_groups: + - toolgroup_id: builtin::websearch + provider_id: tavily-search + - toolgroup_id: builtin::rag + provider_id: rag-runtime server: port: 8321 telemetry: enabled: true +vector_stores: + default_provider_id: faiss + default_embedding_model: + provider_id: sentence-transformers + model_id: nomic-ai/nomic-embed-text-v1.5 +safety: + default_shield_id: llama-guard diff --git a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml index 702f95381..437674bf9 100644 --- a/src/llama_stack/distributions/starter/run-with-postgres-store.yaml +++ b/src/llama_stack/distributions/starter/run-with-postgres-store.yaml @@ -165,20 +165,15 @@ providers: - provider_id: meta-reference provider_type: inline::meta-reference config: - persistence_store: - type: sql_postgres - host: ${env.POSTGRES_HOST:=localhost} - port: ${env.POSTGRES_PORT:=5432} - db: ${env.POSTGRES_DB:=llamastack} - user: ${env.POSTGRES_USER:=llamastack} - password: ${env.POSTGRES_PASSWORD:=llamastack} - responses_store: - type: sql_postgres - host: ${env.POSTGRES_HOST:=localhost} - port: ${env.POSTGRES_PORT:=5432} - db: ${env.POSTGRES_DB:=llamastack} - user: ${env.POSTGRES_USER:=llamastack} - password: ${env.POSTGRES_PASSWORD:=llamastack} + persistence: + agent_state: + namespace: agents + backend: kv_default + responses: + table_name: responses + backend: sql_default + max_write_queue_size: 10000 + num_writers: 4 post_training: - provider_id: torchtune-cpu provider_type: inline::torchtune-cpu @@ -234,10 +229,10 @@ providers: config: kvstore: namespace: batches - backend: kv_postgres + backend: kv_default storage: backends: - kv_postgres: + kv_default: type: kv_postgres host: ${env.POSTGRES_HOST:=localhost} port: ${env.POSTGRES_PORT:=5432} @@ -245,7 +240,7 @@ storage: user: ${env.POSTGRES_USER:=llamastack} password: ${env.POSTGRES_PASSWORD:=llamastack} table_name: ${env.POSTGRES_TABLE_NAME:=llamastack_kvstore} - sql_postgres: + sql_default: type: sql_postgres host: ${env.POSTGRES_HOST:=localhost} port: ${env.POSTGRES_PORT:=5432} @@ -255,27 +250,44 @@ storage: stores: metadata: namespace: registry - backend: kv_postgres + backend: kv_default inference: table_name: inference_store - backend: sql_postgres + backend: sql_default max_write_queue_size: 10000 num_writers: 4 conversations: table_name: openai_conversations - backend: sql_postgres + backend: sql_default prompts: namespace: prompts - backend: kv_postgres + backend: kv_default registered_resources: models: [] - shields: [] + shields: + - shield_id: llama-guard + provider_id: ${env.SAFETY_MODEL:+llama-guard} + provider_shield_id: ${env.SAFETY_MODEL:=} + - shield_id: code-scanner + provider_id: ${env.CODE_SCANNER_MODEL:+code-scanner} + provider_shield_id: ${env.CODE_SCANNER_MODEL:=} vector_dbs: [] datasets: [] scoring_fns: [] benchmarks: [] - tool_groups: [] + tool_groups: + - toolgroup_id: builtin::websearch + provider_id: tavily-search + - toolgroup_id: builtin::rag + provider_id: rag-runtime server: port: 8321 telemetry: enabled: true +vector_stores: + default_provider_id: faiss + default_embedding_model: + provider_id: sentence-transformers + model_id: nomic-ai/nomic-embed-text-v1.5 +safety: + default_shield_id: llama-guard diff --git a/src/llama_stack/distributions/starter/starter.py b/src/llama_stack/distributions/starter/starter.py index 88cd3a4fe..7b7773289 100644 --- a/src/llama_stack/distributions/starter/starter.py +++ b/src/llama_stack/distributions/starter/starter.py @@ -17,11 +17,6 @@ from llama_stack.core.datatypes import ( ToolGroupInput, VectorStoresConfig, ) -from llama_stack.core.storage.datatypes import ( - InferenceStoreReference, - KVStoreReference, - SqlStoreReference, -) from llama_stack.core.utils.dynamic import instantiate_class_type from llama_stack.distributions.template import DistributionTemplate, RunConfigSettings from llama_stack.providers.datatypes import RemoteProviderSpec @@ -154,10 +149,11 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: BuildProvider(provider_type="inline::reference"), ], } + files_config = LocalfsFilesImplConfig.sample_run_config(f"~/.llama/distributions/{name}") files_provider = Provider( provider_id="meta-reference-files", provider_type="inline::localfs", - config=LocalfsFilesImplConfig.sample_run_config(f"~/.llama/distributions/{name}"), + config=files_config, ) embedding_provider = Provider( provider_id="sentence-transformers", @@ -187,7 +183,8 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: provider_shield_id="${env.CODE_SCANNER_MODEL:=}", ), ] - postgres_config = PostgresSqlStoreConfig.sample_run_config() + postgres_sql_config = PostgresSqlStoreConfig.sample_run_config() + postgres_kv_config = PostgresKVStoreConfig.sample_run_config() default_overrides = { "inference": remote_inference_providers + [embedding_provider], "vector_io": [ @@ -244,6 +241,33 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: "files": [files_provider], } + base_run_settings = RunConfigSettings( + provider_overrides=default_overrides, + default_models=[], + default_tool_groups=default_tool_groups, + default_shields=default_shields, + vector_stores_config=VectorStoresConfig( + default_provider_id="faiss", + default_embedding_model=QualifiedModel( + provider_id="sentence-transformers", + model_id="nomic-ai/nomic-embed-text-v1.5", + ), + ), + safety_config=SafetyConfig( + default_shield_id="llama-guard", + ), + ) + + postgres_run_settings = base_run_settings.model_copy( + update={ + "storage_backends": { + "kv_default": postgres_kv_config, + "sql_default": postgres_sql_config, + } + }, + deep=True, + ) + return DistributionTemplate( name=name, distro_type="self_hosted", @@ -253,71 +277,8 @@ def get_distribution_template(name: str = "starter") -> DistributionTemplate: providers=providers, additional_pip_packages=list(set(PostgresSqlStoreConfig.pip_packages() + PostgresKVStoreConfig.pip_packages())), run_configs={ - "run.yaml": RunConfigSettings( - provider_overrides=default_overrides, - default_models=[], - default_tool_groups=default_tool_groups, - default_shields=default_shields, - vector_stores_config=VectorStoresConfig( - default_provider_id="faiss", - default_embedding_model=QualifiedModel( - provider_id="sentence-transformers", - model_id="nomic-ai/nomic-embed-text-v1.5", - ), - ), - safety_config=SafetyConfig( - default_shield_id="llama-guard", - ), - ), - "run-with-postgres-store.yaml": RunConfigSettings( - provider_overrides={ - **default_overrides, - "agents": [ - Provider( - provider_id="meta-reference", - provider_type="inline::meta-reference", - config=dict( - persistence_store=postgres_config, - responses_store=postgres_config, - ), - ) - ], - "batches": [ - Provider( - provider_id="reference", - provider_type="inline::reference", - config=dict( - kvstore=KVStoreReference( - backend="kv_postgres", - namespace="batches", - ).model_dump(exclude_none=True), - ), - ) - ], - }, - storage_backends={ - "kv_postgres": PostgresKVStoreConfig.sample_run_config(), - "sql_postgres": postgres_config, - }, - storage_stores={ - "metadata": KVStoreReference( - backend="kv_postgres", - namespace="registry", - ).model_dump(exclude_none=True), - "inference": InferenceStoreReference( - backend="sql_postgres", - table_name="inference_store", - ).model_dump(exclude_none=True), - "conversations": SqlStoreReference( - backend="sql_postgres", - table_name="openai_conversations", - ).model_dump(exclude_none=True), - "prompts": KVStoreReference( - backend="kv_postgres", - namespace="prompts", - ).model_dump(exclude_none=True), - }, - ), + "run.yaml": base_run_settings, + "run-with-postgres-store.yaml": postgres_run_settings, }, run_config_env_vars={ "LLAMA_STACK_PORT": ( diff --git a/src/llama_stack/providers/utils/inference/inference_store.py b/src/llama_stack/providers/utils/inference/inference_store.py index 2bf947a8d..a3a28aec0 100644 --- a/src/llama_stack/providers/utils/inference/inference_store.py +++ b/src/llama_stack/providers/utils/inference/inference_store.py @@ -66,14 +66,6 @@ class InferenceStore: }, ) - if self.enable_write_queue: - self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) - for _ in range(self._num_writers): - self._worker_tasks.append(asyncio.create_task(self._worker_loop())) - logger.debug( - f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" - ) - async def shutdown(self) -> None: if not self._worker_tasks: return @@ -94,10 +86,29 @@ class InferenceStore: if self.enable_write_queue and self._queue is not None: await self._queue.join() + async def _ensure_workers_started(self) -> None: + """Ensure the async write queue workers run on the current loop.""" + if not self.enable_write_queue: + return + + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) + logger.debug( + f"Inference store write queue created with max size {self._max_write_queue_size} " + f"and {self._num_writers} writers" + ) + + if not self._worker_tasks: + loop = asyncio.get_running_loop() + for _ in range(self._num_writers): + task = loop.create_task(self._worker_loop()) + self._worker_tasks.append(task) + async def store_chat_completion( self, chat_completion: OpenAIChatCompletion, input_messages: list[OpenAIMessageParam] ) -> None: if self.enable_write_queue: + await self._ensure_workers_started() if self._queue is None: raise ValueError("Inference store is not initialized") try: diff --git a/src/llama_stack/providers/utils/responses/responses_store.py b/src/llama_stack/providers/utils/responses/responses_store.py index 40466d00c..f5024a9ed 100644 --- a/src/llama_stack/providers/utils/responses/responses_store.py +++ b/src/llama_stack/providers/utils/responses/responses_store.py @@ -3,8 +3,6 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -import asyncio -from typing import Any from llama_stack.apis.agents import ( Order, @@ -19,12 +17,12 @@ from llama_stack.apis.agents.openai_responses import ( ) from llama_stack.apis.inference import OpenAIMessageParam from llama_stack.core.datatypes import AccessRule -from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType +from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference from llama_stack.log import get_logger from ..sqlstore.api import ColumnDefinition, ColumnType from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore -from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl +from ..sqlstore.sqlstore import sqlstore_impl logger = get_logger(name=__name__, category="openai_responses") @@ -55,28 +53,12 @@ class ResponsesStore: self.policy = policy self.sql_store = None - self.enable_write_queue = True - - # Async write queue and worker control - self._queue: ( - asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None - ) = None - self._worker_tasks: list[asyncio.Task[Any]] = [] - self._max_write_queue_size: int = self.reference.max_write_queue_size - self._num_writers: int = max(1, self.reference.num_writers) async def initialize(self): """Create the necessary tables if they don't exist.""" base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) - # Disable write queue for SQLite since WAL mode handles concurrency - # Keep it enabled for other backends (like Postgres) for performance - backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend) - if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE: - self.enable_write_queue = False - logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)") - await self.sql_store.create_table( "openai_responses", { @@ -95,33 +77,12 @@ class ResponsesStore: }, ) - if self.enable_write_queue: - self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) - for _ in range(self._num_writers): - self._worker_tasks.append(asyncio.create_task(self._worker_loop())) - logger.debug( - f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" - ) - async def shutdown(self) -> None: - if not self._worker_tasks: - return - if self._queue is not None: - await self._queue.join() - for t in self._worker_tasks: - if not t.done(): - t.cancel() - for t in self._worker_tasks: - try: - await t - except asyncio.CancelledError: - pass - self._worker_tasks.clear() + return async def flush(self) -> None: - """Wait for all queued writes to complete. Useful for testing.""" - if self.enable_write_queue and self._queue is not None: - await self._queue.join() + """Maintained for compatibility; no-op now that writes are synchronous.""" + return async def store_response_object( self, @@ -129,31 +90,7 @@ class ResponsesStore: input: list[OpenAIResponseInput], messages: list[OpenAIMessageParam], ) -> None: - if self.enable_write_queue: - if self._queue is None: - raise ValueError("Responses store is not initialized") - try: - self._queue.put_nowait((response_object, input, messages)) - except asyncio.QueueFull: - logger.warning(f"Write queue full; adding response id={getattr(response_object, 'id', '')}") - await self._queue.put((response_object, input, messages)) - else: - await self._write_response_object(response_object, input, messages) - - async def _worker_loop(self) -> None: - assert self._queue is not None - while True: - try: - item = await self._queue.get() - except asyncio.CancelledError: - break - response_object, input, messages = item - try: - await self._write_response_object(response_object, input, messages) - except Exception as e: # noqa: BLE001 - logger.error(f"Error writing response object: {e}") - finally: - self._queue.task_done() + await self._write_response_object(response_object, input, messages) async def _write_response_object( self, diff --git a/src/llama_stack/providers/utils/sqlstore/authorized_sqlstore.py b/src/llama_stack/providers/utils/sqlstore/authorized_sqlstore.py index 3dfc82677..eb2d9a491 100644 --- a/src/llama_stack/providers/utils/sqlstore/authorized_sqlstore.py +++ b/src/llama_stack/providers/utils/sqlstore/authorized_sqlstore.py @@ -45,8 +45,13 @@ def _enhance_item_with_access_control(item: Mapping[str, Any], current_user: Use enhanced["owner_principal"] = current_user.principal enhanced["access_attributes"] = current_user.attributes else: - enhanced["owner_principal"] = None - enhanced["access_attributes"] = None + # IMPORTANT: Use empty string and null value (not None) to match public access filter + # The public access filter in _get_public_access_conditions() expects: + # - owner_principal = '' (empty string) + # - access_attributes = null (JSON null, which serializes to the string 'null') + # Setting them to None (SQL NULL) will cause rows to be filtered out on read. + enhanced["owner_principal"] = "" + enhanced["access_attributes"] = None # Pydantic/JSON will serialize this as JSON null return enhanced @@ -188,8 +193,9 @@ class AuthorizedSqlStore: enhanced_data["owner_principal"] = current_user.principal enhanced_data["access_attributes"] = current_user.attributes else: - enhanced_data["owner_principal"] = None - enhanced_data["access_attributes"] = None + # IMPORTANT: Use empty string for owner_principal to match public access filter + enhanced_data["owner_principal"] = "" + enhanced_data["access_attributes"] = None # Will serialize as JSON null await self.sql_store.update(table, enhanced_data, where) @@ -245,14 +251,24 @@ class AuthorizedSqlStore: raise ValueError(f"Unsupported database type: {self.database_type}") def _get_public_access_conditions(self) -> list[str]: - """Get the SQL conditions for public access.""" - # Public records are records that have no owner_principal or access_attributes + """Get the SQL conditions for public access. + + Public records are those with: + - owner_principal = '' (empty string) + - access_attributes is either SQL NULL or JSON null + + Note: Different databases serialize None differently: + - SQLite: None → JSON null (text = 'null') + - Postgres: None → SQL NULL (IS NULL) + """ conditions = ["owner_principal = ''"] if self.database_type == StorageBackendType.SQL_POSTGRES.value: - # Postgres stores JSON null as 'null' - conditions.append("access_attributes::text = 'null'") + # Accept both SQL NULL and JSON null for Postgres compatibility + # This handles both old rows (SQL NULL) and new rows (JSON null) + conditions.append("(access_attributes IS NULL OR access_attributes::text = 'null')") elif self.database_type == StorageBackendType.SQL_SQLITE.value: - conditions.append("access_attributes = 'null'") + # SQLite serializes None as JSON null + conditions.append("(access_attributes IS NULL OR access_attributes = 'null')") else: raise ValueError(f"Unsupported database type: {self.database_type}") return conditions diff --git a/tests/integration/ci_matrix.json b/tests/integration/ci_matrix.json index 858176dff..43678e5c7 100644 --- a/tests/integration/ci_matrix.json +++ b/tests/integration/ci_matrix.json @@ -1,6 +1,7 @@ { "default": [ {"suite": "base", "setup": "ollama"}, + {"suite": "base", "setup": "ollama-postgres", "allowed_clients": ["server"], "stack_config": "server:ci-tests::run-with-postgres-store.yaml"}, {"suite": "vision", "setup": "ollama-vision"}, {"suite": "responses", "setup": "gpt"}, {"suite": "base-vllm-subset", "setup": "vllm"} diff --git a/tests/integration/fixtures/common.py b/tests/integration/fixtures/common.py index 407564c15..817180cfe 100644 --- a/tests/integration/fixtures/common.py +++ b/tests/integration/fixtures/common.py @@ -233,10 +233,21 @@ def instantiate_llama_stack_client(session): raise ValueError("You must specify either --stack-config or LLAMA_STACK_CONFIG") # Handle server: format or server:: + # Also handles server::: format if config.startswith("server:"): - parts = config.split(":") - config_name = parts[1] - port = int(parts[2]) if len(parts) > 2 else int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT)) + # Strip the "server:" prefix first + config_part = config[7:] # len("server:") == 7 + + # Check for :: (distro::runfile format) + if "::" in config_part: + config_name = config_part + port = int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT)) + else: + # Single colon format: either or : + parts = config_part.split(":") + config_name = parts[0] + port = int(parts[1]) if len(parts) > 1 else int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT)) + base_url = f"http://localhost:{port}" force_restart = os.environ.get("LLAMA_STACK_TEST_FORCE_SERVER_RESTART") == "1" diff --git a/tests/integration/suites.py b/tests/integration/suites.py index 0cec66afe..7689657b4 100644 --- a/tests/integration/suites.py +++ b/tests/integration/suites.py @@ -71,6 +71,26 @@ SETUP_DEFINITIONS: dict[str, Setup] = { "embedding_model": "ollama/nomic-embed-text:v1.5", }, ), + "ollama-postgres": Setup( + name="ollama-postgres", + description="Server-mode tests with Postgres-backed persistence", + env={ + "OLLAMA_URL": "http://0.0.0.0:11434", + "SAFETY_MODEL": "ollama/llama-guard3:1b", + "POSTGRES_HOST": "127.0.0.1", + "POSTGRES_PORT": "5432", + "POSTGRES_DB": "llamastack", + "POSTGRES_USER": "llamastack", + "POSTGRES_PASSWORD": "llamastack", + "LLAMA_STACK_LOGGING": "openai_responses=info", + }, + defaults={ + "text_model": "ollama/llama3.2:3b-instruct-fp16", + "embedding_model": "sentence-transformers/nomic-embed-text-v1.5", + "safety_model": "ollama/llama-guard3:1b", + "safety_shield": "llama-guard", + }, + ), "vllm": Setup( name="vllm", description="vLLM provider with a text model",