From b9299a20edfea156ba3fb8e77ad667a77e1f889f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 24 Nov 2025 11:30:57 -0800 Subject: [PATCH] fix: enable SQLite WAL mode to prevent database locking errors (backport #4048) (#4226) Fixes race condition causing "database is locked" errors during concurrent writes to SQLite, particularly in streaming responses with guardrails where multiple inference calls write simultaneously. Enable Write-Ahead Logging (WAL) mode for SQLite which allows multiple concurrent readers and one writer without blocking. Set busy_timeout to 5s so SQLite retries instead of failing immediately. Remove the logic that disabled write queues for SQLite since WAL mode eliminates the locking issues that prompted disabling them. Fixes: test_output_safety_guardrails_safe_content[stream=True] flake
This is an automatic backport of pull request #4048 done by [Mergify](https://mergify.com). Signed-off-by: Charlie Doern Co-authored-by: Ashwin Bharambe --- .../utils/inference/inference_store.py | 24 +++++++++----- .../utils/responses/responses_store.py | 20 ++++++++++-- .../utils/sqlstore/sqlalchemy_sqlstore.py | 32 ++++++++++++++++++- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/llama_stack/providers/utils/inference/inference_store.py b/llama_stack/providers/utils/inference/inference_store.py index 8b7ed7123..50f5af117 100644 --- a/llama_stack/providers/utils/inference/inference_store.py +++ b/llama_stack/providers/utils/inference/inference_store.py @@ -35,6 +35,7 @@ class InferenceStore: self.reference = reference self.sql_store = None self.policy = policy + self.enable_write_queue = True # Async write queue and worker control self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None @@ -47,14 +48,13 @@ class InferenceStore: base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) - # Disable write queue for SQLite to avoid concurrency issues - backend_name = self.reference.backend - backend_config = _SQLSTORE_BACKENDS.get(backend_name) - if backend_config is None: - raise ValueError( - f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}" - ) - self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE + # Disable write queue for SQLite since WAL mode handles concurrency + # Keep it enabled for other backends (like Postgres) for performance + backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend) + if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE: + self.enable_write_queue = False + logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)") + await self.sql_store.create_table( "chat_completions", { @@ -66,6 +66,14 @@ class InferenceStore: }, ) + if self.enable_write_queue: + self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) + for _ in range(self._num_writers): + self._worker_tasks.append(asyncio.create_task(self._worker_loop())) + logger.debug( + f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" + ) + async def shutdown(self) -> None: if not self._worker_tasks: return diff --git a/llama_stack/providers/utils/responses/responses_store.py b/llama_stack/providers/utils/responses/responses_store.py index f5024a9ed..3b63fbef6 100644 --- a/llama_stack/providers/utils/responses/responses_store.py +++ b/llama_stack/providers/utils/responses/responses_store.py @@ -3,6 +3,7 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import asyncio from llama_stack.apis.agents import ( Order, @@ -17,12 +18,12 @@ from llama_stack.apis.agents.openai_responses import ( ) from llama_stack.apis.inference import OpenAIMessageParam from llama_stack.core.datatypes import AccessRule -from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference +from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType from llama_stack.log import get_logger from ..sqlstore.api import ColumnDefinition, ColumnType from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore -from ..sqlstore.sqlstore import sqlstore_impl +from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl logger = get_logger(name=__name__, category="openai_responses") @@ -59,6 +60,13 @@ class ResponsesStore: base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) + # Disable write queue for SQLite since WAL mode handles concurrency + # Keep it enabled for other backends (like Postgres) for performance + backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend) + if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE: + self.enable_write_queue = False + logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)") + await self.sql_store.create_table( "openai_responses", { @@ -77,6 +85,14 @@ class ResponsesStore: }, ) + if self.enable_write_queue: + self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) + for _ in range(self._num_writers): + self._worker_tasks.append(asyncio.create_task(self._worker_loop())) + logger.debug( + f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" + ) + async def shutdown(self) -> None: return diff --git a/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py b/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py index c1ccd73dd..0cd1401d4 100644 --- a/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py +++ b/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py @@ -17,6 +17,7 @@ from sqlalchemy import ( String, Table, Text, + event, inspect, select, text, @@ -75,7 +76,36 @@ class SqlAlchemySqlStoreImpl(SqlStore): self.metadata = MetaData() def create_engine(self) -> AsyncEngine: - return create_async_engine(self.config.engine_str, pool_pre_ping=True) + # Configure connection args for better concurrency support + connect_args = {} + if "sqlite" in self.config.engine_str: + # SQLite-specific optimizations for concurrent access + # With WAL mode, most locks resolve in milliseconds, but allow up to 5s for edge cases + connect_args["timeout"] = 5.0 + connect_args["check_same_thread"] = False # Allow usage across asyncio tasks + + engine = create_async_engine( + self.config.engine_str, + pool_pre_ping=True, + connect_args=connect_args, + ) + + # Enable WAL mode for SQLite to support concurrent readers and writers + if "sqlite" in self.config.engine_str: + + @event.listens_for(engine.sync_engine, "connect") + def set_sqlite_pragma(dbapi_conn, connection_record): + cursor = dbapi_conn.cursor() + # Enable Write-Ahead Logging for better concurrency + cursor.execute("PRAGMA journal_mode=WAL") + # Set busy timeout to 5 seconds (retry instead of immediate failure) + # With WAL mode, locks should be brief; if we hit 5s there's a bigger issue + cursor.execute("PRAGMA busy_timeout=5000") + # Use NORMAL synchronous mode for better performance (still safe with WAL) + cursor.execute("PRAGMA synchronous=NORMAL") + cursor.close() + + return engine async def create_table( self,