llama-stack-mirror/tests/integration/telemetry/test_openai_telemetry.py
Charlie Doern 0caef40e0d
fix: telemetry fixes (inference and core telemetry) (#2733)
# What does this PR do?

I found a few issues while adding new metrics for various APIs:

Currently, metrics are only propagated in `chat_completion` and `completion`.

Since most providers use the `openai_..` routes as the default in `llama-stack-client inference chat-completion`, metrics are currently not working as expected.

In order to get them working, the following had to be done (a rough sketch of the flow follows the list):

1. Get the completion as usual.
2. Use new `openai_` versions of the metric-gathering functions, which use `.usage` from the `OpenAI..` response types to gather the metrics that are already populated.
3. Define a `stream_generator` which counts the tokens and computes the metrics (only for `stream=True`).
4. Add the metrics to the response.
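
This is only a hedged illustration of that flow: `MetricEvent`, `metrics_from_usage`, the chunk shape, and the per-chunk token counting are assumptions modeled on the OpenAI response types, not the actual implementation.

```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any


@dataclass
class MetricEvent:
    metric: str
    value: int
    model: str


def metrics_from_usage(usage: Any, model: str) -> list[MetricEvent]:
    # Non-streaming: `.usage` on the OpenAI.. response types already carries
    # the token counts, so they only need repackaging as metric events.
    return [
        MetricEvent("prompt_tokens", usage.prompt_tokens, model),
        MetricEvent("completion_tokens", usage.completion_tokens, model),
        MetricEvent("total_tokens", usage.total_tokens, model),
    ]


async def stream_generator(stream: AsyncIterator[Any], model: str) -> AsyncIterator[Any]:
    # Streaming: individual chunks carry no usage block, so count tokens as
    # chunks pass through and report the totals once the stream is exhausted.
    completion_tokens = 0
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            completion_tokens += 1  # crude per-chunk proxy; real code would tokenize the delta
        yield chunk
    event = MetricEvent("completion_tokens", completion_tokens, model)
    print(f"metric {event.metric}={event.value} model={event.model}")  # stand-in for attaching to the response
```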


NOTE: I could not add metrics to `openai_completion` when `stream=True`, because that path ONLY returns an `OpenAICompletion`, not an AsyncGenerator that we can manipulate.
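
To illustrate the constraint with assumed signatures (these are not the exact llama-stack definitions): the chat path can hand back an async iterator that a wrapper can interpose on, while this path is typed to return only the concrete object.

```python
from collections.abc import AsyncIterator


# Streaming chat completions come back as an async iterator of chunks,
# which can be wrapped with stream_generator to count tokens in flight...
async def openai_chat_completion(params) -> "OpenAIChatCompletion | AsyncIterator[OpenAIChatCompletionChunk]":
    ...


# ...whereas this path only ever returns the concrete completion object,
# so with stream=True there is nothing to wrap.
async def openai_completion(params) -> "OpenAICompletion":
    ...
```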


Additionally, the console sink's metric logging now acquires the lock and adds the event to the span, as the other `_log_...` methods do.
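
A minimal sketch of that pattern, assuming a console processor that holds a `threading.Lock` and a map of live spans (the class and field names here are illustrative, not the actual telemetry adapter internals):

```python
import threading
from typing import Any


class ConsoleSpanProcessorSketch:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active_spans: dict[str, Any] = {}  # span_id -> live span object

    def _log_metric(self, event: Any) -> None:
        # Like the other _log_... methods: take the lock, then record the
        # metric as an event on its owning span and echo it to the console.
        with self._lock:
            span = self._active_spans.get(event.span_id)
            if span is not None:
                span.add_event(event.metric, attributes={"value": event.value})
            print(f"METRIC {event.metric}={event.value}")
```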

Some new output:

`llama-stack-client inference chat-completion --message hi`

<img width="2416" height="425" alt="Screenshot 2025-07-16 at 8 28 20 AM"
src="https://github.com/user-attachments/assets/ccdf1643-a184-4ddd-9641-d426c4d51326"
/>


and in the client:

<img width="763" height="319" alt="Screenshot 2025-07-16 at 8 28 32 AM"
src="https://github.com/user-attachments/assets/6bceb811-5201-47e9-9e16-8130f0d60007"
/>

These metrics were not previously being recorded, nor were they printed to the server console, due to improper console sink handling.

---------

Signed-off-by: Charlie Doern <cdoern@redhat.com>
2025-08-06 13:37:40 -07:00


# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import time
from datetime import UTC, datetime

import pytest

@pytest.fixture(scope="module", autouse=True)
def setup_openai_telemetry_data(llama_stack_client, text_model_id):
    """Setup fixture that creates telemetry data specifically for OpenAI completions testing."""
    # Create OpenAI completion traces
    for i in range(3):
        llama_stack_client.chat.completions.create(
            model=text_model_id,
            messages=[
                {
                    "role": "user",
                    "content": f"Test trace openai {i}",
                }
            ],
            # stream=False to always capture Metrics.
            stream=False,
        )

    # Create additional OpenAI completion traces with different parameters
    for i in range(2):
        llama_stack_client.chat.completions.create(
            model=text_model_id,
            messages=[
                {
                    "role": "user",
                    "content": f"Test trace openai with temperature {i}",
                }
            ],
            temperature=0.7,
            max_tokens=100,
            stream=False,
        )

    start_time = time.time()
    while time.time() - start_time < 30:
        traces = llama_stack_client.telemetry.query_traces(limit=10)
        if len(traces) >= 5:  # 5 OpenAI completion traces
            break
        time.sleep(1)

    if len(traces) < 5:
        pytest.fail(
            f"Failed to create sufficient OpenAI completion telemetry data after 30s. Got {len(traces)} traces."
        )
    # Wait for 5 seconds to ensure traces have completed logging
    time.sleep(5)

    yield

def test_openai_traces_basic(llama_stack_client):
    """Test basic trace querying functionality for OpenAI completions."""
    all_traces = llama_stack_client.telemetry.query_traces(limit=10)

    assert isinstance(all_traces, list), "Should return a list of traces"
    assert len(all_traces) >= 5, "Should have at least 5 traces from OpenAI setup"

    # Verify trace structure and data quality
    first_trace = all_traces[0]
    assert hasattr(first_trace, "trace_id"), "Trace should have trace_id"
    assert hasattr(first_trace, "start_time"), "Trace should have start_time"
    assert hasattr(first_trace, "root_span_id"), "Trace should have root_span_id"
    # Validate trace_id is a non-empty string
    assert isinstance(first_trace.trace_id, str) and len(first_trace.trace_id) > 0, (
        "trace_id should be non-empty string"
    )
    # Validate start_time format and not in the future
    now = datetime.now(UTC)
    if isinstance(first_trace.start_time, str):
        trace_time = datetime.fromisoformat(first_trace.start_time.replace("Z", "+00:00"))
    else:
        # start_time is already a datetime object
        trace_time = first_trace.start_time
        if trace_time.tzinfo is None:
            trace_time = trace_time.replace(tzinfo=UTC)

    # Ensure trace time is not in the future
    time_diff = (now - trace_time).total_seconds()
    assert time_diff >= 0, f"Trace start_time should not be in the future, got {time_diff}s"

    # Validate root_span_id exists and is non-empty
    assert isinstance(first_trace.root_span_id, str) and len(first_trace.root_span_id) > 0, (
        "root_span_id should be non-empty string"
    )

    # Test querying specific trace by ID
    specific_trace = llama_stack_client.telemetry.get_trace(trace_id=first_trace.trace_id)
    assert specific_trace.trace_id == first_trace.trace_id, "Retrieved trace should match requested ID"
    assert specific_trace.start_time == first_trace.start_time, "Retrieved trace should have same start_time"
    assert specific_trace.root_span_id == first_trace.root_span_id, "Retrieved trace should have same root_span_id"

    # Test pagination with proper validation
    recent_traces = llama_stack_client.telemetry.query_traces(limit=3, offset=0)
    assert len(recent_traces) <= 3, "Should return at most 3 traces when limit=3"
    assert len(recent_traces) >= 1, "Should return at least 1 trace"

    # Verify all traces have required fields
    for trace in recent_traces:
        assert hasattr(trace, "trace_id") and trace.trace_id, "Each trace should have non-empty trace_id"
        assert hasattr(trace, "start_time") and trace.start_time, "Each trace should have non-empty start_time"
        assert hasattr(trace, "root_span_id") and trace.root_span_id, "Each trace should have non-empty root_span_id"

def test_openai_spans_basic(llama_stack_client):
    """Test basic span querying functionality for OpenAI completions."""
    spans = llama_stack_client.telemetry.query_spans(attribute_filters=[], attributes_to_return=[])

    assert isinstance(spans, list), "Should return a list of spans"
    assert len(spans) >= 1, "Should have at least one span from OpenAI setup"

    # Verify span structure and data quality
    first_span = spans[0]
    required_attrs = ["span_id", "name", "trace_id"]
    for attr in required_attrs:
        assert hasattr(first_span, attr), f"Span should have {attr} attribute"
        assert getattr(first_span, attr), f"Span {attr} should not be empty"

    # Validate span data types and content
    assert isinstance(first_span.span_id, str) and len(first_span.span_id) > 0, "span_id should be non-empty string"
    assert isinstance(first_span.name, str) and len(first_span.name) > 0, "span name should be non-empty string"
    assert isinstance(first_span.trace_id, str) and len(first_span.trace_id) > 0, "trace_id should be non-empty string"

    # Verify span belongs to a valid trace
    all_traces = llama_stack_client.telemetry.query_traces(limit=10)
    trace_ids = {t.trace_id for t in all_traces}
    if first_span.trace_id in trace_ids:
        trace = llama_stack_client.telemetry.get_trace(trace_id=first_span.trace_id)
        assert trace is not None, "Should be able to retrieve trace for valid trace_id"
        assert trace.trace_id == first_span.trace_id, "Trace ID should match span's trace_id"

    # Test with span filtering and validate results
    filtered_spans = llama_stack_client.telemetry.query_spans(
        attribute_filters=[{"key": "name", "op": "eq", "value": first_span.name}],
        attributes_to_return=["name", "span_id"],
    )
    assert isinstance(filtered_spans, list), "Should return a list with span name filter"

    # Validate filtered spans if filtering works
    if len(filtered_spans) > 0:
        for span in filtered_spans:
            assert hasattr(span, "name"), "Filtered spans should have name attribute"
            assert hasattr(span, "span_id"), "Filtered spans should have span_id attribute"
            assert span.name == first_span.name, "Filtered spans should match the filter criteria"
            assert isinstance(span.span_id, str) and len(span.span_id) > 0, "Filtered span_id should be valid"

    # Test that all spans have consistent structure
    for span in spans:
        for attr in required_attrs:
            assert hasattr(span, attr) and getattr(span, attr), f"All spans should have non-empty {attr}"

def test_openai_completion_creates_telemetry(llama_stack_client, text_model_id):
    """Test that making OpenAI completion calls actually creates telemetry data."""
    # Get initial trace count
    initial_traces = llama_stack_client.telemetry.query_traces(limit=20)
    initial_count = len(initial_traces)

    # Make a new OpenAI completion call
    response = llama_stack_client.chat.completions.create(
        model=text_model_id,
        messages=[{"role": "user", "content": "Test OpenAI telemetry creation"}],
        stream=False,
    )

    # Verify we got a response
    assert response is not None, "Should get a response from OpenAI completion"
    assert hasattr(response, "choices"), "Response should have choices"
    assert len(response.choices) > 0, "Response should have at least one choice"

    # Wait for telemetry to be recorded
    time.sleep(3)

    # Check that we have more traces now
    final_traces = llama_stack_client.telemetry.query_traces(limit=20)
    final_count = len(final_traces)

    # Should have at least as many traces as before (might have more due to other activity)
    assert final_count >= initial_count, "Should have at least as many traces after OpenAI call"