feat(responses): implement usage tracking in streaming responses (#3771)

Implements usage accumulation in StreamingResponseOrchestrator.

The most important part was to pass `stream_options = { "include_usage":
true }` to the chat_completion call. This means I will have to re-record
all responses tests, since the request hash will change :)
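
For context, here is a minimal sketch of the behaviour this relies on, using the plain `openai` Python client rather than the Llama Stack inference API (the model name is purely illustrative): with `include_usage` enabled, the provider emits one extra final chunk whose `usage` field carries the token counts while its `choices` list is empty.

```python
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

stream = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model choice
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
    stream_options={"include_usage": True},
)

usage = None
for chunk in stream:
    # Content deltas arrive via chunk.choices; the final chunk has an
    # empty choices list and carries the usage block instead.
    if chunk.usage is not None:
        usage = chunk.usage

if usage:
    print(usage.prompt_tokens, usage.completion_tokens, usage.total_tokens)
```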

Test changes:
- Add usage assertions to streaming and non-streaming tests (see the sketch below)
- Update test recordings with actual usage data from OpenAI
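
A rough sketch of the shape of those assertions (hypothetical helper, not the actual test code; field names follow `OpenAIResponseUsage` from the diff below):

```python
def assert_response_usage(response):
    # Hypothetical helper: usage must be populated and the totals must be
    # internally consistent with the input/output token counts.
    usage = response.usage
    assert usage is not None
    assert usage.input_tokens > 0
    assert usage.output_tokens > 0
    assert usage.total_tokens == usage.input_tokens + usage.output_tokens
```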

Author: Ashwin Bharambe, 2025-10-10 12:27:03 -07:00 (committed by GitHub)
Parent: e7d21e1ee3
Commit: 1394403360
21 changed files with 15099 additions and 612 deletions

@@ -39,12 +39,16 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseOutputMessageFunctionToolCall,
OpenAIResponseOutputMessageMCPListTools,
OpenAIResponseText,
OpenAIResponseUsage,
OpenAIResponseUsageInputTokensDetails,
OpenAIResponseUsageOutputTokensDetails,
WebSearchToolTypes,
)
from llama_stack.apis.inference import (
Inference,
OpenAIAssistantMessageParam,
OpenAIChatCompletion,
OpenAIChatCompletionChunk,
OpenAIChatCompletionToolCall,
OpenAIChoice,
OpenAIMessageParam,
@@ -104,6 +108,8 @@ class StreamingResponseOrchestrator:
self.final_messages: list[OpenAIMessageParam] = []
# mapping for annotations
self.citation_files: dict[str, str] = {}
# Track accumulated usage across all inference calls
self.accumulated_usage: OpenAIResponseUsage | None = None
def _clone_outputs(self, outputs: list[OpenAIResponseOutput]) -> list[OpenAIResponseOutput]:
cloned: list[OpenAIResponseOutput] = []
@@ -131,6 +137,7 @@ class StreamingResponseOrchestrator:
text=self.text,
tools=self.ctx.available_tools(),
error=error,
usage=self.accumulated_usage,
)
async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]:
@@ -168,6 +175,9 @@
stream=True,
temperature=self.ctx.temperature,
response_format=response_format,
stream_options={
"include_usage": True,
},
)
# Process streaming chunks and build complete response
@@ -298,6 +308,51 @@ class StreamingResponseOrchestrator:
return function_tool_calls, non_function_tool_calls, approvals, next_turn_messages
def _accumulate_chunk_usage(self, chunk: OpenAIChatCompletionChunk) -> None:
"""Accumulate usage from a streaming chunk into the response usage format."""
if not chunk.usage:
return
if self.accumulated_usage is None:
# Convert from chat completion format to response format
self.accumulated_usage = OpenAIResponseUsage(
input_tokens=chunk.usage.prompt_tokens,
output_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
input_tokens_details=(
OpenAIResponseUsageInputTokensDetails(cached_tokens=chunk.usage.prompt_tokens_details.cached_tokens)
if chunk.usage.prompt_tokens_details
else None
),
output_tokens_details=(
OpenAIResponseUsageOutputTokensDetails(
reasoning_tokens=chunk.usage.completion_tokens_details.reasoning_tokens
)
if chunk.usage.completion_tokens_details
else None
),
)
else:
# Accumulate across multiple inference calls
self.accumulated_usage = OpenAIResponseUsage(
input_tokens=self.accumulated_usage.input_tokens + chunk.usage.prompt_tokens,
output_tokens=self.accumulated_usage.output_tokens + chunk.usage.completion_tokens,
total_tokens=self.accumulated_usage.total_tokens + chunk.usage.total_tokens,
# Use latest non-null details
input_tokens_details=(
OpenAIResponseUsageInputTokensDetails(cached_tokens=chunk.usage.prompt_tokens_details.cached_tokens)
if chunk.usage.prompt_tokens_details
else self.accumulated_usage.input_tokens_details
),
output_tokens_details=(
OpenAIResponseUsageOutputTokensDetails(
reasoning_tokens=chunk.usage.completion_tokens_details.reasoning_tokens
)
if chunk.usage.completion_tokens_details
else self.accumulated_usage.output_tokens_details
),
)
async def _process_streaming_chunks(
self, completion_result, output_messages: list[OpenAIResponseOutput]
) -> AsyncIterator[OpenAIResponseObjectStream | ChatCompletionResult]:
@@ -323,6 +378,10 @@ class StreamingResponseOrchestrator:
chat_response_id = chunk.id
chunk_created = chunk.created
chunk_model = chunk.model
# Accumulate usage from chunks (typically in final chunk with stream_options)
self._accumulate_chunk_usage(chunk)
for chunk_choice in chunk.choices:
# Emit incremental text content as delta events
if chunk_choice.delta.content: