fix: harden storage semantics (#4118)

Hardens the storage system by guaranteeing immediate durability for
responses and keeping background writers alive. Three related fixes:

* Responses to the OpenAI-compatible API now write directly to
Postgres/SQLite inside the request instead of detouring through an async
queue that might never drain. This restores the expected read-after-write
behavior and removes the "response not found" races reported by users
(see the write-path sketch after this list).

* The access-control shim was stamping owner_principal/access_attributes
as SQL NULL, which Postgres interprets as non-public rows; it now uses
the empty-string/JSON-null pattern, so conversations and responses stored
without an authenticated user stay queryable, matching SQLite (see the
ownership-stamping sketch below).

* The inference-store queue remains for batching, but its worker tasks
now start lazily on the live event loop so server startup doesn't cancel
them; writes keep flowing even when the stack is launched via
`llama stack run` (see the lazy-startup sketch below).
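
A minimal sketch of the first fix's write path, assuming a simplified
route handler (`create_response` and its signature are illustrative, not
the actual server code). The point is that the SQL write is awaited
inside the request, so an immediate GET by id sees the row:

```python
# Illustrative only: the write happens inside the request, not via a queue.
async def create_response(store, response_object, input_items, messages):
    # ResponsesStore.store_response_object now awaits the INSERT directly,
    # so the row is durable before this handler returns its response.
    await store.store_response_object(response_object, input_items, messages)
    return response_object
```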
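
The second fix's ownership stamping, sketched with a hypothetical helper
(`stamp_ownership` is not the real shim code; only the column names and
the empty-string/JSON-null pattern come from the description above):

```python
import json


def stamp_ownership(row: dict, user) -> dict:
    """Illustrative: stamp ownership columns before inserting a row."""
    if user is None:
        # Anonymous writes get an empty string and JSON null rather than
        # SQL NULL, so the public-row check matches on Postgres and SQLite.
        row["owner_principal"] = ""
        row["access_attributes"] = json.dumps(None)  # the JSON literal null
    else:
        row["owner_principal"] = user.principal
        row["access_attributes"] = json.dumps(user.attributes)
    return row
```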
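
And a rough sketch of the third fix's lazy worker startup (the class and
method names below are assumptions, not the inference-store
implementation): deferring `asyncio.create_task` until the first write
binds the workers to the event loop that actually serves requests rather
than one that may be torn down after startup.

```python
import asyncio


class LazyWriteQueue:
    """Illustrative sketch: workers are created on first use, not at startup."""

    def __init__(self, num_writers: int = 2, maxsize: int = 1000) -> None:
        self._num_writers = num_writers
        self._maxsize = maxsize
        self._queue: asyncio.Queue | None = None
        self._workers: list[asyncio.Task] = []

    def _ensure_workers(self) -> None:
        # Created lazily, so the tasks belong to the running loop that handles
        # requests, not a transient loop used while launching the stack.
        if self._queue is None:
            self._queue = asyncio.Queue(maxsize=self._maxsize)
            self._workers = [asyncio.create_task(self._worker()) for _ in range(self._num_writers)]

    async def put(self, item) -> None:
        self._ensure_workers()
        assert self._queue is not None
        await self._queue.put(item)

    async def _worker(self) -> None:
        assert self._queue is not None
        while True:
            item = await self._queue.get()
            try:
                await self._write(item)  # the batched SQL insert would go here
            finally:
                self._queue.task_done()

    async def _write(self, item) -> None:
        ...
```

Shutdown handling (joining the queue and cancelling the workers) is
omitted from the sketch for brevity.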

Closes #4115

Added a matrix entry to test our "base" suite against Postgres as the
store.

Ashwin Bharambe 2025-11-12 10:35:39 -08:00
parent 56d87f5133
commit 81e44b06ff
27 changed files with 1195 additions and 160 deletions

@@ -3,8 +3,6 @@
 #
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
-import asyncio
-from typing import Any
 
 from llama_stack.apis.agents import (
     Order,
@@ -19,12 +17,12 @@ from llama_stack.apis.agents.openai_responses import (
 )
 from llama_stack.apis.inference import OpenAIMessageParam
 from llama_stack.core.datatypes import AccessRule
-from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType
+from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference
 from llama_stack.log import get_logger
 
 from ..sqlstore.api import ColumnDefinition, ColumnType
 from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
-from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
+from ..sqlstore.sqlstore import sqlstore_impl
 
 logger = get_logger(name=__name__, category="openai_responses")
@@ -55,28 +53,12 @@ class ResponsesStore:
         self.policy = policy
         self.sql_store = None
-        self.enable_write_queue = True
-        # Async write queue and worker control
-        self._queue: (
-            asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None
-        ) = None
-        self._worker_tasks: list[asyncio.Task[Any]] = []
-        self._max_write_queue_size: int = self.reference.max_write_queue_size
-        self._num_writers: int = max(1, self.reference.num_writers)
 
     async def initialize(self):
         """Create the necessary tables if they don't exist."""
         base_store = sqlstore_impl(self.reference)
         self.sql_store = AuthorizedSqlStore(base_store, self.policy)
-        backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
-        if backend_config is None:
-            raise ValueError(
-                f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
-            )
-        if backend_config.type == StorageBackendType.SQL_SQLITE:
-            self.enable_write_queue = False
         await self.sql_store.create_table(
             "openai_responses",
             {
@@ -95,32 +77,12 @@
             },
         )
-        if self.enable_write_queue:
-            self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
-            for _ in range(self._num_writers):
-                self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
-        else:
-            logger.debug("Write queue disabled for SQLite to avoid concurrency issues")
 
     async def shutdown(self) -> None:
-        if not self._worker_tasks:
-            return
-        if self._queue is not None:
-            await self._queue.join()
-        for t in self._worker_tasks:
-            if not t.done():
-                t.cancel()
-        for t in self._worker_tasks:
-            try:
-                await t
-            except asyncio.CancelledError:
-                pass
-        self._worker_tasks.clear()
+        return
 
     async def flush(self) -> None:
-        """Wait for all queued writes to complete. Useful for testing."""
-        if self.enable_write_queue and self._queue is not None:
-            await self._queue.join()
+        """Maintained for compatibility; no-op now that writes are synchronous."""
+        return
 
     async def store_response_object(
         self,
@@ -128,31 +90,7 @@
         input: list[OpenAIResponseInput],
         messages: list[OpenAIMessageParam],
     ) -> None:
-        if self.enable_write_queue:
-            if self._queue is None:
-                raise ValueError("Responses store is not initialized")
-            try:
-                self._queue.put_nowait((response_object, input, messages))
-            except asyncio.QueueFull:
-                logger.warning(f"Write queue full; adding response id={getattr(response_object, 'id', '<unknown>')}")
-                await self._queue.put((response_object, input, messages))
-        else:
-            await self._write_response_object(response_object, input, messages)
-
-    async def _worker_loop(self) -> None:
-        assert self._queue is not None
-        while True:
-            try:
-                item = await self._queue.get()
-            except asyncio.CancelledError:
-                break
-            response_object, input, messages = item
-            try:
-                await self._write_response_object(response_object, input, messages)
-            except Exception as e:  # noqa: BLE001
-                logger.error(f"Error writing response object: {e}")
-            finally:
-                self._queue.task_done()
+        await self._write_response_object(response_object, input, messages)
 
     async def _write_response_object(
         self,