fix(tests): telemetry tests compute metric deltas to isolate data per test

Emilio Garcia 2025-11-04 15:04:14 -05:00
parent e8d20b9c50
commit 138d9b777e
3 changed files with 190 additions and 54 deletions
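The idea behind the change: each test first calls clear() on the collector, which wipes exporter state, waits for the export interval to settle, and snapshots any remaining cumulative values as a baseline; later metric reads are then returned as deltas against that baseline, so counters accumulated by earlier tests do not leak into later assertions. A minimal usage sketch under assumptions: the fixture name, client call, and metric name below are hypothetical, and the metrics accessor is assumed to be named get_metrics (only its parameters appear in this diff).

def test_completion_emits_token_metric(telemetry_collector, client):
    # Establish a per-test baseline: clear() snapshots current cumulative metric values.
    telemetry_collector.clear()

    # Hypothetical request that should emit telemetry for exactly one model.
    client.inference.completion(model_id="test-model", content="hello")

    # Metrics come back as deltas from the baseline, so only this test's
    # activity is visible to the assertion.
    metrics = telemetry_collector.get_metrics(expected_count=1, expect_model_id="test-model")
    stub = metrics["completion_tokens"]  # hypothetical metric name
    assert stub.value > 0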

View file

@@ -6,6 +6,7 @@
"""Shared helpers for telemetry test collectors."""
import os
import time
from collections.abc import Iterable
from dataclasses import dataclass
@@ -130,6 +131,53 @@ class BaseTelemetryCollector:
across both library-client and server modes.
"""
# Default delay in seconds if OTEL_METRIC_EXPORT_INTERVAL is not set
_DEFAULT_BASELINE_STABILIZATION_DELAY = 0.2
def __init__(self):
self._metric_baseline: dict[tuple[str, str], float] = {}
@classmethod
def _get_baseline_stabilization_delay(cls) -> float:
"""Get baseline stabilization delay from OTEL_METRIC_EXPORT_INTERVAL.
Adds 1.5x buffer for CI environments.
"""
interval_ms = os.environ.get("OTEL_METRIC_EXPORT_INTERVAL")
if interval_ms:
try:
delay = float(interval_ms) / 1000.0
except (ValueError, TypeError):
delay = cls._DEFAULT_BASELINE_STABILIZATION_DELAY
else:
delay = cls._DEFAULT_BASELINE_STABILIZATION_DELAY
if os.environ.get("CI"):
delay *= 1.5
return delay
def _get_metric_key(self, metric: MetricStub) -> tuple[str, str]:
"""Generate a stable key for a metric based on name and attributes."""
attrs = metric.attributes or {}
attr_key = ",".join(f"{k}={v}" for k, v in sorted(attrs.items()))
return (metric.name, attr_key)
def _compute_metric_delta(self, metric: MetricStub) -> int | float | None:
"""Compute delta value for a metric from baseline.
Returns:
Delta value if the metric was in the baseline, the absolute value if it is new, or None if the value has not increased.
"""
metric_key = self._get_metric_key(metric)
if metric_key in self._metric_baseline:
baseline_value = self._metric_baseline[metric_key]
delta = metric.value - baseline_value
return delta if delta > 0 else None
else:
return metric.value
def get_spans(
self,
expected_count: int | None = None,
@@ -170,41 +218,92 @@ class BaseTelemetryCollector:
poll_interval: float = 0.05,
expect_model_id: str | None = None,
) -> dict[str, MetricStub]:
"""Get metrics with polling until metrics are available or timeout is reached."""
"""Poll until expected metrics are available or timeout is reached.
# metrics need to be collected since get requests delete stored metrics
Returns metrics with delta values computed from baseline.
"""
deadline = time.time() + timeout
min_count = expected_count if expected_count is not None else 1
accumulated_metrics = {}
count_metrics_with_model_id = 0
seen_metric_names_with_model_id = set()
while time.time() < deadline:
current_metrics = self._snapshot_metrics()
if current_metrics:
for metric in current_metrics:
metric_name = metric.name
if metric_name not in accumulated_metrics:
accumulated_metrics[metric_name] = metric
if (
expect_model_id
and metric.attributes
and metric.attributes.get("model_id") == expect_model_id
):
count_metrics_with_model_id += 1
else:
accumulated_metrics[metric_name] = metric
delta_value = self._compute_metric_delta(metric)
if delta_value is None:
continue
# Check if we have enough metrics
if len(accumulated_metrics) >= min_count:
if not expect_model_id:
return accumulated_metrics
if count_metrics_with_model_id >= min_count:
return accumulated_metrics
metric_with_delta = MetricStub(
name=metric.name,
value=delta_value,
attributes=metric.attributes,
)
self._accumulate_metric(
accumulated_metrics,
metric_with_delta,
expect_model_id,
seen_metric_names_with_model_id,
)
if self._has_enough_metrics(
accumulated_metrics, seen_metric_names_with_model_id, min_count, expect_model_id
):
return accumulated_metrics
time.sleep(poll_interval)
return accumulated_metrics
def _accumulate_metric(
self,
accumulated: dict[str, MetricStub],
metric: MetricStub,
expect_model_id: str | None,
seen_with_model_id: set[str],
) -> None:
"""Accumulate a metric, preferring those matching expected model_id."""
metric_name = metric.name
matches_model_id = (
expect_model_id and metric.attributes and metric.attributes.get("model_id") == expect_model_id
)
if metric_name not in accumulated:
accumulated[metric_name] = metric
if matches_model_id:
seen_with_model_id.add(metric_name)
return
existing = accumulated[metric_name]
existing_matches = (
expect_model_id and existing.attributes and existing.attributes.get("model_id") == expect_model_id
)
if matches_model_id and not existing_matches:
accumulated[metric_name] = metric
seen_with_model_id.add(metric_name)
elif matches_model_id == existing_matches:
if metric.value > existing.value:
accumulated[metric_name] = metric
if matches_model_id:
seen_with_model_id.add(metric_name)
def _has_enough_metrics(
self,
accumulated: dict[str, MetricStub],
seen_with_model_id: set[str],
min_count: int,
expect_model_id: str | None,
) -> bool:
"""Check if we have collected enough metrics."""
if len(accumulated) < min_count:
return False
if not expect_model_id:
return True
return len(seen_with_model_id) >= min_count
@staticmethod
def _convert_attributes_to_dict(attrs: Any) -> dict[str, Any]:
"""Convert various attribute types to a consistent dictionary format.
@@ -289,10 +388,8 @@ class BaseTelemetryCollector:
if not (metric.data.data_points and len(metric.data.data_points) > 0):
return None
# Get the value from the first data point
data_point = metric.data.data_points[0]
# Handle different metric types
if hasattr(data_point, "value"):
# Counter or Gauge
value = data_point.value
@@ -302,7 +399,6 @@
else:
return None
# Extract attributes if available
attributes = {}
if hasattr(data_point, "attributes"):
attrs = data_point.attributes
@@ -318,47 +414,85 @@
)
@staticmethod
def _create_metric_stub_from_protobuf(metric: Any) -> MetricStub | None:
"""Create MetricStub from protobuf metric object.
def _create_metric_stubs_from_protobuf(metric: Any) -> list[MetricStub]:
"""Create list of MetricStub objects from protobuf metric object.
Protobuf metrics have a different structure than OpenTelemetry metrics.
They can have sum, gauge, or histogram data.
Protobuf metrics can have sum, gauge, or histogram data. Each metric can have
multiple data points with different attributes, so we return one MetricStub
per data point.
Returns:
List of MetricStub objects, one per data point in the metric.
"""
if not hasattr(metric, "name"):
return None
return []
metric_stubs = []
# Try to extract value from different metric types
for metric_type in ["sum", "gauge", "histogram"]:
if hasattr(metric, metric_type):
metric_data = getattr(metric, metric_type)
if metric_data and hasattr(metric_data, "data_points"):
data_points = metric_data.data_points
if data_points and len(data_points) > 0:
data_point = data_points[0]
if not hasattr(metric, metric_type):
continue
# Extract attributes first (needed for all metric types)
attributes = (
attributes_to_dict(data_point.attributes) if hasattr(data_point, "attributes") else {}
)
metric_data = getattr(metric, metric_type)
if not metric_data or not hasattr(metric_data, "data_points"):
continue
# Extract value based on metric type
if metric_type == "sum":
value = data_point.as_int
elif metric_type == "gauge":
value = data_point.as_double
else: # histogram
value = data_point.sum
data_points = metric_data.data_points
if not data_points:
continue
for data_point in data_points:
attributes = attributes_to_dict(data_point.attributes) if hasattr(data_point, "attributes") else {}
value = BaseTelemetryCollector._extract_data_point_value(data_point, metric_type)
if value is None:
continue
metric_stubs.append(
MetricStub(
name=metric.name,
value=value,
attributes=attributes,
)
)
# Only process one metric type per metric
break
return metric_stubs
@staticmethod
def _extract_data_point_value(data_point: Any, metric_type: str) -> float | int | None:
"""Extract value from a protobuf metric data point based on metric type."""
if metric_type == "sum":
if hasattr(data_point, "as_int"):
return data_point.as_int
if hasattr(data_point, "as_double"):
return data_point.as_double
elif metric_type == "gauge":
if hasattr(data_point, "as_double"):
return data_point.as_double
elif metric_type == "histogram":
# Histograms use the sum field, which is the cumulative sum of all recorded values
if hasattr(data_point, "sum"):
return data_point.sum
return MetricStub(
name=metric.name,
value=value,
attributes=attributes,
)
return None
def clear(self) -> None:
"""Clear telemetry data and establish baseline for metric delta computation."""
self._metric_baseline.clear()
self._clear_impl()
delay = self._get_baseline_stabilization_delay()
time.sleep(delay)
baseline_metrics = self._snapshot_metrics()
if baseline_metrics:
for metric in baseline_metrics:
metric_key = self._get_metric_key(metric)
self._metric_baseline[metric_key] = metric.value
def _snapshot_spans(self) -> tuple[SpanStub, ...]: # pragma: no cover - interface hook
raise NotImplementedError

View file

@@ -28,6 +28,7 @@ class InMemoryTelemetryCollector(BaseTelemetryCollector):
"""
def __init__(self, span_exporter: InMemorySpanExporter, metric_reader: InMemoryMetricReader) -> None:
super().__init__()
self._span_exporter = span_exporter
self._metric_reader = metric_reader

View file

@@ -21,6 +21,7 @@ from .base import BaseTelemetryCollector, MetricStub, SpanStub, attributes_to_dict
class OtlpHttpTestCollector(BaseTelemetryCollector):
def __init__(self) -> None:
super().__init__()
self._spans: list[SpanStub] = []
self._metrics: list[MetricStub] = []
self._lock = threading.Lock()
@@ -60,9 +61,9 @@ class OtlpHttpTestCollector(BaseTelemetryCollector):
for resource_metrics in request.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
metric_stub = self._create_metric_stub_from_protobuf(metric)
if metric_stub:
new_metrics.append(metric_stub)
# Handle multiple data points per metric (e.g., different attribute sets)
metric_stubs = self._create_metric_stubs_from_protobuf(metric)
new_metrics.extend(metric_stubs)
if not new_metrics:
return