From e2aefd797f502cc80cde88b67a8e85ecdfd82f86 Mon Sep 17 00:00:00 2001 From: Charlie Doern Date: Fri, 11 Jul 2025 20:51:23 -0400 Subject: [PATCH] fix: fix meta_reference telemetry console currently, none of the inference metrics are printed when using the console. This is because self._log_metric is only implemented if self.meter is not None. acquire the lock, and add event to the span as the other `_log_...` method do Signed-off-by: Charlie Doern --- .../meta_reference/console_span_processor.py | 5 +-- .../telemetry/meta_reference/telemetry.py | 39 ++++++++++++++++++- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/llama_stack/providers/inline/telemetry/meta_reference/console_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/console_span_processor.py index b4c77437d..78e49af94 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/console_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/console_span_processor.py @@ -28,9 +28,6 @@ class ConsoleSpanProcessor(SpanProcessor): logger.info(f"[dim]{timestamp}[/dim] [bold magenta][START][/bold magenta] [dim]{span.name}[/dim]") def on_end(self, span: ReadableSpan) -> None: - if span.attributes and span.attributes.get("__autotraced__"): - return - timestamp = datetime.fromtimestamp(span.end_time / 1e9, tz=UTC).strftime("%H:%M:%S.%f")[:-3] span_context = f"[dim]{timestamp}[/dim] [bold magenta][END][/bold magenta] [dim]{span.name}[/dim]" if span.status.status_code == StatusCode.ERROR: @@ -67,7 +64,7 @@ class ConsoleSpanProcessor(SpanProcessor): for key, value in event.attributes.items(): if key.startswith("__") or key in ["message", "severity"]: continue - logger.info(f"/r[dim]{key}[/dim]: {value}") + logger.info(f"[dim]{key}[/dim]: {value}") def shutdown(self) -> None: """Shutdown the processor.""" diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index 623267172..d99255c79 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -4,10 +4,13 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +import logging import threading from typing import Any from opentelemetry import metrics, trace + +logger = logging.getLogger(__name__) from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.metrics import MeterProvider @@ -110,7 +113,7 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): if TelemetrySink.SQLITE in self.config.sinks: trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path)) if TelemetrySink.CONSOLE in self.config.sinks: - trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor()) + trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True)) if TelemetrySink.OTEL_METRIC in self.config.sinks: self.meter = metrics.get_meter(__name__) @@ -126,9 +129,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): trace.get_tracer_provider().force_flush() async def log_event(self, event: Event, ttl_seconds: int = 604800) -> None: + logger.debug(f"DEBUG: log_event called with event type: {type(event).__name__}") if isinstance(event, UnstructuredLogEvent): self._log_unstructured(event, ttl_seconds) elif isinstance(event, MetricEvent): + logger.debug("DEBUG: Routing MetricEvent to _log_metric") self._log_metric(event) elif isinstance(event, StructuredLogEvent): self._log_structured(event, ttl_seconds) @@ -188,6 +193,38 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): return _GLOBAL_STORAGE["gauges"][name] def _log_metric(self, event: MetricEvent) -> None: + # Always log to console if console sink is enabled (debug) + if TelemetrySink.CONSOLE in self.config.sinks: + logger.debug(f"METRIC: {event.metric}={event.value} {event.unit} {event.attributes}") + + # Add metric as an event to the current span + try: + with self._lock: + # Only try to add to span if we have a valid span_id + if event.span_id: + try: + span_id = int(event.span_id, 16) + span = _GLOBAL_STORAGE["active_spans"].get(span_id) + + if span: + timestamp_ns = int(event.timestamp.timestamp() * 1e9) + span.add_event( + name=f"metric.{event.metric}", + attributes={ + "value": event.value, + "unit": event.unit, + **(event.attributes or {}), + }, + timestamp=timestamp_ns, + ) + except (ValueError, KeyError): + # Invalid span_id or span not found, but we already logged to console above + pass + except Exception: + # Lock acquisition failed + logger.debug("Failed to acquire lock to add metric to span") + + # Log to OpenTelemetry meter if available if self.meter is None: return if isinstance(event.value, int):