chore: introduce write queue for response_store

# What does this PR do?


## Test Plan
This commit is contained in:
Eric Huang 2025-09-19 16:13:43 -07:00
parent d3600b92d1
commit 04fd837d2f
4 changed files with 129 additions and 7 deletions

View file

@ -3,6 +3,9 @@
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
from typing import Any
from llama_stack.apis.agents import (
Order,
)
@ -14,25 +17,51 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObject,
OpenAIResponseObjectWithInput,
)
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.datatypes import AccessRule, ResponsesStoreConfig
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.log import get_logger
from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, sqlstore_impl
from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, SqlStoreType, sqlstore_impl
logger = get_logger(name=__name__, category="responses_store")
class ResponsesStore:
def __init__(self, sql_store_config: SqlStoreConfig, policy: list[AccessRule]):
if not sql_store_config:
sql_store_config = SqliteSqlStoreConfig(
def __init__(
self,
config: ResponsesStoreConfig | SqlStoreConfig,
policy: list[AccessRule],
):
# Handle backward compatibility
if not isinstance(config, ResponsesStoreConfig):
# Legacy: SqlStoreConfig passed directly as config
config = ResponsesStoreConfig(
sql_store_config=config,
)
self.config = config
self.sql_store_config = config.sql_store_config
if not self.sql_store_config:
self.sql_store_config = SqliteSqlStoreConfig(
db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
)
self.sql_store = AuthorizedSqlStore(sqlstore_impl(sql_store_config))
self.sql_store = None
self.policy = policy
# Disable write queue for SQLite to avoid concurrency issues
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
# Async write queue and worker control
self._queue: asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput]]] | None = None
self._worker_tasks: list[asyncio.Task[Any]] = []
self._max_write_queue_size: int = config.max_write_queue_size
self._num_writers: int = max(1, config.num_writers)
async def initialize(self):
"""Create the necessary tables if they don't exist."""
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config))
await self.sql_store.create_table(
"openai_responses",
{
@ -43,9 +72,70 @@ class ResponsesStore:
},
)
if self.enable_write_queue:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers):
self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
else:
logger.info("Write queue disabled for SQLite to avoid concurrency issues")
async def shutdown(self) -> None:
if not self._worker_tasks:
return
if self._queue is not None:
await self._queue.join()
for t in self._worker_tasks:
if not t.done():
t.cancel()
for t in self._worker_tasks:
try:
await t
except asyncio.CancelledError:
pass
self._worker_tasks.clear()
async def flush(self) -> None:
"""Wait for all queued writes to complete. Useful for testing."""
if self.enable_write_queue and self._queue is not None:
await self._queue.join()
async def store_response_object(
self, response_object: OpenAIResponseObject, input: list[OpenAIResponseInput]
) -> None:
if self.enable_write_queue:
if self._queue is None:
raise ValueError("Responses store is not initialized")
try:
self._queue.put_nowait((response_object, input))
except asyncio.QueueFull:
logger.warning(
f"Write queue full; adding response id={getattr(response_object, 'id', '<unknown>')}"
)
await self._queue.put((response_object, input))
else:
await self._write_response_object(response_object, input)
async def _worker_loop(self) -> None:
assert self._queue is not None
while True:
try:
item = await self._queue.get()
except asyncio.CancelledError:
break
response_object, input = item
try:
await self._write_response_object(response_object, input)
except Exception as e: # noqa: BLE001
logger.error(f"Error writing response object: {e}")
finally:
self._queue.task_done()
async def _write_response_object(
self, response_object: OpenAIResponseObject, input: list[OpenAIResponseInput]
) -> None:
if self.sql_store is None:
raise ValueError("Responses store is not initialized")
data = response_object.model_dump()
data["input"] = [input_item.model_dump() for input_item in input]