mirror of https://github.com/meta-llama/llama-stack.git
synced 2025-12-17 02:29:26 +00:00
feat(responses): implement usage tracking in streaming responses
Implementation changes:
- Add usage accumulation to StreamingResponseOrchestrator
- Enable stream_options to receive usage in streaming chunks
- Track usage across multi-turn responses with tool execution
- Convert between chat completion and response usage formats
- Extract usage accumulation into helper method for clarity

Test changes:
- Add usage assertions to streaming and non-streaming tests
- Update test recordings with actual usage data from OpenAI

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent c92a1c99f0
commit 674140fe89
10 changed files with 3485 additions and 612 deletions
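In practice, this change means a client consuming the Responses API over the streaming interface now receives populated usage on the terminal event. A minimal sketch, assuming an OpenAI-compatible client pointed at a Llama Stack deployment; the base URL, API key, and model below are placeholders, not values from this commit:

```python
from openai import OpenAI

# Placeholder endpoint and credentials for a local Llama Stack server.
client = OpenAI(base_url="http://localhost:8321/v1/openai/v1", api_key="none")

stream = client.responses.create(
    model="meta-llama/Llama-3.3-70B-Instruct",  # placeholder model id
    input="What is the capital of France?",
    stream=True,
)

for event in stream:
    # After this change, the terminal response.completed event carries
    # usage accumulated across every inference call made during the turn.
    if event.type == "response.completed":
        usage = event.response.usage
        print(usage.input_tokens, usage.output_tokens, usage.total_tokens)
```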
@@ -35,12 +35,16 @@ from llama_stack.apis.agents.openai_responses import (
     OpenAIResponseOutputMessageFunctionToolCall,
     OpenAIResponseOutputMessageMCPListTools,
     OpenAIResponseText,
+    OpenAIResponseUsage,
+    OpenAIResponseUsageInputTokensDetails,
+    OpenAIResponseUsageOutputTokensDetails,
     WebSearchToolTypes,
 )
 from llama_stack.apis.inference import (
     Inference,
     OpenAIAssistantMessageParam,
     OpenAIChatCompletion,
+    OpenAIChatCompletionChunk,
     OpenAIChatCompletionToolCall,
     OpenAIChoice,
     OpenAIMessageParam,
@@ -100,6 +104,8 @@ class StreamingResponseOrchestrator:
         self.final_messages: list[OpenAIMessageParam] = []
         # mapping for annotations
         self.citation_files: dict[str, str] = {}
+        # Track accumulated usage across all inference calls
+        self.accumulated_usage: OpenAIResponseUsage | None = None

     async def create_response(self) -> AsyncIterator[OpenAIResponseObjectStream]:
         # Initialize output messages
@@ -137,6 +143,9 @@ class StreamingResponseOrchestrator:
                 stream=True,
                 temperature=self.ctx.temperature,
                 response_format=response_format,
+                stream_options={
+                    "include_usage": True,
+                },
             )

             # Process streaming chunks and build complete response
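For context on the hunk above: with an OpenAI-compatible backend, setting `stream_options={"include_usage": True}` makes the server emit one extra terminal chunk whose `choices` list is empty and whose `usage` field is populated. A sketch of that contract, assuming an AsyncOpenAI client and a placeholder model name:

```python
from openai import AsyncOpenAI

async def print_stream_usage(client: AsyncOpenAI) -> None:
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
        stream_options={"include_usage": True},
    )
    async for chunk in stream:
        # Intermediate chunks carry content deltas; usage stays None
        # until the final chunk of the stream.
        if chunk.usage is not None:
            print(chunk.usage.prompt_tokens, chunk.usage.completion_tokens, chunk.usage.total_tokens)
```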
@@ -201,6 +210,7 @@ class StreamingResponseOrchestrator:
             status="completed",
             text=self.text,
             output=output_messages,
+            usage=self.accumulated_usage,
         )

         # Emit response.completed
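The usage object attached above uses the classes brought in by the import hunk. An illustrative instance (values made up, not from this commit), assuming the details fields are optional as the `else None` branches in the helper below imply:

```python
from llama_stack.apis.agents.openai_responses import (
    OpenAIResponseUsage,
    OpenAIResponseUsageInputTokensDetails,
    OpenAIResponseUsageOutputTokensDetails,
)

# Illustrative values only; field names follow the classes in this diff.
usage = OpenAIResponseUsage(
    input_tokens=150,
    output_tokens=87,
    total_tokens=237,
    input_tokens_details=OpenAIResponseUsageInputTokensDetails(cached_tokens=0),
    output_tokens_details=OpenAIResponseUsageOutputTokensDetails(reasoning_tokens=0),
)
```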
@@ -243,6 +253,51 @@ class StreamingResponseOrchestrator:

         return function_tool_calls, non_function_tool_calls, approvals, next_turn_messages

+    def _accumulate_chunk_usage(self, chunk: OpenAIChatCompletionChunk) -> None:
+        """Accumulate usage from a streaming chunk into the response usage format."""
+        if not chunk.usage:
+            return
+
+        if self.accumulated_usage is None:
+            # Convert from chat completion format to response format
+            self.accumulated_usage = OpenAIResponseUsage(
+                input_tokens=chunk.usage.prompt_tokens,
+                output_tokens=chunk.usage.completion_tokens,
+                total_tokens=chunk.usage.total_tokens,
+                input_tokens_details=(
+                    OpenAIResponseUsageInputTokensDetails(cached_tokens=chunk.usage.prompt_tokens_details.cached_tokens)
+                    if chunk.usage.prompt_tokens_details
+                    else None
+                ),
+                output_tokens_details=(
+                    OpenAIResponseUsageOutputTokensDetails(
+                        reasoning_tokens=chunk.usage.completion_tokens_details.reasoning_tokens
+                    )
+                    if chunk.usage.completion_tokens_details
+                    else None
+                ),
+            )
+        else:
+            # Accumulate across multiple inference calls
+            self.accumulated_usage = OpenAIResponseUsage(
+                input_tokens=self.accumulated_usage.input_tokens + chunk.usage.prompt_tokens,
+                output_tokens=self.accumulated_usage.output_tokens + chunk.usage.completion_tokens,
+                total_tokens=self.accumulated_usage.total_tokens + chunk.usage.total_tokens,
+                # Use latest non-null details
+                input_tokens_details=(
+                    OpenAIResponseUsageInputTokensDetails(cached_tokens=chunk.usage.prompt_tokens_details.cached_tokens)
+                    if chunk.usage.prompt_tokens_details
+                    else self.accumulated_usage.input_tokens_details
+                ),
+                output_tokens_details=(
+                    OpenAIResponseUsageOutputTokensDetails(
+                        reasoning_tokens=chunk.usage.completion_tokens_details.reasoning_tokens
+                    )
+                    if chunk.usage.completion_tokens_details
+                    else self.accumulated_usage.output_tokens_details
+                ),
+            )
+
     async def _process_streaming_chunks(
         self, completion_result, output_messages: list[OpenAIResponseOutput]
     ) -> AsyncIterator[OpenAIResponseObjectStream | ChatCompletionResult]:
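To make the multi-turn arithmetic concrete: a simplified, standalone restatement of the accumulation rule in the method above, dropping the token-details handling. It assumes the details fields of OpenAIResponseUsage are optional, as the `else None` branches imply; `add_usage` is a hypothetical helper for illustration, not part of this commit.

```python
from llama_stack.apis.agents.openai_responses import OpenAIResponseUsage

def add_usage(acc: OpenAIResponseUsage | None, prompt: int, completion: int) -> OpenAIResponseUsage:
    # First inference call: convert chat-completion counts into response usage.
    if acc is None:
        return OpenAIResponseUsage(input_tokens=prompt, output_tokens=completion, total_tokens=prompt + completion)
    # Later calls in the same turn (e.g. after tool execution): sum the counts.
    return OpenAIResponseUsage(
        input_tokens=acc.input_tokens + prompt,
        output_tokens=acc.output_tokens + completion,
        total_tokens=acc.total_tokens + prompt + completion,
    )

# Two inference calls in one tool-calling turn: 100+30 tokens, then 160+25.
acc = add_usage(None, 100, 30)
acc = add_usage(acc, 160, 25)
assert (acc.input_tokens, acc.output_tokens, acc.total_tokens) == (260, 55, 315)
```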
@@ -266,6 +321,10 @@ class StreamingResponseOrchestrator:
             chat_response_id = chunk.id
             chunk_created = chunk.created
             chunk_model = chunk.model
+
+            # Accumulate usage from chunks (typically in final chunk with stream_options)
+            self._accumulate_chunk_usage(chunk)
+
             for chunk_choice in chunk.choices:
                 # Emit incremental text content as delta events
                 if chunk_choice.delta.content: