diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py
index 7a993b891..014b800cc 100644
--- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py
+++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py
@@ -79,7 +79,6 @@ class TelemetryAdapter(Telemetry):
             metrics.set_meter_provider(metric_provider)
 
         self.meter = metrics.get_meter(__name__)
-        self._lock = _global_lock
 
     async def initialize(self) -> None:
 
diff --git a/tests/integration/telemetry/conftest.py b/tests/integration/telemetry/conftest.py
index bed6dc751..701ab5c5e 100644
--- a/tests/integration/telemetry/conftest.py
+++ b/tests/integration/telemetry/conftest.py
@@ -13,6 +13,7 @@ cannot access spans from a separate server process.
 
 from typing import Any
 
+import opentelemetry.metrics as otel_metrics
 import opentelemetry.trace as otel_trace
 import pytest
 from opentelemetry import metrics, trace
@@ -25,40 +26,61 @@ from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanE
 import llama_stack.providers.inline.telemetry.meta_reference.telemetry as telemetry_module
 
 
-class OtelTestCollector:
-    """In-memory collector for OpenTelemetry traces and metrics."""
+@pytest.fixture(scope="session")
+def _setup_test_telemetry():
+    """Session-scoped: Set up test telemetry providers before client initialization."""
+    # Reset OpenTelemetry's internal locks to allow test fixtures to override providers
+    if hasattr(otel_trace, "_TRACER_PROVIDER_SET_ONCE"):
+        otel_trace._TRACER_PROVIDER_SET_ONCE._done = False  # type: ignore
+    if hasattr(otel_metrics, "_METER_PROVIDER_SET_ONCE"):
+        otel_metrics._METER_PROVIDER_SET_ONCE._done = False  # type: ignore
 
-    def __init__(self):
-        self.span_exporter = InMemorySpanExporter()
-        self.tracer_provider = TracerProvider()
-        self.tracer_provider.add_span_processor(SimpleSpanProcessor(self.span_exporter))
-        trace.set_tracer_provider(self.tracer_provider)
+    # Create and set up providers before client initialization
+    span_exporter = InMemorySpanExporter()
+    tracer_provider = TracerProvider()
+    tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
+    trace.set_tracer_provider(tracer_provider)
 
-        self.metric_reader = InMemoryMetricReader()
-        self.meter_provider = MeterProvider(metric_readers=[self.metric_reader])
-        metrics.set_meter_provider(self.meter_provider)
+    metric_reader = InMemoryMetricReader()
+    meter_provider = MeterProvider(metric_readers=[metric_reader])
+    metrics.set_meter_provider(meter_provider)
+
+    # Set module-level providers so TelemetryAdapter uses them
+    telemetry_module._TRACER_PROVIDER = tracer_provider
+
+    yield tracer_provider, meter_provider, span_exporter, metric_reader
+
+    # Cleanup
+    telemetry_module._TRACER_PROVIDER = None
+    tracer_provider.shutdown()
+    meter_provider.shutdown()
+
+
+class TestCollector:
+    def __init__(self, span_exp, metric_read):
+        assert span_exp and metric_read
+        self.span_exporter = span_exp
+        self.metric_reader = metric_read
 
     def get_spans(self) -> tuple[ReadableSpan, ...]:
         return self.span_exporter.get_finished_spans()
 
     def get_metrics(self) -> Any | None:
-        return self.metric_reader.get_metrics_data()
-
-    def shutdown(self) -> None:
-        self.tracer_provider.shutdown()
-        self.meter_provider.shutdown()
+        metrics = self.metric_reader.get_metrics_data()
+        if metrics and metrics.resource_metrics:
+            return metrics.resource_metrics[0].scope_metrics[0].metrics
+        return None
 
 
 @pytest.fixture
-def mock_otlp_collector():
-    """Function-scoped: Fresh telemetry data view for each test."""
-    if hasattr(otel_trace, "_TRACER_PROVIDER_SET_ONCE"):
-        otel_trace._TRACER_PROVIDER_SET_ONCE._done = False  # type: ignore
+def mock_otlp_collector(_setup_test_telemetry):
+    """Function-scoped: Access to telemetry data for each test."""
+    # Unpack the providers from the session fixture
+    tracer_provider, meter_provider, span_exporter, metric_reader = _setup_test_telemetry
 
-    collector = OtelTestCollector()
-    telemetry_module._TRACER_PROVIDER = collector.tracer_provider
+    collector = TestCollector(span_exporter, metric_reader)
+
+    # Clear spans between tests
+    span_exporter.clear()
 
     yield collector
-
-    telemetry_module._TRACER_PROVIDER = None
-    collector.shutdown()
diff --git a/tests/integration/telemetry/test_completions.py b/tests/integration/telemetry/test_completions.py
index 755d385b9..b754f2c32 100644
--- a/tests/integration/telemetry/test_completions.py
+++ b/tests/integration/telemetry/test_completions.py
@@ -41,8 +41,6 @@ def test_streaming_chunk_count(mock_otlp_collector, llama_stack_client, text_mod
 
 def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client, text_model_id):
     """Comprehensive validation of telemetry data format including spans and metrics."""
-    collector = mock_otlp_collector
-
     response = llama_stack_client.chat.completions.create(
         model=text_model_id,
         messages=[{"role": "user", "content": "Test trace openai with temperature 0.7"}],
@@ -51,31 +49,48 @@ def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client,
         stream=False,
     )
 
-    assert response
+    assert response.usage.get("prompt_tokens") > 0
+    assert response.usage.get("completion_tokens") > 0
+    assert response.usage.get("total_tokens") > 0
 
     # Verify spans
-    spans = collector.get_spans()
+    spans = mock_otlp_collector.get_spans()
     assert len(spans) == 5
 
     for span in spans:
-        print(f"Span: {span.attributes}")
-        if span.attributes.get("__autotraced__"):
-            assert span.attributes.get("__class__") and span.attributes.get("__method__")
-            assert span.attributes.get("__type__") in ["async", "sync", "async_generator"]
-            if span.attributes.get("__args__"):
-                args = json.loads(span.attributes.get("__args__"))
-                # The parameter is 'model' in openai_chat_completion, not 'model_id'
-                if "model" in args:
+        attrs = span.attributes
+        assert attrs is not None
+
+        # Root span is created manually by tracing middleware, not by @trace_protocol decorator
+        is_root_span = attrs.get("__root__") is True
+
+        if is_root_span:
+            # Root spans have different attributes
+            assert attrs.get("__location__") in ["library_client", "server"]
+        else:
+            # Non-root spans are created by @trace_protocol decorator
+            assert attrs.get("__autotraced__")
+            assert attrs.get("__class__") and attrs.get("__method__")
+            assert attrs.get("__type__") in ["async", "sync", "async_generator"]
+
+            args = json.loads(attrs["__args__"])
+            if "model_id" in args:
+                assert args.get("model_id") == text_model_id
+            else:
                 assert args.get("model") == text_model_id
 
-    # Verify token metrics in response
-    # Note: Llama Stack emits token metrics in the response JSON, not via OTel Metrics API
-    usage = response.usage if hasattr(response, "usage") else response.get("usage")
-    assert usage
-    prompt_tokens = usage.get("prompt_tokens") if isinstance(usage, dict) else usage.prompt_tokens
-    completion_tokens = usage.get("completion_tokens") if isinstance(usage, dict) else usage.completion_tokens
-    total_tokens = usage.get("total_tokens") if isinstance(usage, dict) else usage.total_tokens
-
-    assert prompt_tokens is not None and prompt_tokens > 0
-    assert completion_tokens is not None and completion_tokens > 0
-    assert total_tokens is not None and total_tokens > 0
+    # Verify token usage metrics in response
+    metrics = mock_otlp_collector.get_metrics()
+    print(f"metrics: {metrics}")
+    assert metrics
+    for metric in metrics:
+        assert metric.name in ["completion_tokens", "total_tokens", "prompt_tokens"]
+        assert metric.unit == "tokens"
+        assert metric.data.data_points and len(metric.data.data_points) == 1
+        match metric.name:
+            case "completion_tokens":
+                assert metric.data.data_points[0].value == response.usage.get("completion_tokens")
+            case "total_tokens":
+                assert metric.data.data_points[0].value == response.usage.get("total_tokens")
+            case "prompt_tokens":
+                assert metric.data.data_points[0].value == response.usage.get("prompt_tokens")
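
Note on the _SET_ONCE reset in _setup_test_telemetry: the OpenTelemetry SDK guards trace.set_tracer_provider() and metrics.set_meter_provider() with a Once flag, so a second call in the same process is ignored with a warning rather than replacing the provider. The fixture clears the private _done flag so the in-memory test providers can be installed even when a global provider was already set. A minimal sketch of the behavior being worked around (this relies on private SDK internals, _TRACER_PROVIDER_SET_ONCE and Once._done, which may change between opentelemetry-python releases):

import opentelemetry.trace as otel_trace
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

trace.set_tracer_provider(TracerProvider())  # first call installs the provider
trace.set_tracer_provider(TracerProvider())  # ignored: the SDK logs a warning instead of overriding

# Clearing the private Once flag allows the override, which is what the
# session fixture does before installing the in-memory providers:
if hasattr(otel_trace, "_TRACER_PROVIDER_SET_ONCE"):
    otel_trace._TRACER_PROVIDER_SET_ONCE._done = False  # type: ignore
trace.set_tracer_provider(TracerProvider())  # now takes effect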
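
For reference, a sketch of how a test consumes the new fixture chain: mock_otlp_collector now depends on the session-scoped _setup_test_telemetry, so the in-memory providers are installed once per session while the span buffer is cleared per test. The test name and assertions below are illustrative only, not part of this patch:

def test_example_span_capture(mock_otlp_collector, llama_stack_client, text_model_id):
    llama_stack_client.chat.completions.create(
        model=text_model_id,
        messages=[{"role": "user", "content": "ping"}],
        stream=False,
    )

    # Only spans from this test are visible: the function-scoped fixture
    # called span_exporter.clear() before yielding the collector.
    assert len(mock_otlp_collector.get_spans()) > 0

    # get_metrics() returns the unwrapped scope metrics
    # (resource_metrics[0].scope_metrics[0].metrics) or None.
    assert mock_otlp_collector.get_metrics() is not None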