diff --git a/llama_stack/providers/utils/inference/inference_store.py b/llama_stack/providers/utils/inference/inference_store.py index 8b7ed7123..50f5af117 100644 --- a/llama_stack/providers/utils/inference/inference_store.py +++ b/llama_stack/providers/utils/inference/inference_store.py @@ -35,6 +35,7 @@ class InferenceStore: self.reference = reference self.sql_store = None self.policy = policy + self.enable_write_queue = True # Async write queue and worker control self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None @@ -47,14 +48,13 @@ class InferenceStore: base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) - # Disable write queue for SQLite to avoid concurrency issues - backend_name = self.reference.backend - backend_config = _SQLSTORE_BACKENDS.get(backend_name) - if backend_config is None: - raise ValueError( - f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}" - ) - self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE + # Disable write queue for SQLite since WAL mode handles concurrency + # Keep it enabled for other backends (like Postgres) for performance + backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend) + if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE: + self.enable_write_queue = False + logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)") + await self.sql_store.create_table( "chat_completions", { @@ -66,6 +66,14 @@ class InferenceStore: }, ) + if self.enable_write_queue: + self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) + for _ in range(self._num_writers): + self._worker_tasks.append(asyncio.create_task(self._worker_loop())) + logger.debug( + f"Inference store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" + ) + async def shutdown(self) -> None: if not self._worker_tasks: return diff --git a/llama_stack/providers/utils/responses/responses_store.py b/llama_stack/providers/utils/responses/responses_store.py index f5024a9ed..3b63fbef6 100644 --- a/llama_stack/providers/utils/responses/responses_store.py +++ b/llama_stack/providers/utils/responses/responses_store.py @@ -3,6 +3,7 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import asyncio from llama_stack.apis.agents import ( Order, @@ -17,12 +18,12 @@ from llama_stack.apis.agents.openai_responses import ( ) from llama_stack.apis.inference import OpenAIMessageParam from llama_stack.core.datatypes import AccessRule -from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference +from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType from llama_stack.log import get_logger from ..sqlstore.api import ColumnDefinition, ColumnType from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore -from ..sqlstore.sqlstore import sqlstore_impl +from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl logger = get_logger(name=__name__, category="openai_responses") @@ -59,6 +60,13 @@ class ResponsesStore: base_store = sqlstore_impl(self.reference) self.sql_store = AuthorizedSqlStore(base_store, self.policy) + # Disable write queue for SQLite since WAL mode handles concurrency + # Keep it enabled for other backends (like Postgres) for performance + backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend) + if backend_config and backend_config.type == StorageBackendType.SQL_SQLITE: + self.enable_write_queue = False + logger.debug("Write queue disabled for SQLite (WAL mode handles concurrency)") + await self.sql_store.create_table( "openai_responses", { @@ -77,6 +85,14 @@ class ResponsesStore: }, ) + if self.enable_write_queue: + self._queue = asyncio.Queue(maxsize=self._max_write_queue_size) + for _ in range(self._num_writers): + self._worker_tasks.append(asyncio.create_task(self._worker_loop())) + logger.debug( + f"Responses store write queue enabled with {self._num_writers} writers, max queue size {self._max_write_queue_size}" + ) + async def shutdown(self) -> None: return diff --git a/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py b/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py index c1ccd73dd..0cd1401d4 100644 --- a/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py +++ b/llama_stack/providers/utils/sqlstore/sqlalchemy_sqlstore.py @@ -17,6 +17,7 @@ from sqlalchemy import ( String, Table, Text, + event, inspect, select, text, @@ -75,7 +76,36 @@ class SqlAlchemySqlStoreImpl(SqlStore): self.metadata = MetaData() def create_engine(self) -> AsyncEngine: - return create_async_engine(self.config.engine_str, pool_pre_ping=True) + # Configure connection args for better concurrency support + connect_args = {} + if "sqlite" in self.config.engine_str: + # SQLite-specific optimizations for concurrent access + # With WAL mode, most locks resolve in milliseconds, but allow up to 5s for edge cases + connect_args["timeout"] = 5.0 + connect_args["check_same_thread"] = False # Allow usage across asyncio tasks + + engine = create_async_engine( + self.config.engine_str, + pool_pre_ping=True, + connect_args=connect_args, + ) + + # Enable WAL mode for SQLite to support concurrent readers and writers + if "sqlite" in self.config.engine_str: + + @event.listens_for(engine.sync_engine, "connect") + def set_sqlite_pragma(dbapi_conn, connection_record): + cursor = dbapi_conn.cursor() + # Enable Write-Ahead Logging for better concurrency + cursor.execute("PRAGMA journal_mode=WAL") + # Set busy timeout to 5 seconds (retry instead of immediate failure) + # With WAL mode, locks should be brief; if we hit 5s there's a bigger issue + cursor.execute("PRAGMA busy_timeout=5000") + # Use NORMAL synchronous mode for better performance (still safe with WAL) + cursor.execute("PRAGMA synchronous=NORMAL") + cursor.close() + + return engine async def create_table( self,