diff --git a/src/llama_stack/providers/utils/inference/inference_store.py b/src/llama_stack/providers/utils/inference/inference_store.py
index 8e20bca6b..45d411f51 100644
--- a/src/llama_stack/providers/utils/inference/inference_store.py
+++ b/src/llama_stack/providers/utils/inference/inference_store.py
@@ -47,14 +47,6 @@ class InferenceStore:
         base_store = sqlstore_impl(self.reference)
         self.sql_store = AuthorizedSqlStore(base_store, self.policy)
 
-        # Disable write queue for SQLite to avoid concurrency issues
-        backend_name = self.reference.backend
-        backend_config = _SQLSTORE_BACKENDS.get(backend_name)
-        if backend_config is None:
-            raise ValueError(
-                f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
-            )
-        self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE
         await self.sql_store.create_table(
             "chat_completions",
             {
@@ -70,8 +62,9 @@ class InferenceStore:
             self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
             for _ in range(self._num_writers):
                 self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
-        else:
-            logger.info("Write queue disabled for SQLite to avoid concurrency issues")
+            logger.debug(
+                f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
+            )
 
     async def shutdown(self) -> None:
         if not self._worker_tasks:
diff --git a/src/llama_stack/providers/utils/responses/responses_store.py b/src/llama_stack/providers/utils/responses/responses_store.py
index d5c243252..5cdca0488 100644
--- a/src/llama_stack/providers/utils/responses/responses_store.py
+++ b/src/llama_stack/providers/utils/responses/responses_store.py
@@ -70,13 +70,6 @@ class ResponsesStore:
         base_store = sqlstore_impl(self.reference)
         self.sql_store = AuthorizedSqlStore(base_store, self.policy)
 
-        backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
-        if backend_config is None:
-            raise ValueError(
-                f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
-            )
-        if backend_config.type == StorageBackendType.SQL_SQLITE:
-            self.enable_write_queue = False
         await self.sql_store.create_table(
             "openai_responses",
             {
@@ -99,8 +92,9 @@ class ResponsesStore:
             self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
             for _ in range(self._num_writers):
                 self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
-        else:
-            logger.debug("Write queue disabled for SQLite to avoid concurrency issues")
+            logger.debug(
+                f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}"
+            )
 
     async def shutdown(self) -> None:
         if not self._worker_tasks:
diff --git a/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py b/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py
index 1bd364d43..356f49ed1 100644
--- a/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py
+++ b/src/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py
@@ -17,6 +17,7 @@ from sqlalchemy import (
     String,
     Table,
     Text,
+    event,
     inspect,
     select,
     text,
@@ -75,7 +76,57 @@ class SqlAlchemySqlStoreImpl(SqlStore):
         self.metadata = MetaData()
 
     def create_engine(self) -> AsyncEngine:
-        return create_async_engine(self.config.engine_str, pool_pre_ping=True)
+        """Create the async engine, tuning SQLite connections for concurrent access.
+
+        SQLite backends get a busy timeout plus WAL journaling applied to every
+        new DBAPI connection, so contending readers and writers wait for a lock
+        instead of failing immediately. Other backends only get ``pool_pre_ping``.
+
+        Returns:
+            The configured ``AsyncEngine``.
+        """
+        # Local import: only needed here, and keeps the module import block untouched.
+        from sqlalchemy.engine import make_url
+
+        # Detect the backend from the parsed URL rather than substring-matching the
+        # raw string: a non-SQLite URL that merely contains "sqlite" (database name,
+        # password, ...) must not receive sqlite3-only connect args.
+        is_sqlite = make_url(self.config.engine_str).get_backend_name() == "sqlite"
+
+        # With WAL mode most locks resolve in milliseconds; allow up to 5s for edge
+        # cases. Single source of truth for both the driver timeout and the PRAGMA.
+        busy_timeout_seconds = 5.0
+
+        # Configure connection args for better concurrency support
+        connect_args = {}
+        if is_sqlite:
+            connect_args["timeout"] = busy_timeout_seconds
+            connect_args["check_same_thread"] = False  # Allow usage across asyncio tasks
+
+        engine = create_async_engine(
+            self.config.engine_str,
+            pool_pre_ping=True,
+            connect_args=connect_args,
+        )
+
+        # Enable WAL mode for SQLite to support concurrent readers and writers
+        if is_sqlite:
+
+            @event.listens_for(engine.sync_engine, "connect")
+            def set_sqlite_pragma(dbapi_conn, connection_record):
+                cursor = dbapi_conn.cursor()
+                try:
+                    # Enable Write-Ahead Logging for better concurrency
+                    cursor.execute("PRAGMA journal_mode=WAL")
+                    # Retry on a locked database instead of failing immediately
+                    cursor.execute(f"PRAGMA busy_timeout={int(busy_timeout_seconds * 1000)}")
+                    # Use NORMAL synchronous mode for better performance (still safe with WAL)
+                    cursor.execute("PRAGMA synchronous=NORMAL")
+                finally:
+                    # Release the cursor even if a PRAGMA fails
+                    cursor.close()
+
+        return engine
 
     async def create_table(
         self,