fix: enable SQLite WAL mode to prevent database locking errors (backport #4048) (#4226)

Fixes a race condition causing "database is locked" errors during
concurrent writes to SQLite, particularly in streaming responses with
guardrails where multiple inference calls write simultaneously.

Enable Write-Ahead Logging (WAL) mode for SQLite, which allows multiple
concurrent readers and one writer without blocking. Set busy_timeout to
5s so SQLite retries instead of failing immediately. Remove the logic
that disabled write queues for SQLite since WAL mode eliminates the
locking issues that prompted disabling them.

Fixes: test_output_safety_guardrails_safe_content[stream=True]
flake

---

This is an automatic backport of pull request #4048 done by
[Mergify](https://mergify.com).

Signed-off-by: Charlie Doern <cdoern@redhat.com>
Co-authored-by: Ashwin Bharambe <ashwin.bharambe@gmail.com>
This commit is contained in:
mergify[bot] 2025-11-24 11:30:57 -08:00 committed by GitHub
parent 46bd95e453
commit b9299a20ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 65 additions and 11 deletions

View file

@ -35,6 +35,7 @@ class InferenceStore:
self.reference = reference self.reference = reference
self.sql_store = None self.sql_store = None
self.policy = policy self.policy = policy
self.enable_write_queue = True
# Async write queue and worker control # Async write queue and worker control
self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None
@ -47,14 +48,13 @@ class InferenceStore:
base_store = sqlstore_impl(self.reference) base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy) self.sql_store = AuthorizedSqlStore(base_store, self.policy)
# Disable write queue for SQLite to avoid concurrency issues # Disable write queue for SQLite since WAL mode handles concurrency
backend_name = self.reference.backend # Keep it enabled for other backends (like Postgres) for performance
backend_config = _SQLSTORE_BACKENDS.get(backend_name) backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
if backend_config is None: if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE:
raise ValueError( self.enable_write_queue = False
f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}" logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)")
)
self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE
await self.sql_store.create_table( await self.sql_store.create_table(
"chat_completions", "chat_completions",
{ {
@ -66,6 +66,14 @@ class InferenceStore:
}, },
) )
if self.enable_write_queue:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers):
self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
logger.debug(
f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
)
async def shutdown(self) -> None: async def shutdown(self) -> None:
if not self._worker_tasks: if not self._worker_tasks:
return return

View file

@ -3,6 +3,7 @@
# #
# This source code is licensed under the terms described in the LICENSE file in # This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree. # the root directory of this source tree.
import asyncio
from llama_stack.apis.agents import ( from llama_stack.apis.agents import (
Order, Order,
@ -17,12 +18,12 @@ from llama_stack.apis.agents.openai_responses import (
) )
from llama_stack.apis.inference import OpenAIMessageParam from llama_stack.apis.inference import OpenAIMessageParam
from llama_stack.core.datatypes import AccessRule from llama_stack.core.datatypes import AccessRule
from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType
from llama_stack.log import get_logger from llama_stack.log import get_logger
from ..sqlstore.api import ColumnDefinition, ColumnType from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
from ..sqlstore.sqlstore import sqlstore_impl from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
logger = get_logger(name=__name__, category="openai_responses") logger = get_logger(name=__name__, category="openai_responses")
@ -59,6 +60,13 @@ class ResponsesStore:
base_store = sqlstore_impl(self.reference) base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy) self.sql_store = AuthorizedSqlStore(base_store, self.policy)
# Disable write queue for SQLite since WAL mode handles concurrency
# Keep it enabled for other backends (like Postgres) for performance
backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE:
self.enable_write_queue = False
logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)")
await self.sql_store.create_table( await self.sql_store.create_table(
"openai_responses", "openai_responses",
{ {
@ -77,6 +85,14 @@ class ResponsesStore:
}, },
) )
if self.enable_write_queue:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers):
self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
logger.debug(
f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
)
async def shutdown(self) -> None: async def shutdown(self) -> None:
return return

View file

@ -17,6 +17,7 @@ from sqlalchemy import (
String, String,
Table, Table,
Text, Text,
event,
inspect, inspect,
select, select,
text, text,
@ -75,7 +76,36 @@ class SqlAlchemySqlStoreImpl(SqlStore):
self.metadata = MetaData() self.metadata = MetaData()
def create_engine(self) -> AsyncEngine: def create_engine(self) -> AsyncEngine:
return create_async_engine(self.config.engine_str, pool_pre_ping=True) # Configure connection args for better concurrency support
connect_args = {}
if "sqlite" in self.config.engine_str:
# SQLite-specific optimizations for concurrent access
# With WAL mode, most locks resolve in milliseconds, but allow up to 5s for edge cases
connect_args["timeout"] = 5.0
connect_args["check_same_thread"] = False # Allow usage across asyncio tasks
engine = create_async_engine(
self.config.engine_str,
pool_pre_ping=True,
connect_args=connect_args,
)
# Enable WAL mode for SQLite to support concurrent readers and writers
if "sqlite" in self.config.engine_str:
@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
cursor = dbapi_conn.cursor()
# Enable Write-Ahead Logging for better concurrency
cursor.execute("PRAGMA journal_mode=WAL")
# Set busy timeout to 5 seconds (retry instead of immediate failure)
# With WAL mode, locks should be brief; if we hit 5s there's a bigger issue
cursor.execute("PRAGMA busy_timeout=5000")
# Use NORMAL synchronous mode for better performance (still safe with WAL)
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.close()
return engine
async def create_table( async def create_table(
self, self,