simplified some, walked back some decisions

This commit is contained in:
Ashwin Bharambe 2025-10-17 10:05:07 -07:00
parent af7472cdb0
commit 636764c2a1
90 changed files with 887 additions and 570 deletions

View file

@ -18,13 +18,13 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectWithInput,
)
from llama_stack.apis.inference import OpenAIMessageParam
from llama_stack.core.datatypes import AccessRule, ResponsesStoreConfig
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType
from llama_stack.log import get_logger
from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, SqlStoreType, sqlstore_impl
from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
logger = get_logger(name=__name__, category="openai_responses")
@ -45,39 +45,38 @@ class _OpenAIResponseObjectWithInputAndMessages(OpenAIResponseObjectWithInput):
class ResponsesStore:
def __init__(
self,
config: ResponsesStoreConfig | SqlStoreConfig,
reference: ResponsesStoreReference | SqlStoreReference,
policy: list[AccessRule],
):
# Handle backward compatibility
if not isinstance(config, ResponsesStoreConfig):
# Legacy: SqlStoreConfig passed directly as config
config = ResponsesStoreConfig(
sql_store_config=config,
)
if isinstance(reference, ResponsesStoreReference):
self.reference = reference
else:
self.reference = ResponsesStoreReference(**reference.model_dump())
self.config = config
self.sql_store_config = config.sql_store_config
if not self.sql_store_config:
self.sql_store_config = SqliteSqlStoreConfig(
db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
)
self.sql_store = None
self.policy = policy
# Disable write queue for SQLite to avoid concurrency issues
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
self.sql_store = None
self.enable_write_queue = True
# Async write queue and worker control
self._queue: (
asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None
) = None
self._worker_tasks: list[asyncio.Task[Any]] = []
self._max_write_queue_size: int = config.max_write_queue_size
self._num_writers: int = max(1, config.num_writers)
self._max_write_queue_size: int = self.reference.max_write_queue_size
self._num_writers: int = max(1, self.reference.num_writers)
async def initialize(self):
"""Create the necessary tables if they don't exist."""
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config), self.policy)
base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy)
backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
if backend_config is None:
raise ValueError(
f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
)
if backend_config.type == StorageBackendType.SQL_SQLITE:
self.enable_write_queue = False
await self.sql_store.create_table(
"openai_responses",
{