fix(responses): sync conversation before yielding terminal events in streaming

Move conversation sync logic before yield to ensure it executes even when
streaming consumers break early after receiving response.completed event.
This commit is contained in:
Ashwin Bharambe 2025-10-22 14:15:53 -07:00
parent cb2185b936
commit 000b8f514f
3 changed files with 11 additions and 4 deletions

View file

@ -372,14 +372,13 @@ class OpenAIResponsesImpl:
final_response = stream_chunk.response final_response = stream_chunk.response
elif stream_chunk.type == "response.failed": elif stream_chunk.type == "response.failed":
failed_response = stream_chunk.response failed_response = stream_chunk.response
yield stream_chunk
if stream_chunk.type == "response.output_item.done": if stream_chunk.type == "response.output_item.done":
item = stream_chunk.item item = stream_chunk.item
output_items.append(item) output_items.append(item)
# Store and sync immediately after yielding terminal events # Store and sync before yielding terminal events
# This ensures the storage/syncing happens even if the consumer breaks early # This ensures the storage/syncing happens even if the consumer breaks after receiving the event
if ( if (
stream_chunk.type in {"response.completed", "response.incomplete"} stream_chunk.type in {"response.completed", "response.incomplete"}
and final_response and final_response
@ -400,6 +399,8 @@ class OpenAIResponsesImpl:
await self._sync_response_to_conversation(conversation, input, output_items) await self._sync_response_to_conversation(conversation, input, output_items)
await self.responses_store.store_conversation_messages(conversation, messages_to_store) await self.responses_store.store_conversation_messages(conversation, messages_to_store)
yield stream_chunk
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject: async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
return await self.responses_store.delete_response_object(response_id) return await self.responses_store.delete_response_object(response_id)

View file

@ -43,6 +43,7 @@ def pytest_sessionstart(session):
if "SQLITE_STORE_DIR" not in os.environ: if "SQLITE_STORE_DIR" not in os.environ:
os.environ["SQLITE_STORE_DIR"] = tempfile.mkdtemp() os.environ["SQLITE_STORE_DIR"] = tempfile.mkdtemp()
logger.info(f"Setting SQLITE_STORE_DIR: {os.environ['SQLITE_STORE_DIR']}")
# Set test stack config type for api_recorder test isolation # Set test stack config type for api_recorder test isolation
stack_config = session.config.getoption("--stack-config", default=None) stack_config = session.config.getoption("--stack-config", default=None)

View file

@ -40,7 +40,12 @@ def is_port_available(port: int, host: str = "localhost") -> bool:
def start_llama_stack_server(config_name: str) -> subprocess.Popen: def start_llama_stack_server(config_name: str) -> subprocess.Popen:
"""Start a llama stack server with the given config.""" """Start a llama stack server with the given config."""
cmd = f"uv run llama stack run {config_name}"
# remove server.log if it exists
if os.path.exists("server.log"):
os.remove("server.log")
cmd = f"llama stack run {config_name}"
devnull = open(os.devnull, "w") devnull = open(os.devnull, "w")
process = subprocess.Popen( process = subprocess.Popen(
shlex.split(cmd), shlex.split(cmd),