mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-04 20:14:13 +00:00
fix: responses <> chat completion input conversion (#3645)
Some checks failed
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 2s
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 1s
Python Package Build Test / build (3.12) (push) Failing after 2s
Integration Tests (Replay) / Integration Tests (, , , client=, ) (push) Failing after 5s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
API Conformance Tests / check-schema-compatibility (push) Successful in 10s
Vector IO Integration Tests / test-matrix (push) Failing after 5s
Python Package Build Test / build (3.13) (push) Failing after 3s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 9s
Test External API and Providers / test-external (venv) (push) Failing after 6s
Unit Tests / unit-tests (3.12) (push) Failing after 5s
Unit Tests / unit-tests (3.13) (push) Failing after 6s
UI Tests / ui-tests (22) (push) Successful in 33s
Pre-commit / pre-commit (push) Successful in 1m27s
Some checks failed
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 2s
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 1s
Python Package Build Test / build (3.12) (push) Failing after 2s
Integration Tests (Replay) / Integration Tests (, , , client=, ) (push) Failing after 5s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
API Conformance Tests / check-schema-compatibility (push) Successful in 10s
Vector IO Integration Tests / test-matrix (push) Failing after 5s
Python Package Build Test / build (3.13) (push) Failing after 3s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 9s
Test External API and Providers / test-external (venv) (push) Failing after 6s
Unit Tests / unit-tests (3.12) (push) Failing after 5s
Unit Tests / unit-tests (3.13) (push) Failing after 6s
UI Tests / ui-tests (22) (push) Successful in 33s
Pre-commit / pre-commit (push) Successful in 1m27s
# What does this PR do? closes #3268 closes #3498 When resuming from previous response ID, currently we attempt to convert from the stored responses input to chat completion messages, which is not always possible, e.g. for tool calls where some data is lost once converted from chat completion message to repsonses input format. This PR stores the chat completion messages that correspond to the _last_ call to chat completion, which is sufficient to be resumed from in the next responses API call, where we load these saved messages and skip conversion entirely. Separate issue to optimize storage: https://github.com/llamastack/llama-stack/issues/3646 ## Test Plan existing CI tests
This commit is contained in:
parent
ef0736527d
commit
14a94e9894
7 changed files with 202 additions and 58 deletions
|
@ -17,6 +17,7 @@ from llama_stack.apis.agents.openai_responses import (
|
|||
OpenAIResponseObject,
|
||||
OpenAIResponseObjectWithInput,
|
||||
)
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.core.datatypes import AccessRule, ResponsesStoreConfig
|
||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
||||
from llama_stack.log import get_logger
|
||||
|
@ -28,6 +29,19 @@ from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, SqlStoreTy
|
|||
logger = get_logger(name=__name__, category="openai_responses")
|
||||
|
||||
|
||||
class _OpenAIResponseObjectWithInputAndMessages(OpenAIResponseObjectWithInput):
|
||||
"""Internal class for storing responses with chat completion messages.
|
||||
|
||||
This extends the public OpenAIResponseObjectWithInput with messages field
|
||||
for internal storage. The messages field is not exposed in the public API.
|
||||
|
||||
The messages field is optional for backward compatibility with responses
|
||||
stored before this feature was added.
|
||||
"""
|
||||
|
||||
messages: list[OpenAIMessageParam] | None = None
|
||||
|
||||
|
||||
class ResponsesStore:
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -54,7 +68,9 @@ class ResponsesStore:
|
|||
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
|
||||
|
||||
# Async write queue and worker control
|
||||
self._queue: asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput]]] | None = None
|
||||
self._queue: (
|
||||
asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None
|
||||
) = None
|
||||
self._worker_tasks: list[asyncio.Task[Any]] = []
|
||||
self._max_write_queue_size: int = config.max_write_queue_size
|
||||
self._num_writers: int = max(1, config.num_writers)
|
||||
|
@ -100,18 +116,21 @@ class ResponsesStore:
|
|||
await self._queue.join()
|
||||
|
||||
async def store_response_object(
|
||||
self, response_object: OpenAIResponseObject, input: list[OpenAIResponseInput]
|
||||
self,
|
||||
response_object: OpenAIResponseObject,
|
||||
input: list[OpenAIResponseInput],
|
||||
messages: list[OpenAIMessageParam],
|
||||
) -> None:
|
||||
if self.enable_write_queue:
|
||||
if self._queue is None:
|
||||
raise ValueError("Responses store is not initialized")
|
||||
try:
|
||||
self._queue.put_nowait((response_object, input))
|
||||
self._queue.put_nowait((response_object, input, messages))
|
||||
except asyncio.QueueFull:
|
||||
logger.warning(f"Write queue full; adding response id={getattr(response_object, 'id', '<unknown>')}")
|
||||
await self._queue.put((response_object, input))
|
||||
await self._queue.put((response_object, input, messages))
|
||||
else:
|
||||
await self._write_response_object(response_object, input)
|
||||
await self._write_response_object(response_object, input, messages)
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
assert self._queue is not None
|
||||
|
@ -120,22 +139,26 @@ class ResponsesStore:
|
|||
item = await self._queue.get()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
response_object, input = item
|
||||
response_object, input, messages = item
|
||||
try:
|
||||
await self._write_response_object(response_object, input)
|
||||
await self._write_response_object(response_object, input, messages)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.error(f"Error writing response object: {e}")
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
|
||||
async def _write_response_object(
|
||||
self, response_object: OpenAIResponseObject, input: list[OpenAIResponseInput]
|
||||
self,
|
||||
response_object: OpenAIResponseObject,
|
||||
input: list[OpenAIResponseInput],
|
||||
messages: list[OpenAIMessageParam],
|
||||
) -> None:
|
||||
if self.sql_store is None:
|
||||
raise ValueError("Responses store is not initialized")
|
||||
|
||||
data = response_object.model_dump()
|
||||
data["input"] = [input_item.model_dump() for input_item in input]
|
||||
data["messages"] = [msg.model_dump() for msg in messages]
|
||||
|
||||
await self.sql_store.insert(
|
||||
"openai_responses",
|
||||
|
@ -188,7 +211,7 @@ class ResponsesStore:
|
|||
last_id=data[-1].id if data else "",
|
||||
)
|
||||
|
||||
async def get_response_object(self, response_id: str) -> OpenAIResponseObjectWithInput:
|
||||
async def get_response_object(self, response_id: str) -> _OpenAIResponseObjectWithInputAndMessages:
|
||||
"""
|
||||
Get a response object with automatic access control checking.
|
||||
"""
|
||||
|
@ -205,7 +228,7 @@ class ResponsesStore:
|
|||
# This provides security by not revealing whether the record exists
|
||||
raise ValueError(f"Response with id {response_id} not found") from None
|
||||
|
||||
return OpenAIResponseObjectWithInput(**row["response_object"])
|
||||
return _OpenAIResponseObjectWithInputAndMessages(**row["response_object"])
|
||||
|
||||
async def delete_response_object(self, response_id: str) -> OpenAIDeleteResponseObject:
|
||||
if not self.sql_store:
|
||||
|
@ -241,8 +264,8 @@ class ResponsesStore:
|
|||
if before and after:
|
||||
raise ValueError("Cannot specify both 'before' and 'after' parameters")
|
||||
|
||||
response_with_input = await self.get_response_object(response_id)
|
||||
items = response_with_input.input
|
||||
response_with_input_and_messages = await self.get_response_object(response_id)
|
||||
items = response_with_input_and_messages.input
|
||||
|
||||
if order == Order.desc:
|
||||
items = list(reversed(items))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue