fix: enable SQLite WAL mode to prevent database locking errors

Fixes race condition causing "database is locked" errors during concurrent
writes to SQLite, particularly in streaming responses with guardrails where
multiple inference calls write simultaneously.

Enable Write-Ahead Logging (WAL) mode for SQLite which allows multiple
concurrent readers and one writer without blocking. Set busy_timeout to 5s
so SQLite retries instead of failing immediately. Remove the logic that
disabled write queues for SQLite since WAL mode eliminates the locking
issues that prompted disabling them.

Fixes: test_output_safety_guardrails_safe_content[stream=True] flake
This commit is contained in:
Ashwin Bharambe 2025-11-03 14:19:01 -08:00
parent 415fd9e36b
commit 168c1209a0
3 changed files with 37 additions and 20 deletions

View file

@ -47,14 +47,6 @@ class InferenceStore:
base_store = sqlstore_impl(self.reference) base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy) self.sql_store = AuthorizedSqlStore(base_store, self.policy)
# Disable write queue for SQLite to avoid concurrency issues
backend_name = self.reference.backend
backend_config = _SQLSTORE_BACKENDS.get(backend_name)
if backend_config is None:
raise ValueError(
f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
)
self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE
await self.sql_store.create_table( await self.sql_store.create_table(
"chat_completions", "chat_completions",
{ {
@ -70,8 +62,9 @@ class InferenceStore:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers): for _ in range(self._num_writers):
self._worker_tasks.append(asyncio.create_task(self._worker_loop())) self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
else: logger.debug(
logger.info("Write queue disabled for SQLite to avoid concurrency issues") f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
)
async def shutdown(self) -> None: async def shutdown(self) -> None:
if not self._worker_tasks: if not self._worker_tasks:

View file

@ -70,13 +70,6 @@ class ResponsesStore:
base_store = sqlstore_impl(self.reference) base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy) self.sql_store = AuthorizedSqlStore(base_store, self.policy)
backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
if backend_config is None:
raise ValueError(
f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
)
if backend_config.type == StorageBackendType.SQL_SQLITE:
self.enable_write_queue = False
await self.sql_store.create_table( await self.sql_store.create_table(
"openai_responses", "openai_responses",
{ {
@ -99,8 +92,9 @@ class ResponsesStore:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers): for _ in range(self._num_writers):
self._worker_tasks.append(asyncio.create_task(self._worker_loop())) self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
else: logger.debug(
logger.debug("Write queue disabled for SQLite to avoid concurrency issues") f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
)
async def shutdown(self) -> None: async def shutdown(self) -> None:
if not self._worker_tasks: if not self._worker_tasks:

View file

@ -17,6 +17,7 @@ from sqlalchemy import (
String, String,
Table, Table,
Text, Text,
event,
inspect, inspect,
select, select,
text, text,
@ -75,7 +76,36 @@ class SqlAlchemySqlStoreImpl(SqlStore):
self.metadata = MetaData() self.metadata = MetaData()
def create_engine(self) -> AsyncEngine: def create_engine(self) -> AsyncEngine:
return create_async_engine(self.config.engine_str, pool_pre_ping=True) # Configure connection args for better concurrency support
connect_args = {}
if "sqlite" in self.config.engine_str:
# SQLite-specific optimizations for concurrent access
# With WAL mode, most locks resolve in milliseconds, but allow up to 5s for edge cases
connect_args["timeout"] = 5.0
connect_args["check_same_thread"] = False # Allow usage across asyncio tasks
engine = create_async_engine(
self.config.engine_str,
pool_pre_ping=True,
connect_args=connect_args,
)
# Enable WAL mode for SQLite to support concurrent readers and writers
if "sqlite" in self.config.engine_str:
@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
cursor = dbapi_conn.cursor()
# Enable Write-Ahead Logging for better concurrency
cursor.execute("PRAGMA journal_mode=WAL")
# Set busy timeout to 5 seconds (retry instead of immediate failure)
# With WAL mode, locks should be brief; if we hit 5s there's a bigger issue
cursor.execute("PRAGMA busy_timeout=5000")
# Use NORMAL synchronous mode for better performance (still safe with WAL)
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.close()
return engine
async def create_table( async def create_table(
self, self,