mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-08-15 06:00:48 +00:00
fix: fix meta_reference telemetry console
currently, none of the inference metrics are printed when using the console. This is because self._log_metric is only implemented if self.meter is not None. acquire the lock, and add event to the span as the other `_log_...` method do Signed-off-by: Charlie Doern <cdoern@redhat.com>
This commit is contained in:
parent
d52722b0d1
commit
e2aefd797f
2 changed files with 39 additions and 5 deletions
|
@ -28,9 +28,6 @@ class ConsoleSpanProcessor(SpanProcessor):
|
||||||
logger.info(f"[dim]{timestamp}[/dim] [bold magenta][START][/bold magenta] [dim]{span.name}[/dim]")
|
logger.info(f"[dim]{timestamp}[/dim] [bold magenta][START][/bold magenta] [dim]{span.name}[/dim]")
|
||||||
|
|
||||||
def on_end(self, span: ReadableSpan) -> None:
|
def on_end(self, span: ReadableSpan) -> None:
|
||||||
if span.attributes and span.attributes.get("__autotraced__"):
|
|
||||||
return
|
|
||||||
|
|
||||||
timestamp = datetime.fromtimestamp(span.end_time / 1e9, tz=UTC).strftime("%H:%M:%S.%f")[:-3]
|
timestamp = datetime.fromtimestamp(span.end_time / 1e9, tz=UTC).strftime("%H:%M:%S.%f")[:-3]
|
||||||
span_context = f"[dim]{timestamp}[/dim] [bold magenta][END][/bold magenta] [dim]{span.name}[/dim]"
|
span_context = f"[dim]{timestamp}[/dim] [bold magenta][END][/bold magenta] [dim]{span.name}[/dim]"
|
||||||
if span.status.status_code == StatusCode.ERROR:
|
if span.status.status_code == StatusCode.ERROR:
|
||||||
|
@ -67,7 +64,7 @@ class ConsoleSpanProcessor(SpanProcessor):
|
||||||
for key, value in event.attributes.items():
|
for key, value in event.attributes.items():
|
||||||
if key.startswith("__") or key in ["message", "severity"]:
|
if key.startswith("__") or key in ["message", "severity"]:
|
||||||
continue
|
continue
|
||||||
logger.info(f"/r[dim]{key}[/dim]: {value}")
|
logger.info(f"[dim]{key}[/dim]: {value}")
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
"""Shutdown the processor."""
|
"""Shutdown the processor."""
|
||||||
|
|
|
@ -4,10 +4,13 @@
|
||||||
# This source code is licensed under the terms described in the LICENSE file in
|
# This source code is licensed under the terms described in the LICENSE file in
|
||||||
# the root directory of this source tree.
|
# the root directory of this source tree.
|
||||||
|
|
||||||
|
import logging
|
||||||
import threading
|
import threading
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from opentelemetry import metrics, trace
|
from opentelemetry import metrics, trace
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
|
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
|
||||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
from opentelemetry.sdk.metrics import MeterProvider
|
from opentelemetry.sdk.metrics import MeterProvider
|
||||||
|
@ -110,7 +113,7 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||||
if TelemetrySink.SQLITE in self.config.sinks:
|
if TelemetrySink.SQLITE in self.config.sinks:
|
||||||
trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path))
|
trace.get_tracer_provider().add_span_processor(SQLiteSpanProcessor(self.config.sqlite_db_path))
|
||||||
if TelemetrySink.CONSOLE in self.config.sinks:
|
if TelemetrySink.CONSOLE in self.config.sinks:
|
||||||
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor())
|
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor(print_attributes=True))
|
||||||
|
|
||||||
if TelemetrySink.OTEL_METRIC in self.config.sinks:
|
if TelemetrySink.OTEL_METRIC in self.config.sinks:
|
||||||
self.meter = metrics.get_meter(__name__)
|
self.meter = metrics.get_meter(__name__)
|
||||||
|
@ -126,9 +129,11 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||||
trace.get_tracer_provider().force_flush()
|
trace.get_tracer_provider().force_flush()
|
||||||
|
|
||||||
async def log_event(self, event: Event, ttl_seconds: int = 604800) -> None:
|
async def log_event(self, event: Event, ttl_seconds: int = 604800) -> None:
|
||||||
|
logger.debug(f"DEBUG: log_event called with event type: {type(event).__name__}")
|
||||||
if isinstance(event, UnstructuredLogEvent):
|
if isinstance(event, UnstructuredLogEvent):
|
||||||
self._log_unstructured(event, ttl_seconds)
|
self._log_unstructured(event, ttl_seconds)
|
||||||
elif isinstance(event, MetricEvent):
|
elif isinstance(event, MetricEvent):
|
||||||
|
logger.debug("DEBUG: Routing MetricEvent to _log_metric")
|
||||||
self._log_metric(event)
|
self._log_metric(event)
|
||||||
elif isinstance(event, StructuredLogEvent):
|
elif isinstance(event, StructuredLogEvent):
|
||||||
self._log_structured(event, ttl_seconds)
|
self._log_structured(event, ttl_seconds)
|
||||||
|
@ -188,6 +193,38 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
||||||
return _GLOBAL_STORAGE["gauges"][name]
|
return _GLOBAL_STORAGE["gauges"][name]
|
||||||
|
|
||||||
def _log_metric(self, event: MetricEvent) -> None:
|
def _log_metric(self, event: MetricEvent) -> None:
|
||||||
|
# Always log to console if console sink is enabled (debug)
|
||||||
|
if TelemetrySink.CONSOLE in self.config.sinks:
|
||||||
|
logger.debug(f"METRIC: {event.metric}={event.value} {event.unit} {event.attributes}")
|
||||||
|
|
||||||
|
# Add metric as an event to the current span
|
||||||
|
try:
|
||||||
|
with self._lock:
|
||||||
|
# Only try to add to span if we have a valid span_id
|
||||||
|
if event.span_id:
|
||||||
|
try:
|
||||||
|
span_id = int(event.span_id, 16)
|
||||||
|
span = _GLOBAL_STORAGE["active_spans"].get(span_id)
|
||||||
|
|
||||||
|
if span:
|
||||||
|
timestamp_ns = int(event.timestamp.timestamp() * 1e9)
|
||||||
|
span.add_event(
|
||||||
|
name=f"metric.{event.metric}",
|
||||||
|
attributes={
|
||||||
|
"value": event.value,
|
||||||
|
"unit": event.unit,
|
||||||
|
**(event.attributes or {}),
|
||||||
|
},
|
||||||
|
timestamp=timestamp_ns,
|
||||||
|
)
|
||||||
|
except (ValueError, KeyError):
|
||||||
|
# Invalid span_id or span not found, but we already logged to console above
|
||||||
|
pass
|
||||||
|
except Exception:
|
||||||
|
# Lock acquisition failed
|
||||||
|
logger.debug("Failed to acquire lock to add metric to span")
|
||||||
|
|
||||||
|
# Log to OpenTelemetry meter if available
|
||||||
if self.meter is None:
|
if self.meter is None:
|
||||||
return
|
return
|
||||||
if isinstance(event.value, int):
|
if isinstance(event.value, int):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue