fix(telemetry): remove telemetry tests :(

Emilio Garcia 2025-11-13 15:21:15 -05:00
parent ce92a44d08
commit 350650d18c
8 changed files with 0 additions and 4752 deletions

View file: tests/integration/telemetry/collectors/__init__.py

@@ -1,19 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""Telemetry collector helpers for integration tests."""
from .base import BaseTelemetryCollector, SpanStub
from .in_memory import InMemoryTelemetryCollector, InMemoryTelemetryManager
from .otlp import OtlpHttpTestCollector
__all__ = [
"BaseTelemetryCollector",
"SpanStub",
"InMemoryTelemetryCollector",
"InMemoryTelemetryManager",
"OtlpHttpTestCollector",
]

View file: tests/integration/telemetry/collectors/base.py

@@ -1,506 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""Shared helpers for telemetry test collectors."""
import os
import time
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any
@dataclass
class MetricStub:
"""Unified metric interface for both in-memory and OTLP collectors."""
name: str
value: Any
attributes: dict[str, Any] | None = None
@dataclass
class SpanStub:
"""Unified span interface for both in-memory and OTLP collectors."""
name: str
attributes: dict[str, Any] | None = None
resource_attributes: dict[str, Any] | None = None
events: list[dict[str, Any]] | None = None
trace_id: str | None = None
span_id: str | None = None
@property
def context(self):
"""Provide context-like interface for trace_id compatibility."""
if self.trace_id is None:
return None
return type("Context", (), {"trace_id": int(self.trace_id, 16)})()
def get_trace_id(self) -> str | None:
"""Get trace ID in hex format.
Tries context.trace_id first, then falls back to direct trace_id.
"""
context = getattr(self, "context", None)
if context and getattr(context, "trace_id", None) is not None:
return f"{context.trace_id:032x}"
return getattr(self, "trace_id", None)
def has_message(self, text: str) -> bool:
"""Check if span contains a specific message in its args."""
if self.attributes is None:
return False
args = self.attributes.get("__args__")
if not args or not isinstance(args, str):
return False
return text in args
def is_root_span(self) -> bool:
"""Check if this is a root span."""
if self.attributes is None:
return False
return self.attributes.get("__root__") is True
def is_autotraced(self) -> bool:
"""Check if this span was automatically traced."""
if self.attributes is None:
return False
return self.attributes.get("__autotraced__") is True
def get_span_type(self) -> str | None:
"""Get the span type (async, sync, async_generator)."""
if self.attributes is None:
return None
return self.attributes.get("__type__")
def get_class_method(self) -> tuple[str | None, str | None]:
"""Get the class and method names for autotraced spans."""
if self.attributes is None:
return None, None
return (self.attributes.get("__class__"), self.attributes.get("__method__"))
def get_location(self) -> str | None:
"""Get the location (library_client, server) for root spans."""
if self.attributes is None:
return None
return self.attributes.get("__location__")
def _value_to_python(value: Any) -> Any:
    kind = value.WhichOneof("value")
    if kind == "string_value":
        return value.string_value
    if kind == "int_value":
        return value.int_value
    if kind == "double_value":
        return value.double_value
    if kind == "bool_value":
        return value.bool_value
    if kind == "bytes_value":
        return value.bytes_value
    if kind == "array_value":
        return [_value_to_python(item) for item in value.array_value.values]
    if kind == "kvlist_value":
        return {kv.key: _value_to_python(kv.value) for kv in value.kvlist_value.values}
    return None


def attributes_to_dict(key_values: Iterable[Any]) -> dict[str, Any]:
    return {key_value.key: _value_to_python(key_value.value) for key_value in key_values}


def events_to_list(events: Iterable[Any]) -> list[dict[str, Any]]:
    return [
        {
            "name": event.name,
            "timestamp": event.time_unix_nano,
            "attributes": attributes_to_dict(event.attributes),
        }
        for event in events
    ]
class BaseTelemetryCollector:
    """Base class for telemetry collectors that ensures consistent return types.

    All collectors must return SpanStub objects to ensure test compatibility
    across both library-client and server modes.
    """

    # Default delay in seconds if OTEL_METRIC_EXPORT_INTERVAL is not set
    _DEFAULT_BASELINE_STABILIZATION_DELAY = 0.2

    def __init__(self):
        self._metric_baseline: dict[tuple[str, str], float] = {}

    @classmethod
    def _get_baseline_stabilization_delay(cls) -> float:
        """Get the baseline stabilization delay from OTEL_METRIC_EXPORT_INTERVAL.

        Applies a 1.5x buffer in CI environments.
        """
        interval_ms = os.environ.get("OTEL_METRIC_EXPORT_INTERVAL")
        if interval_ms:
            try:
                delay = float(interval_ms) / 1000.0
            except (ValueError, TypeError):
                delay = cls._DEFAULT_BASELINE_STABILIZATION_DELAY
        else:
            delay = cls._DEFAULT_BASELINE_STABILIZATION_DELAY
        if os.environ.get("CI"):
            delay *= 1.5
        return delay
    def _get_metric_key(self, metric: MetricStub) -> tuple[str, str]:
        """Generate a stable key for a metric based on name and attributes."""
        attrs = metric.attributes or {}
        attr_key = ",".join(f"{k}={v}" for k, v in sorted(attrs.items()))
        return (metric.name, attr_key)

    def _compute_metric_delta(self, metric: MetricStub) -> int | float | None:
        """Compute delta value for a metric from baseline.

        Returns:
            Delta value if metric was in baseline, absolute value if new, None if unchanged.
        """
        metric_key = self._get_metric_key(metric)
        if metric_key in self._metric_baseline:
            baseline_value = self._metric_baseline[metric_key]
            delta = metric.value - baseline_value
            return delta if delta > 0 else None
        else:
            return metric.value

    def get_spans(
        self,
        expected_count: int | None = None,
        timeout: float = 5.0,
        poll_interval: float = 0.05,
    ) -> tuple[SpanStub, ...]:
        deadline = time.time() + timeout
        min_count = expected_count if expected_count is not None else 1
        last_len: int | None = None
        stable_iterations = 0
        while True:
            spans = tuple(self._snapshot_spans())
            if len(spans) >= min_count:
                if expected_count is not None and len(spans) >= expected_count:
                    return spans
                if last_len == len(spans):
                    stable_iterations += 1
                    if stable_iterations >= 2:
                        return spans
                else:
                    stable_iterations = 1
            else:
                stable_iterations = 0
            if time.time() >= deadline:
                return spans
            last_len = len(spans)
            time.sleep(poll_interval)
    def get_metrics(
        self,
        expected_count: int | None = None,
        timeout: float = 5.0,
        poll_interval: float = 0.05,
        expect_model_id: str | None = None,
    ) -> dict[str, MetricStub]:
        """Poll until expected metrics are available or timeout is reached.

        Returns metrics with delta values computed from baseline.
        """
        deadline = time.time() + timeout
        min_count = expected_count if expected_count is not None else 1
        accumulated_metrics = {}
        seen_metric_names_with_model_id = set()
        while time.time() < deadline:
            current_metrics = self._snapshot_metrics()
            if current_metrics:
                for metric in current_metrics:
                    delta_value = self._compute_metric_delta(metric)
                    if delta_value is None:
                        continue
                    metric_with_delta = MetricStub(
                        name=metric.name,
                        value=delta_value,
                        attributes=metric.attributes,
                    )
                    self._accumulate_metric(
                        accumulated_metrics,
                        metric_with_delta,
                        expect_model_id,
                        seen_metric_names_with_model_id,
                    )
            if self._has_enough_metrics(
                accumulated_metrics, seen_metric_names_with_model_id, min_count, expect_model_id
            ):
                return accumulated_metrics
            time.sleep(poll_interval)
        return accumulated_metrics

    def _accumulate_metric(
        self,
        accumulated: dict[str, MetricStub],
        metric: MetricStub,
        expect_model_id: str | None,
        seen_with_model_id: set[str],
    ) -> None:
        """Accumulate a metric, preferring those matching expected model_id."""
        metric_name = metric.name
        matches_model_id = (
            expect_model_id and metric.attributes and metric.attributes.get("model_id") == expect_model_id
        )
        if metric_name not in accumulated:
            accumulated[metric_name] = metric
            if matches_model_id:
                seen_with_model_id.add(metric_name)
            return
        existing = accumulated[metric_name]
        existing_matches = (
            expect_model_id and existing.attributes and existing.attributes.get("model_id") == expect_model_id
        )
        if matches_model_id and not existing_matches:
            accumulated[metric_name] = metric
            seen_with_model_id.add(metric_name)
        elif matches_model_id == existing_matches:
            if metric.value > existing.value:
                accumulated[metric_name] = metric
                if matches_model_id:
                    seen_with_model_id.add(metric_name)

    def _has_enough_metrics(
        self,
        accumulated: dict[str, MetricStub],
        seen_with_model_id: set[str],
        min_count: int,
        expect_model_id: str | None,
    ) -> bool:
        """Check if we have collected enough metrics."""
        if len(accumulated) < min_count:
            return False
        if not expect_model_id:
            return True
        return len(seen_with_model_id) >= min_count
    @staticmethod
    def _convert_attributes_to_dict(attrs: Any) -> dict[str, Any]:
        """Convert various attribute types to a consistent dictionary format.

        Handles mappingproxy, dict, and other attribute types.
        """
        if attrs is None:
            return {}
        try:
            return dict(attrs.items())  # type: ignore[attr-defined]
        except AttributeError:
            try:
                return dict(attrs)
            except TypeError:
                # Neither mapping- nor iterable-like; treat as empty
                return {}
    @staticmethod
    def _extract_trace_span_ids(span: Any) -> tuple[str | None, str | None]:
        """Extract trace_id and span_id from OpenTelemetry span object.

        Handles both context-based and direct attribute access.
        """
        trace_id = None
        span_id = None
        context = getattr(span, "context", None)
        if context:
            trace_id = f"{context.trace_id:032x}"
            span_id = f"{context.span_id:016x}"
        else:
            trace_id = getattr(span, "trace_id", None)
            span_id = getattr(span, "span_id", None)
        return trace_id, span_id

    @staticmethod
    def _create_span_stub_from_opentelemetry(span: Any) -> SpanStub:
        """Create SpanStub from OpenTelemetry span object.

        This helper reduces code duplication between collectors.
        """
        trace_id, span_id = BaseTelemetryCollector._extract_trace_span_ids(span)
        attributes = BaseTelemetryCollector._convert_attributes_to_dict(span.attributes) or {}
        return SpanStub(
            name=span.name,
            attributes=attributes,
            trace_id=trace_id,
            span_id=span_id,
        )

    @staticmethod
    def _create_span_stub_from_protobuf(span: Any, resource_attrs: dict[str, Any] | None = None) -> SpanStub:
        """Create SpanStub from protobuf span object.

        This helper handles the different structure of protobuf spans.
        """
        attributes = attributes_to_dict(span.attributes) or {}
        events = events_to_list(span.events) if span.events else None
        trace_id = span.trace_id.hex() if span.trace_id else None
        span_id = span.span_id.hex() if span.span_id else None
        return SpanStub(
            name=span.name,
            attributes=attributes,
            resource_attributes=resource_attrs,
            events=events,
            trace_id=trace_id,
            span_id=span_id,
        )
    @staticmethod
    def _extract_metric_from_opentelemetry(metric: Any) -> MetricStub | None:
        """Extract MetricStub from OpenTelemetry metric object.

        This helper reduces code duplication between collectors.
        """
        if not (hasattr(metric, "name") and hasattr(metric, "data") and hasattr(metric.data, "data_points")):
            return None
        if not (metric.data.data_points and len(metric.data.data_points) > 0):
            return None
        data_point = metric.data.data_points[0]
        if hasattr(data_point, "value"):
            # Counter or Gauge
            value = data_point.value
        elif hasattr(data_point, "sum"):
            # Histogram - use the sum of all recorded values
            value = data_point.sum
        else:
            return None
        attributes = {}
        if hasattr(data_point, "attributes"):
            attrs = data_point.attributes
            if attrs is not None and hasattr(attrs, "items"):
                attributes = dict(attrs.items())
            elif attrs is not None and not isinstance(attrs, dict):
                attributes = dict(attrs)
        return MetricStub(
            name=metric.name,
            value=value,
            attributes=attributes or {},
        )
    @staticmethod
    def _create_metric_stubs_from_protobuf(metric: Any) -> list[MetricStub]:
        """Create list of MetricStub objects from protobuf metric object.

        Protobuf metrics can have sum, gauge, or histogram data. Each metric can have
        multiple data points with different attributes, so we return one MetricStub
        per data point.

        Returns:
            List of MetricStub objects, one per data point in the metric.
        """
        if not hasattr(metric, "name"):
            return []
        metric_stubs = []
        for metric_type in ["sum", "gauge", "histogram"]:
            if not hasattr(metric, metric_type):
                continue
            metric_data = getattr(metric, metric_type)
            if not metric_data or not hasattr(metric_data, "data_points"):
                continue
            data_points = metric_data.data_points
            if not data_points:
                continue
            for data_point in data_points:
                attributes = attributes_to_dict(data_point.attributes) if hasattr(data_point, "attributes") else {}
                value = BaseTelemetryCollector._extract_data_point_value(data_point, metric_type)
                if value is None:
                    continue
                metric_stubs.append(
                    MetricStub(
                        name=metric.name,
                        value=value,
                        attributes=attributes,
                    )
                )
            # Only process one metric type per metric
            break
        return metric_stubs
    @staticmethod
    def _extract_data_point_value(data_point: Any, metric_type: str) -> float | int | None:
        """Extract value from a protobuf metric data point based on metric type."""
        if metric_type in ("sum", "gauge"):
            # NumberDataPoint stores its value in a oneof; hasattr() is always True
            # on protobuf fields, so use WhichOneof to find the member that is set.
            kind = data_point.WhichOneof("value")
            if kind == "as_int":
                return data_point.as_int
            if kind == "as_double":
                return data_point.as_double
        elif metric_type == "histogram":
            # Histograms use sum field which represents cumulative sum of all recorded values
            if hasattr(data_point, "sum"):
                return data_point.sum
        return None
    def clear(self) -> None:
        """Clear telemetry data and establish baseline for metric delta computation."""
        self._metric_baseline.clear()
        self._clear_impl()
        delay = self._get_baseline_stabilization_delay()
        time.sleep(delay)
        baseline_metrics = self._snapshot_metrics()
        if baseline_metrics:
            for metric in baseline_metrics:
                metric_key = self._get_metric_key(metric)
                self._metric_baseline[metric_key] = metric.value

    def _snapshot_spans(self) -> tuple[SpanStub, ...]:  # pragma: no cover - interface hook
        raise NotImplementedError

    def _snapshot_metrics(self) -> tuple[MetricStub, ...] | None:  # pragma: no cover - interface hook
        raise NotImplementedError

    def _clear_impl(self) -> None:  # pragma: no cover - interface hook
        raise NotImplementedError

    def shutdown(self) -> None:
        """Optional hook for subclasses with background workers."""

View file: tests/integration/telemetry/collectors/in_memory.py

@@ -1,87 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""In-memory telemetry collector for library-client tests."""
import opentelemetry.metrics as otel_metrics
import opentelemetry.trace as otel_trace
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
import llama_stack.core.telemetry.telemetry as telemetry_module
from .base import BaseTelemetryCollector, MetricStub, SpanStub
class InMemoryTelemetryCollector(BaseTelemetryCollector):
"""In-memory telemetry collector for library-client tests.
Converts OpenTelemetry span objects to SpanStub objects to ensure
consistent interface with OTLP collector used in server mode.
"""
def __init__(self, span_exporter: InMemorySpanExporter, metric_reader: InMemoryMetricReader) -> None:
super().__init__()
self._span_exporter = span_exporter
self._metric_reader = metric_reader
def _snapshot_spans(self) -> tuple[SpanStub, ...]:
spans = []
for span in self._span_exporter.get_finished_spans():
spans.append(self._create_span_stub_from_opentelemetry(span))
return tuple(spans)
def _snapshot_metrics(self) -> tuple[MetricStub, ...] | None:
data = self._metric_reader.get_metrics_data()
if not data or not data.resource_metrics:
return None
metric_stubs = []
for resource_metric in data.resource_metrics:
if resource_metric.scope_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
metric_stub = self._extract_metric_from_opentelemetry(metric)
if metric_stub:
metric_stubs.append(metric_stub)
return tuple(metric_stubs) if metric_stubs else None
def _clear_impl(self) -> None:
self._span_exporter.clear()
self._metric_reader.get_metrics_data()
class InMemoryTelemetryManager:
def __init__(self) -> None:
if hasattr(otel_trace, "_TRACER_PROVIDER_SET_ONCE"):
otel_trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]
if hasattr(otel_metrics, "_METER_PROVIDER_SET_ONCE"):
otel_metrics._METER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]
span_exporter = InMemorySpanExporter()
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
trace.set_tracer_provider(tracer_provider)
metric_reader = InMemoryMetricReader()
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
telemetry_module._TRACER_PROVIDER = tracer_provider
self.collector = InMemoryTelemetryCollector(span_exporter, metric_reader)
self._tracer_provider = tracer_provider
self._meter_provider = meter_provider
def shutdown(self) -> None:
telemetry_module._TRACER_PROVIDER = None
self._tracer_provider.shutdown()
self._meter_provider.shutdown()
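
For context, a sketch of how the manager is meant to be driven, assuming only the OpenTelemetry API already shown in the imports above:

from opentelemetry import trace

manager = InMemoryTelemetryManager()
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("demo-span"):
    pass  # SimpleSpanProcessor exports the span as soon as it ends
spans = manager.collector.get_spans(expected_count=1)
assert spans[0].name == "demo-span"
manager.shutdown()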

View file: tests/integration/telemetry/collectors/otlp.py

@@ -1,146 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""OTLP HTTP telemetry collector used for server-mode tests."""
import gzip
import os
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceRequest
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest
from .base import BaseTelemetryCollector, MetricStub, SpanStub, attributes_to_dict
class OtlpHttpTestCollector(BaseTelemetryCollector):
def __init__(self) -> None:
super().__init__()
self._spans: list[SpanStub] = []
self._metrics: list[MetricStub] = []
self._lock = threading.Lock()
class _ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
allow_reuse_address = True
configured_port = int(os.environ.get("LLAMA_STACK_TEST_COLLECTOR_PORT", "0"))
self._server = _ThreadingHTTPServer(("127.0.0.1", configured_port), _CollectorHandler)
self._server.collector = self # type: ignore[attr-defined]
port = self._server.server_address[1]
self.endpoint = f"http://127.0.0.1:{port}"
self._thread = threading.Thread(target=self._server.serve_forever, name="otel-test-collector", daemon=True)
self._thread.start()
    def _handle_traces(self, request: ExportTraceServiceRequest) -> None:
        new_spans: list[SpanStub] = []
        for resource_spans in request.resource_spans:
            resource_attrs = attributes_to_dict(resource_spans.resource.attributes)
            for scope_spans in resource_spans.scope_spans:
                for span in scope_spans.spans:
                    new_spans.append(self._create_span_stub_from_protobuf(span, resource_attrs or None))
        if not new_spans:
            return
        with self._lock:
            self._spans.extend(new_spans)

    def _handle_metrics(self, request: ExportMetricsServiceRequest) -> None:
        new_metrics: list[MetricStub] = []
        for resource_metrics in request.resource_metrics:
            for scope_metrics in resource_metrics.scope_metrics:
                for metric in scope_metrics.metrics:
                    # Handle multiple data points per metric (e.g., different attribute sets)
                    metric_stubs = self._create_metric_stubs_from_protobuf(metric)
                    new_metrics.extend(metric_stubs)
        if not new_metrics:
            return
        with self._lock:
            self._metrics.extend(new_metrics)

    def _snapshot_spans(self) -> tuple[SpanStub, ...]:
        with self._lock:
            return tuple(self._spans)

    def _snapshot_metrics(self) -> tuple[MetricStub, ...] | None:
        with self._lock:
            return tuple(self._metrics) if self._metrics else None
    def _clear_impl(self) -> None:
        """Clear telemetry, then wait for late-arriving exports to settle, to prevent races between tests."""
        with self._lock:
            self._spans.clear()
            self._metrics.clear()
        # Prevent race conditions where telemetry arrives after clear() but before
        # the test starts, causing contamination between tests
        deadline = time.time() + 2.0  # Maximum wait time
        last_span_count = 0
        last_metric_count = 0
        stable_iterations = 0
        while time.time() < deadline:
            with self._lock:
                current_span_count = len(self._spans)
                current_metric_count = len(self._metrics)
            if current_span_count == last_span_count and current_metric_count == last_metric_count:
                stable_iterations += 1
                if stable_iterations >= 4:  # 4 * 50ms = 200ms of stability
                    break
            else:
                stable_iterations = 0
            last_span_count = current_span_count
            last_metric_count = current_metric_count
            time.sleep(0.05)
        # Final clear to remove any telemetry that arrived during stabilization
        with self._lock:
            self._spans.clear()
            self._metrics.clear()
    def shutdown(self) -> None:
        self._server.shutdown()
        self._server.server_close()
        self._thread.join(timeout=1)


class _CollectorHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:  # noqa: N802 Function name `do_POST` should be lowercase
        collector: OtlpHttpTestCollector = self.server.collector  # type: ignore[attr-defined]
        length = int(self.headers.get("content-length", "0"))
        body = self.rfile.read(length)
        if self.headers.get("content-encoding") == "gzip":
            body = gzip.decompress(body)
        if self.path == "/v1/traces":
            request = ExportTraceServiceRequest()
            request.ParseFromString(body)
            collector._handle_traces(request)
            self._respond_ok()
        elif self.path == "/v1/metrics":
            request = ExportMetricsServiceRequest()
            request.ParseFromString(body)
            collector._handle_metrics(request)
            self._respond_ok()
        else:
            self.send_response(404)
            self.end_headers()

    def _respond_ok(self) -> None:
        self.send_response(200)
        self.end_headers()
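
The collector binds an ephemeral port (or LLAMA_STACK_TEST_COLLECTOR_PORT) and speaks standard OTLP/HTTP protobuf, so any stock exporter can point at it. A sketch, assuming the opentelemetry-exporter-otlp-proto-http package is installed:

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

collector = OtlpHttpTestCollector()
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint=f"{collector.endpoint}/v1/traces"))
)
with provider.get_tracer(__name__).start_as_current_span("otlp-demo"):
    pass
provider.force_flush()  # push the batched span to the collector's POST handler
assert any(span.name == "otlp-demo" for span in collector.get_spans(expected_count=1))
collector.shutdown()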

View file: tests/integration/telemetry/conftest.py

@@ -1,66 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""Telemetry test configuration supporting both library and server test modes."""
import os
import pytest
from llama_stack.testing.api_recorder import patch_httpx_for_test_id
from tests.integration.fixtures.common import instantiate_llama_stack_client
from tests.integration.telemetry.collectors import InMemoryTelemetryManager, OtlpHttpTestCollector
@pytest.fixture(scope="session")
def telemetry_test_collector():
stack_mode = os.environ.get("LLAMA_STACK_TEST_STACK_CONFIG_TYPE", "library_client")
if stack_mode == "server":
# In server mode, the collector must be started and the server is already running.
# The integration test script (scripts/integration-tests.sh) should have set
# LLAMA_STACK_TEST_COLLECTOR_PORT and OTEL_EXPORTER_OTLP_ENDPOINT before starting the server.
        try:
            collector = OtlpHttpTestCollector()
        except OSError as exc:  # e.g. the configured port is already bound
            pytest.skip(str(exc))
        # Verify the collector is listening on the expected endpoint
        expected_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
        if expected_endpoint and collector.endpoint != expected_endpoint:
            pytest.skip(
                f"Collector endpoint mismatch: expected {expected_endpoint}, got {collector.endpoint}. "
                "Server was likely started before collector."
            )
        try:
            yield collector
        finally:
            collector.shutdown()
    else:
        manager = InMemoryTelemetryManager()
        try:
            yield manager.collector
        finally:
            manager.shutdown()
@pytest.fixture(scope="session")
def llama_stack_client(telemetry_test_collector, request):
    """Ensure telemetry collector is ready before initializing the stack client."""
    patch_httpx_for_test_id()
    client = instantiate_llama_stack_client(request.session)
    return client


@pytest.fixture
def mock_otlp_collector(telemetry_test_collector):
    """Provides access to telemetry data and clears between tests."""
    telemetry_test_collector.clear()
    try:
        yield telemetry_test_collector
    finally:
        telemetry_test_collector.clear()
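
A consuming test then only needs the fixtures; a trimmed sketch of the pattern the deleted test module below follows (the test name here is hypothetical):

def test_collector_wiring(mock_otlp_collector, llama_stack_client, text_model_id):
    llama_stack_client.chat.completions.create(
        model=text_model_id,
        messages=[{"role": "user", "content": "ping"}],
        stream=False,
    )
    assert len(mock_otlp_collector.get_spans(expected_count=1)) > 0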

View file

@@ -1,60 +0,0 @@
{
  "test_id": "tests/integration/telemetry/test_completions.py::test_telemetry_format_completeness[txt=ollama/llama3.2:3b-instruct-fp16]",
  "request": {
    "method": "POST",
    "url": "http://0.0.0.0:11434/v1/v1/chat/completions",
    "headers": {},
    "body": {
      "model": "llama3.2:3b-instruct-fp16",
      "messages": [
        {
          "role": "user",
          "content": "Test trace openai with temperature 0.7"
        }
      ],
      "max_tokens": 100,
      "stream": false,
      "temperature": 0.7
    },
    "endpoint": "/v1/chat/completions",
    "model": "llama3.2:3b-instruct-fp16"
  },
  "response": {
    "body": {
      "__type__": "openai.types.chat.chat_completion.ChatCompletion",
      "__data__": {
        "id": "rec-1fcfd86d8111",
        "choices": [
          {
            "finish_reason": "length",
            "index": 0,
            "logprobs": null,
            "message": {
              "content": "I can guide you through the process, but please note that testing OpenAI's models requires a bit of setup and potential API limits.\n\nOpenAI's API is designed to be used in a controlled environment for research purposes. You need an account on their platform and access to the API key.\n\nHere's how you can test a model with temperature 0.7 using Python:\n\n```python\nimport numpy as np\n\n# Replace 'YOUR_API_KEY' with your actual OpenAI API key.\nAPI_KEY",
              "refusal": null,
              "role": "assistant",
              "annotations": null,
              "audio": null,
              "function_call": null,
              "tool_calls": null
            }
          }
        ],
        "created": 0,
        "model": "llama3.2:3b-instruct-fp16",
        "object": "chat.completion",
        "service_tier": null,
        "system_fingerprint": "fp_ollama",
        "usage": {
          "completion_tokens": 100,
          "prompt_tokens": 35,
          "total_tokens": 135,
          "completion_tokens_details": null,
          "prompt_tokens_details": null
        }
      }
    },
    "is_streaming": false
  },
  "id_normalization_mapping": {}
}

View file: tests/integration/telemetry/test_completions.py

@@ -1,150 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""Telemetry tests verifying @trace_protocol decorator format across stack modes.
Note: The mock_otlp_collector fixture automatically clears telemetry data
before and after each test, ensuring test isolation.
"""
import json
import pytest
def test_streaming_chunk_count(mock_otlp_collector, llama_stack_client, text_model_id):
"""Verify streaming adds chunk_count and __type__=async_generator."""
pytest.skip("Disabled: See https://github.com/llamastack/llama-stack/issues/4089")
stream = llama_stack_client.chat.completions.create(
model=text_model_id,
messages=[{"role": "user", "content": "Test trace openai 1"}],
stream=True,
)
chunks = list(stream)
assert len(chunks) > 0
spans = mock_otlp_collector.get_spans(expected_count=5)
assert len(spans) > 0
async_generator_span = next(
(
span
for span in reversed(spans)
if span.get_span_type() == "async_generator"
and span.attributes.get("chunk_count")
and span.has_message("Test trace openai 1")
),
None,
)
assert async_generator_span is not None
raw_chunk_count = async_generator_span.attributes.get("chunk_count")
assert raw_chunk_count is not None
chunk_count = int(raw_chunk_count)
assert chunk_count == len(chunks)
def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client, text_model_id):
    """Comprehensive validation of telemetry data format including spans and metrics."""
    pytest.skip("Disabled: See https://github.com/llamastack/llama-stack/issues/4089")
    response = llama_stack_client.chat.completions.create(
        model=text_model_id,
        messages=[{"role": "user", "content": "Test trace openai with temperature 0.7"}],
        temperature=0.7,
        max_tokens=100,
        stream=False,
    )

    # Handle both dict and Pydantic model for usage.
    # This occurs because the replay system returns a dict for usage while the client returns a Pydantic model.
    # TODO: Fix this by making the replay system return a Pydantic model for usage
    usage = response.usage if isinstance(response.usage, dict) else response.usage.model_dump()
    assert usage.get("prompt_tokens") and usage["prompt_tokens"] > 0
    assert usage.get("completion_tokens") and usage["completion_tokens"] > 0
    assert usage.get("total_tokens") and usage["total_tokens"] > 0

    # Verify spans
    spans = mock_otlp_collector.get_spans(expected_count=7)
    target_span = next(
        (span for span in reversed(spans) if span.has_message("Test trace openai with temperature 0.7")),
        None,
    )
    assert target_span is not None
    trace_id = target_span.get_trace_id()
    assert trace_id is not None
    spans = [span for span in spans if span.get_trace_id() == trace_id]
    spans = [span for span in spans if span.is_root_span() or span.is_autotraced()]
    assert len(spans) >= 4

    # Collect all model_ids found in spans
    logged_model_ids = []
    for span in spans:
        attrs = span.attributes
        assert attrs is not None
        # Root span is created manually by tracing middleware, not by @trace_protocol decorator
        if span.is_root_span():
            assert span.get_location() in ["library_client", "server"]
            continue
        assert span.is_autotraced()
        class_name, method_name = span.get_class_method()
        assert class_name and method_name
        assert span.get_span_type() in ["async", "sync", "async_generator"]
        args_field = span.attributes.get("__args__")
        if args_field:
            args = json.loads(args_field)
            if "model_id" in args:
                logged_model_ids.append(args["model_id"])

    # At least one span should capture the fully qualified model ID
    assert text_model_id in logged_model_ids, f"Expected to find {text_model_id} in spans, but got {logged_model_ids}"

    # Verify token usage metrics in the response using polling
    expected_metrics = ["completion_tokens", "total_tokens", "prompt_tokens"]
    metrics = mock_otlp_collector.get_metrics(expected_count=len(expected_metrics), expect_model_id=text_model_id)
    assert len(metrics) > 0, "No metrics found within timeout"

    # Filter metrics to only those from the specific model used in the request.
    # Multiple metrics with the same name can exist (e.g., from safety models).
    inference_model_metrics = {}
    all_model_ids = set()
    for name, metric in metrics.items():
        if name in expected_metrics:
            model_id = metric.attributes.get("model_id")
            all_model_ids.add(model_id)
            # Only include metrics from the specific model used in the test request
            if model_id == text_model_id:
                inference_model_metrics[name] = metric

    # Verify expected metrics are present for our specific model
    for metric_name in expected_metrics:
        assert metric_name in inference_model_metrics, (
            f"Expected metric {metric_name} for model {text_model_id} not found. "
            f"Available models: {sorted(all_model_ids)}, "
            f"Available metrics for {text_model_id}: {list(inference_model_metrics.keys())}"
        )

    # Verify metric values match usage data
    assert inference_model_metrics["completion_tokens"].value == usage["completion_tokens"], (
        f"Expected {usage['completion_tokens']} for completion_tokens, but got {inference_model_metrics['completion_tokens'].value}"
    )
    assert inference_model_metrics["total_tokens"].value == usage["total_tokens"], (
        f"Expected {usage['total_tokens']} for total_tokens, but got {inference_model_metrics['total_tokens'].value}"
    )
    assert inference_model_metrics["prompt_tokens"].value == usage["prompt_tokens"], (
        f"Expected {usage['prompt_tokens']} for prompt_tokens, but got {inference_model_metrics['prompt_tokens'].value}"
    )
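
As a closing illustration of the baseline/delta bookkeeping in BaseTelemetryCollector: a self-contained sketch with a hypothetical fake subclass, showing that clear() snapshots a baseline and get_metrics() then reports only growth:

class _FakeCollector(BaseTelemetryCollector):
    """Hypothetical fake serving whatever metrics the test assigns."""

    def __init__(self) -> None:
        super().__init__()
        self.current: list[MetricStub] = []

    def _snapshot_spans(self) -> tuple[SpanStub, ...]:
        return ()

    def _snapshot_metrics(self) -> tuple[MetricStub, ...] | None:
        return tuple(self.current) or None

    def _clear_impl(self) -> None:
        pass


fake = _FakeCollector()
fake.current = [MetricStub(name="total_tokens", value=100, attributes={})]
fake.clear()  # establishes baseline: total_tokens == 100
fake.current = [MetricStub(name="total_tokens", value=135, attributes={})]
metrics = fake.get_metrics(expected_count=1)
assert metrics["total_tokens"].value == 35  # delta over baseline, not the absolute value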