# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

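"""Integration tests for the telemetry metrics query API (prompt, completion, and total token counters)."""
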
import time
from datetime import UTC, datetime, timedelta

import pytest


@pytest.fixture(scope="module", autouse=True)
def setup_telemetry_metrics_data(openai_client, client_with_models, text_model_id):
    """Setup fixture that creates telemetry metrics data before tests run."""

    # Skip OpenAI tests if running in library mode
    if not hasattr(client_with_models, "base_url"):
        pytest.skip("OpenAI client tests not supported with library client")

    prompt_tokens = []
    completion_tokens = []
    total_tokens = []

    # Create OpenAI completions to generate metrics using the proper OpenAI client
    for i in range(5):
        response = openai_client.chat.completions.create(
            model=text_model_id,
            messages=[{"role": "user", "content": f"OpenAI test {i}"}],
            stream=False,
        )
        prompt_tokens.append(response.usage.prompt_tokens)
        completion_tokens.append(response.usage.completion_tokens)
        total_tokens.append(response.usage.total_tokens)

    # Wait for metrics to be logged
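    # (metric recording may lag behind the completion requests, so poll the
    # telemetry query API for up to 30 seconds until a completion_tokens
    # data point shows up)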
    start_time = time.time()
    while time.time() - start_time < 30:
        try:
            # Try to query metrics to see if they're available
            metrics_response = client_with_models.telemetry.query_metrics(
                metric_name="completion_tokens",
                start_time=int((datetime.now(UTC) - timedelta(minutes=5)).timestamp()),
            )
            if len(metrics_response[0].values) > 0:
                break
        except Exception:
            pass
        time.sleep(0.1)

    # Return the token lists for use in tests
    return {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_prompt_tokens(client_with_models, text_model_id, setup_telemetry_metrics_data):
    """Test that prompt_tokens metrics are queryable."""
    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())

    response = client_with_models.telemetry.query_metrics(
        metric_name="prompt_tokens",
        start_time=start_time,
    )

    assert isinstance(response, list)

    assert isinstance(response[0].values, list), "Should return a list of metric series"

    assert response[0].metric == "prompt_tokens"

    # Use the actual values from setup instead of hardcoded values
    expected_values = setup_telemetry_metrics_data["prompt_tokens"]
    assert response[0].values[-1].value in expected_values, (
        f"Expected one of {expected_values}, got {response[0].values[-1].value}"
    )


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_completion_tokens(client_with_models, text_model_id, setup_telemetry_metrics_data):
    """Test that completion_tokens metrics are queryable."""
    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())

    response = client_with_models.telemetry.query_metrics(
        metric_name="completion_tokens",
        start_time=start_time,
    )

    assert isinstance(response, list)

    assert isinstance(response[0].values, list), "Should return a list of metric series"

    assert response[0].metric == "completion_tokens"

    # Use the actual values from setup instead of hardcoded values
    expected_values = setup_telemetry_metrics_data["completion_tokens"]
    assert response[0].values[-1].value in expected_values, (
        f"Expected one of {expected_values}, got {response[0].values[-1].value}"
    )


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_total_tokens(client_with_models, text_model_id, setup_telemetry_metrics_data):
    """Test that total_tokens metrics are queryable."""
    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())

    response = client_with_models.telemetry.query_metrics(
        metric_name="total_tokens",
        start_time=start_time,
    )

    assert isinstance(response, list)

    assert isinstance(response[0].values, list), "Should return a list of metric series"

    assert response[0].metric == "total_tokens"

    # Use the actual values from setup instead of hardcoded values
    expected_values = setup_telemetry_metrics_data["total_tokens"]
    assert response[0].values[-1].value in expected_values, (
        f"Expected one of {expected_values}, got {response[0].values[-1].value}"
    )


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_with_time_range(llama_stack_client, text_model_id):
    """Test that metrics are queryable within a time range."""
    end_time = int(datetime.now(UTC).timestamp())
    start_time = end_time - 600  # 10 minutes ago

    response = llama_stack_client.telemetry.query_metrics(
        metric_name="prompt_tokens",
        start_time=start_time,
        end_time=end_time,
    )

    assert isinstance(response, list)

    assert isinstance(response[0].values, list), "Should return a list of metric series"

    assert response[0].metric == "prompt_tokens"


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_with_label_matchers(llama_stack_client, text_model_id):
    """Test that metrics are queryable with label matchers."""
    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())

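    # Each label matcher is a dict with "name", "value", and "operator" keys;
    # "=" restricts the result to series whose model_id label equals text_model_id.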
    response = llama_stack_client.telemetry.query_metrics(
        metric_name="prompt_tokens",
        start_time=start_time,
        label_matchers=[{"name": "model_id", "value": text_model_id, "operator": "="}],
    )

    assert isinstance(response[0].values, list), "Should return a list of metric series"


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_nonexistent_metric(llama_stack_client):
    """Test that querying a nonexistent metric returns empty data."""
    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())

    response = llama_stack_client.telemetry.query_metrics(
        metric_name="nonexistent_metric",
        start_time=start_time,
    )

    assert isinstance(response, list), "Should return an empty list for nonexistent metric"
    assert len(response) == 0


@pytest.mark.skip(reason="Skipping this test until client is regenerated")
def test_query_metrics_with_granularity(llama_stack_client, text_model_id):
    """Test that metrics are queryable with different granularity levels."""
    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())

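    # granularity selects the aggregation bucket ("1h" = hourly, "1d" = daily);
    # passing None skips aggregation and returns the raw data points.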
    # Test hourly granularity
    hourly_response = llama_stack_client.telemetry.query_metrics(
        metric_name="total_tokens",
        start_time=start_time,
        granularity="1h",
    )

    # Test daily granularity
    daily_response = llama_stack_client.telemetry.query_metrics(
        metric_name="total_tokens",
        start_time=start_time,
        granularity="1d",
    )

    # Test no granularity (raw data points)
    raw_response = llama_stack_client.telemetry.query_metrics(
        metric_name="total_tokens",
        start_time=start_time,
        granularity=None,
    )

    # All should return valid data
    assert isinstance(hourly_response[0].values, list), "Hourly granularity should return data"
    assert isinstance(daily_response[0].values, list), "Daily granularity should return data"
    assert isinstance(raw_response[0].values, list), "No granularity should return data"

    # Verify that different granularities produce different aggregation levels
    # (The exact number depends on data distribution, but they should be queryable)
    assert len(hourly_response[0].values) >= 0, "Hourly granularity should be queryable"
    assert len(daily_response[0].values) >= 0, "Daily granularity should be queryable"
    assert len(raw_response[0].values) >= 0, "No granularity should be queryable"