From 168c1209a010a6c29f3895095f98ff941c911215 Mon Sep 17 00:00:00 2001 From: Ashwin Bharambe Date: Mon, 3 Nov 2025 14:19:01 -0800 Subject: [PATCH] fix: enable SQLite WAL mode to prevent database locking errors Fixes race condition causing "database is locked" errors during concurrent writes to SQLite, particularly in streaming responses with guardrails where multiple inference calls write simultaneously. Enable Write-Ahead Logging (WAL) mode for SQLite which allows multiple concurrent readers and one writer without blocking. Set busy_timeout to 5s so SQLite retries instead of failing immediately. Remove the logic that disabled write queues for SQLite since WAL mode eliminates the locking issues that prompted disabling them. Fixes: test_output_safety_guardrails_safe_content[stream=True] flake --- .../utils/inference/inference_store.py | 13 ++------ .../utils/responses/responses_store.py | 12 ++----- .../utils/sqlstore/sqlalchemy_sqlstore.py | 32 ++++++++++++++++++- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/llama_stack/providers/utils/inference/inference_store.py b/src/llama_stack/providers/utils/inference/inference_store.py index 8e20bca6b..45d411f51 100644 --- a/src/llama_stack/providers/utils/inference/inference_store.py +++ b/src/llama_stack/providers/utils/inference/inference_store.py @@ -47,14 +47,6 @@ class InferenceStore: base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) - # Disable write queue for SQLite to avoid concurrency issues - backend_name = self.reference.backend - backend_config = _SQLSTORE_BACKENDS.get(backend_name) - if backend_config is None: - raise ValueError( - f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}" - ) - self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE await self.sql_store.create_table( "chat_completions", { @@ -70,8 +62,9 @@ class InferenceStore: self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) for _ in range(self._num_writers): self._worker_tasks.append(asyncio.create_task(self._worker_loop())) - else: - logger.info("Write queue disabled for SQLite to avoid concurrency issues") + logger.debug( + f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" + ) async def shutdown(self) -> None: if not self._worker_tasks: diff --git a/src/llama_stack/providers/utils/responses/responses_store.py b/src/llama_stack/providers/utils/responses/responses_store.py index d5c243252..5cdca0488 100644 --- a/src/llama_stack/providers/utils/responses/responses_store.py +++ b/src/llama_stack/providers/utils/responses/responses_store.py @@ -70,13 +70,6 @@ class ResponsesStore: base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) - backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend) - if backend_config is None: - raise ValueError( - f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}" - ) - if backend_config.type == StorageBackendType.SQL_SQLITE: - self.enable_write_queue = False await self.sql_store.create_table( "openai_responses", { @@ -99,8 +92,9 @@ class ResponsesStore: self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) for _ in range(self._num_writers): self._worker_tasks.append(asyncio.create_task(self._worker_loop())) - else: - logger.debug("Write queue disabled for SQLite to avoid concurrency issues") + logger.debug( + f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" + ) async def shutdown(self) -> None: if not self._worker_tasks: diff --git a/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py b/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py index 1bd364d43..356f49ed1 100644 --- a/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py +++ b/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py @@ -17,6 +17,7 @@ from sqlalchemy import ( String, Table, Text, + event, inspect, select, text, @@ -75,7 +76,36 @@ class SqlAlchemySqlStoreImpl(SqlStore): self.metadata = MetaData() def create_engine(self) -> AsyncEngine: - return create_async_engine(self.config.engine_str, pool_pre_ping=True) + # Configure connection args for better concurrency support + connect_args = {} + if "sqlite" in self.config.engine_str: + # SQLite-specific optimizations for concurrent access + # With WAL mode, most locks resolve in milliseconds, but allow up to 5s for edge cases + connect_args["timeout"] = 5.0 + connect_args["check_same_thread"] = False # Allow usage across asyncio tasks + + engine = create_async_engine( + self.config.engine_str, + pool_pre_ping=True, + connect_args=connect_args, + ) + + # Enable WAL mode for SQLite to support concurrent readers and writers + if "sqlite" in self.config.engine_str: + + @event.listens_for(engine.sync_engine, "connect") + def set_sqlite_pragma(dbapi_conn, connection_record): + cursor = dbapi_conn.cursor() + # Enable Write-Ahead Logging for better concurrency + cursor.execute("PRAGMA journal_mode=WAL") + # Set busy timeout to 5 seconds (retry instead of immediate failure) + # With WAL mode, locks should be brief; if we hit 5s there's a bigger issue + cursor.execute("PRAGMA busy_timeout=5000") + # Use NORMAL synchronous mode for better performance (still safe with WAL) + cursor.execute("PRAGMA synchronous=NORMAL") + cursor.close() + + return engine async def create_table( self,