use a messages store for conversations, dont store system message

This commit is contained in:
Ashwin Bharambe 2025-10-14 21:36:55 -07:00
parent e3459fd49d
commit 497ccc2c59
2 changed files with 87 additions and 20 deletions

View file

@ -125,17 +125,23 @@ class OpenAIResponsesImpl:
messages = await convert_response_input_to_chat_messages(all_input)
tool_context.recover_tools_from_previous_response(previous_response)
else:
if conversation is not None:
conversation_items = await self.conversations_api.list(conversation, order="asc")
all_input = conversation_items.data
if isinstance(input, str):
all_input.append(OpenAIResponseMessage(role="user", content=input))
elif isinstance(input, list):
all_input.extend(input)
else:
all_input = input
elif conversation is not None:
conversation_items = await self.conversations_api.list(conversation, order="asc")
# Use stored messages as source of truth (like previous_response.messages)
stored_messages = await self.responses_store.get_conversation_messages(conversation)
all_input = input
if not conversation_items.data:
# First turn - just convert the new input
messages = await convert_response_input_to_chat_messages(input)
else:
# Use stored messages directly and convert only new input
messages = stored_messages or []
new_messages = await convert_response_input_to_chat_messages(input, previous_messages=messages)
messages.extend(new_messages)
else:
all_input = input
messages = await convert_response_input_to_chat_messages(all_input)
return all_input, messages, tool_context
@ -356,21 +362,23 @@ class OpenAIResponsesImpl:
# This ensures the storage/syncing happens even if the consumer breaks early
if (
stream_chunk.type in {"response.completed", "response.incomplete"}
and store
and final_response
and failed_response is None
):
# TODO: we really should work off of output_items instead of "final_messages"
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
messages_to_store = list(
filter(lambda x: not isinstance(x, OpenAISystemMessageParam), orchestrator.final_messages)
)
if store:
# TODO: we really should work off of output_items instead of "final_messages"
await self._store_response(
response=final_response,
input=all_input,
messages=messages_to_store,
)
if stream_chunk.type in {"response.completed", "response.incomplete"} and conversation and final_response:
# if we are starting the conversation, you will have "input" available
# we need to persist that. all else would be recorded via output_items
await self._sync_response_to_conversation(conversation, input, output_items)
if conversation:
await self._sync_response_to_conversation(conversation, input, output_items)
await self.responses_store.store_conversation_messages(conversation, messages_to_store)
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
return await self.responses_store.delete_response_object(response_id)

View file

@ -88,6 +88,14 @@ class ResponsesStore:
},
)
await self.sql_store.create_table(
"conversation_messages",
{
"conversation_id": ColumnDefinition(type=ColumnType.STRING, primary_key=True),
"messages": ColumnType.JSON,
},
)
if self.enable_write_queue:
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
for _ in range(self._num_writers):
@ -294,3 +302,54 @@ class ResponsesStore:
items = items[:limit]
return ListOpenAIResponseInputItem(data=items)
async def store_conversation_messages(self, conversation_id: str, messages: list[OpenAIMessageParam]) -> None:
"""Store messages for a conversation.
:param conversation_id: The conversation identifier.
:param messages: List of OpenAI message parameters to store.
"""
if not self.sql_store:
raise ValueError("Responses store is not initialized")
# Serialize messages to dict format for JSON storage
messages_data = [msg.model_dump() for msg in messages]
# Upsert: try insert first, update if exists
try:
await self.sql_store.insert(
table="conversation_messages",
data={"conversation_id": conversation_id, "messages": messages_data},
)
except Exception:
# If insert fails due to ID conflict, update existing record
await self.sql_store.update(
table="conversation_messages",
data={"messages": messages_data},
where={"conversation_id": conversation_id},
)
logger.info(f"Stored {len(messages)} messages for conversation {conversation_id}")
async def get_conversation_messages(self, conversation_id: str) -> list[OpenAIMessageParam] | None:
"""Get stored messages for a conversation.
:param conversation_id: The conversation identifier.
:returns: List of OpenAI message parameters, or None if no messages stored.
"""
if not self.sql_store:
raise ValueError("Responses store is not initialized")
record = await self.sql_store.fetch_one(
table="conversation_messages",
where={"conversation_id": conversation_id},
)
if record is None:
return None
# Deserialize messages from JSON storage
from pydantic import TypeAdapter
adapter = TypeAdapter(list[OpenAIMessageParam])
return adapter.validate_python(record["messages"])