From 497ccc2c59aef273a265c15f146dc10925d37fc1 Mon Sep 17 00:00:00 2001
From: Ashwin Bharambe
Date: Tue, 14 Oct 2025 21:36:55 -0700
Subject: [PATCH] use a messages store for conversations, don't store system
 message

---
 .../responses/openai_responses.py             | 48 ++++++++-------
 .../utils/responses/responses_store.py        | 59 +++++++++++++++++++
 2 files changed, 87 insertions(+), 20 deletions(-)

diff --git a/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py b/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py
index ace5d88ee..13b6356a9 100644
--- a/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py
+++ b/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py
@@ -125,17 +125,23 @@ class OpenAIResponsesImpl:
             messages = await convert_response_input_to_chat_messages(all_input)
             tool_context.recover_tools_from_previous_response(previous_response)
-        else:
-            if conversation is not None:
-                conversation_items = await self.conversations_api.list(conversation, order="asc")
-                all_input = conversation_items.data
-                if isinstance(input, str):
-                    all_input.append(OpenAIResponseMessage(role="user", content=input))
-                elif isinstance(input, list):
-                    all_input.extend(input)
-            else:
-                all_input = input
+        elif conversation is not None:
+            conversation_items = await self.conversations_api.list(conversation, order="asc")
+            # Use stored messages as source of truth (like previous_response.messages)
+            stored_messages = await self.responses_store.get_conversation_messages(conversation)
+
+            all_input = input
+            if not conversation_items.data:
+                # First turn - just convert the new input
+                messages = await convert_response_input_to_chat_messages(input)
+            else:
+                # Use stored messages directly and convert only new input
+                messages = stored_messages or []
+                new_messages = await convert_response_input_to_chat_messages(input, previous_messages=messages)
+                messages.extend(new_messages)
+        else:
+            all_input = input
             messages = await convert_response_input_to_chat_messages(all_input)
 
         return all_input, messages, tool_context
@@ -356,21 +362,23 @@ class OpenAIResponsesImpl:
             # This ensures the storage/syncing happens even if the consumer breaks early
             if (
                 stream_chunk.type in {"response.completed", "response.incomplete"}
-                and store
                 and final_response
                 and failed_response is None
             ):
-                # TODO: we really should work off of output_items instead of "final_messages"
-                await self._store_response(
-                    response=final_response,
-                    input=all_input,
-                    messages=orchestrator.final_messages,
+                messages_to_store = list(
+                    filter(lambda x: not isinstance(x, OpenAISystemMessageParam), orchestrator.final_messages)
                 )
+                if store:
+                    # TODO: we really should work off of output_items instead of "final_messages"
+                    await self._store_response(
+                        response=final_response,
+                        input=all_input,
+                        messages=messages_to_store,
+                    )
 
-            if stream_chunk.type in {"response.completed", "response.incomplete"} and conversation and final_response:
-                # if we are starting the conversation, you will have "input" available
-                # we need to persist that. all else would be recorded via output_items
-                await self._sync_response_to_conversation(conversation, input, output_items)
+                if conversation:
+                    await self._sync_response_to_conversation(conversation, input, output_items)
+                    await self.responses_store.store_conversation_messages(conversation, messages_to_store)
 
     async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
         return await self.responses_store.delete_response_object(response_id)
diff --git a/llama_stack/providers/utils/responses/responses_store.py b/llama_stack/providers/utils/responses/responses_store.py
index e610a1ba2..8062e59ed 100644
--- a/llama_stack/providers/utils/responses/responses_store.py
+++ b/llama_stack/providers/utils/responses/responses_store.py
@@ -88,6 +88,14 @@ class ResponsesStore:
             },
         )
 
+        await self.sql_store.create_table(
+            "conversation_messages",
+            {
+                "conversation_id": ColumnDefinition(type=ColumnType.STRING, primary_key=True),
+                "messages": ColumnType.JSON,
+            },
+        )
+
         if self.enable_write_queue:
             self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
             for _ in range(self._num_writers):
@@ -294,3 +302,54 @@ class ResponsesStore:
             items = items[:limit]
 
         return ListOpenAIResponseInputItem(data=items)
+
+    async def store_conversation_messages(self, conversation_id: str, messages: list[OpenAIMessageParam]) -> None:
+        """Store messages for a conversation.
+
+        :param conversation_id: The conversation identifier.
+        :param messages: List of OpenAI message parameters to store.
+        """
+        if not self.sql_store:
+            raise ValueError("Responses store is not initialized")
+
+        # Serialize messages to dict format for JSON storage
+        messages_data = [msg.model_dump() for msg in messages]
+
+        # Upsert: try insert first, update if exists
+        try:
+            await self.sql_store.insert(
+                table="conversation_messages",
+                data={"conversation_id": conversation_id, "messages": messages_data},
+            )
+        except Exception:
+            # If insert fails due to ID conflict, update existing record
+            await self.sql_store.update(
+                table="conversation_messages",
+                data={"messages": messages_data},
+                where={"conversation_id": conversation_id},
+            )
+
+        logger.info(f"Stored {len(messages)} messages for conversation {conversation_id}")
+
+    async def get_conversation_messages(self, conversation_id: str) -> list[OpenAIMessageParam] | None:
+        """Get stored messages for a conversation.
+
+        :param conversation_id: The conversation identifier.
+        :returns: List of OpenAI message parameters, or None if no messages stored.
+        """
+        if not self.sql_store:
+            raise ValueError("Responses store is not initialized")
+
+        record = await self.sql_store.fetch_one(
+            table="conversation_messages",
+            where={"conversation_id": conversation_id},
+        )
+
+        if record is None:
+            return None
+
+        # Deserialize messages from JSON storage
+        from pydantic import TypeAdapter
+
+        adapter = TypeAdapter(list[OpenAIMessageParam])
+        return adapter.validate_python(record["messages"])
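
---

Reviewer note (not part of the patch): below is a minimal round-trip sketch of
the two new store methods. It assumes an already-initialized ResponsesStore
instance and that the OpenAI message param models are importable from
llama_stack.apis.inference; the import path, the conversation ID, and the
demo helper are illustrative assumptions, not code this change adds.

# Hypothetical usage sketch; adjust imports to the actual module layout.
from llama_stack.apis.inference import (
    OpenAIAssistantMessageParam,
    OpenAISystemMessageParam,
    OpenAIUserMessageParam,
)


async def demo(store) -> None:
    # `store` is assumed to be an initialized ResponsesStore.
    conversation_id = "conv_123"  # hypothetical conversation ID

    final_messages = [
        OpenAISystemMessageParam(content="You are a helpful assistant."),
        OpenAIUserMessageParam(content="What is the capital of France?"),
        OpenAIAssistantMessageParam(content="Paris."),
    ]

    # Mirror the patch: system messages are filtered out before persisting,
    # so stored conversations never replay instructions.
    messages_to_store = [
        m for m in final_messages if not isinstance(m, OpenAISystemMessageParam)
    ]
    await store.store_conversation_messages(conversation_id, messages_to_store)

    # On the next turn the stored messages become the source of truth and
    # only the new input needs conversion.
    stored = await store.get_conversation_messages(conversation_id)
    assert stored is not None and len(stored) == 2  # system message excluded

Because store_conversation_messages upserts (insert, then update on primary-key
conflict), each turn persists the full message list for the conversation,
overwriting the previous record rather than appending to it.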