From 138d9b777e097ae7938ada92d7f3d53fc3e2e238 Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Tue, 4 Nov 2025 15:04:14 -0500 Subject: [PATCH] fix(tests): telemetry tests take delta for metrics to isolate data to each test --- .../integration/telemetry/collectors/base.py | 236 ++++++++++++++---- .../telemetry/collectors/in_memory.py | 1 + .../integration/telemetry/collectors/otlp.py | 7 +- 3 files changed, 190 insertions(+), 54 deletions(-) diff --git a/tests/integration/telemetry/collectors/base.py b/tests/integration/telemetry/collectors/base.py index a0fa803af..c6c96e99a 100644 --- a/tests/integration/telemetry/collectors/base.py +++ b/tests/integration/telemetry/collectors/base.py @@ -6,6 +6,7 @@ """Shared helpers for telemetry test collectors.""" +import os import time from collections.abc import Iterable from dataclasses import dataclass @@ -130,6 +131,53 @@ class BaseTelemetryCollector: across both library-client and server modes. """ + # Default delay in seconds if OTEL_METRIC_EXPORT_INTERVAL is not set + _DEFAULT_BASELINE_STABILIZATION_DELAY = 0.2 + + def __init__(self): + self._metric_baseline: dict[tuple[str, str], float] = {} + + @classmethod + def _get_baseline_stabilization_delay(cls) -> float: + """Get baseline stabilization delay from OTEL_METRIC_EXPORT_INTERVAL. + + Adds 1.5x buffer for CI environments. + """ + interval_ms = os.environ.get("OTEL_METRIC_EXPORT_INTERVAL") + if interval_ms: + try: + delay = float(interval_ms) / 1000.0 + except (ValueError, TypeError): + delay = cls._DEFAULT_BASELINE_STABILIZATION_DELAY + else: + delay = cls._DEFAULT_BASELINE_STABILIZATION_DELAY + + if os.environ.get("CI"): + delay *= 1.5 + + return delay + + def _get_metric_key(self, metric: MetricStub) -> tuple[str, str]: + """Generate a stable key for a metric based on name and attributes.""" + attrs = metric.attributes or {} + attr_key = ",".join(f"{k}={v}" for k, v in sorted(attrs.items())) + return (metric.name, attr_key) + + def _compute_metric_delta(self, metric: MetricStub) -> int | float | None: + """Compute delta value for a metric from baseline. + + Returns: + Delta value if metric was in baseline, absolute value if new, None if unchanged. + """ + metric_key = self._get_metric_key(metric) + + if metric_key in self._metric_baseline: + baseline_value = self._metric_baseline[metric_key] + delta = metric.value - baseline_value + return delta if delta > 0 else None + else: + return metric.value + def get_spans( self, expected_count: int | None = None, @@ -170,41 +218,92 @@ class BaseTelemetryCollector: poll_interval: float = 0.05, expect_model_id: str | None = None, ) -> dict[str, MetricStub]: - """Get metrics with polling until metrics are available or timeout is reached.""" + """Poll until expected metrics are available or timeout is reached. - # metrics need to be collected since get requests delete stored metrics + Returns metrics with delta values computed from baseline. + """ deadline = time.time() + timeout min_count = expected_count if expected_count is not None else 1 accumulated_metrics = {} - count_metrics_with_model_id = 0 + seen_metric_names_with_model_id = set() while time.time() < deadline: current_metrics = self._snapshot_metrics() if current_metrics: for metric in current_metrics: - metric_name = metric.name - if metric_name not in accumulated_metrics: - accumulated_metrics[metric_name] = metric - if ( - expect_model_id - and metric.attributes - and metric.attributes.get("model_id") == expect_model_id - ): - count_metrics_with_model_id += 1 - else: - accumulated_metrics[metric_name] = metric + delta_value = self._compute_metric_delta(metric) + if delta_value is None: + continue - # Check if we have enough metrics - if len(accumulated_metrics) >= min_count: - if not expect_model_id: - return accumulated_metrics - if count_metrics_with_model_id >= min_count: - return accumulated_metrics + metric_with_delta = MetricStub( + name=metric.name, + value=delta_value, + attributes=metric.attributes, + ) + + self._accumulate_metric( + accumulated_metrics, + metric_with_delta, + expect_model_id, + seen_metric_names_with_model_id, + ) + + if self._has_enough_metrics( + accumulated_metrics, seen_metric_names_with_model_id, min_count, expect_model_id + ): + return accumulated_metrics time.sleep(poll_interval) return accumulated_metrics + def _accumulate_metric( + self, + accumulated: dict[str, MetricStub], + metric: MetricStub, + expect_model_id: str | None, + seen_with_model_id: set[str], + ) -> None: + """Accumulate a metric, preferring those matching expected model_id.""" + metric_name = metric.name + matches_model_id = ( + expect_model_id and metric.attributes and metric.attributes.get("model_id") == expect_model_id + ) + + if metric_name not in accumulated: + accumulated[metric_name] = metric + if matches_model_id: + seen_with_model_id.add(metric_name) + return + + existing = accumulated[metric_name] + existing_matches = ( + expect_model_id and existing.attributes and existing.attributes.get("model_id") == expect_model_id + ) + + if matches_model_id and not existing_matches: + accumulated[metric_name] = metric + seen_with_model_id.add(metric_name) + elif matches_model_id == existing_matches: + if metric.value > existing.value: + accumulated[metric_name] = metric + if matches_model_id: + seen_with_model_id.add(metric_name) + + def _has_enough_metrics( + self, + accumulated: dict[str, MetricStub], + seen_with_model_id: set[str], + min_count: int, + expect_model_id: str | None, + ) -> bool: + """Check if we have collected enough metrics.""" + if len(accumulated) < min_count: + return False + if not expect_model_id: + return True + return len(seen_with_model_id) >= min_count + @staticmethod def _convert_attributes_to_dict(attrs: Any) -> dict[str, Any]: """Convert various attribute types to a consistent dictionary format. @@ -289,10 +388,8 @@ class BaseTelemetryCollector: if not (metric.data.data_points and len(metric.data.data_points) > 0): return None - # Get the value from the first data point data_point = metric.data.data_points[0] - # Handle different metric types if hasattr(data_point, "value"): # Counter or Gauge value = data_point.value @@ -302,7 +399,6 @@ class BaseTelemetryCollector: else: return None - # Extract attributes if available attributes = {} if hasattr(data_point, "attributes"): attrs = data_point.attributes @@ -318,47 +414,85 @@ class BaseTelemetryCollector: ) @staticmethod - def _create_metric_stub_from_protobuf(metric: Any) -> MetricStub | None: - """Create MetricStub from protobuf metric object. + def _create_metric_stubs_from_protobuf(metric: Any) -> list[MetricStub]: + """Create list of MetricStub objects from protobuf metric object. - Protobuf metrics have a different structure than OpenTelemetry metrics. - They can have sum, gauge, or histogram data. + Protobuf metrics can have sum, gauge, or histogram data. Each metric can have + multiple data points with different attributes, so we return one MetricStub + per data point. + + Returns: + List of MetricStub objects, one per data point in the metric. """ if not hasattr(metric, "name"): - return None + return [] + + metric_stubs = [] - # Try to extract value from different metric types for metric_type in ["sum", "gauge", "histogram"]: - if hasattr(metric, metric_type): - metric_data = getattr(metric, metric_type) - if metric_data and hasattr(metric_data, "data_points"): - data_points = metric_data.data_points - if data_points and len(data_points) > 0: - data_point = data_points[0] + if not hasattr(metric, metric_type): + continue - # Extract attributes first (needed for all metric types) - attributes = ( - attributes_to_dict(data_point.attributes) if hasattr(data_point, "attributes") else {} - ) + metric_data = getattr(metric, metric_type) + if not metric_data or not hasattr(metric_data, "data_points"): + continue - # Extract value based on metric type - if metric_type == "sum": - value = data_point.as_int - elif metric_type == "gauge": - value = data_point.as_double - else: # histogram - value = data_point.sum + data_points = metric_data.data_points + if not data_points: + continue + + for data_point in data_points: + attributes = attributes_to_dict(data_point.attributes) if hasattr(data_point, "attributes") else {} + + value = BaseTelemetryCollector._extract_data_point_value(data_point, metric_type) + if value is None: + continue + + metric_stubs.append( + MetricStub( + name=metric.name, + value=value, + attributes=attributes, + ) + ) + + # Only process one metric type per metric + break + + return metric_stubs + + @staticmethod + def _extract_data_point_value(data_point: Any, metric_type: str) -> float | int | None: + """Extract value from a protobuf metric data point based on metric type.""" + if metric_type == "sum": + if hasattr(data_point, "as_int"): + return data_point.as_int + if hasattr(data_point, "as_double"): + return data_point.as_double + elif metric_type == "gauge": + if hasattr(data_point, "as_double"): + return data_point.as_double + elif metric_type == "histogram": + # Histograms use sum field which represents cumulative sum of all recorded values + if hasattr(data_point, "sum"): + return data_point.sum - return MetricStub( - name=metric.name, - value=value, - attributes=attributes, - ) return None def clear(self) -> None: + """Clear telemetry data and establish baseline for metric delta computation.""" + self._metric_baseline.clear() + self._clear_impl() + delay = self._get_baseline_stabilization_delay() + time.sleep(delay) + baseline_metrics = self._snapshot_metrics() + if baseline_metrics: + for metric in baseline_metrics: + metric_key = self._get_metric_key(metric) + self._metric_baseline[metric_key] = metric.value + def _snapshot_spans(self) -> tuple[SpanStub, ...]: # pragma: no cover - interface hook raise NotImplementedError diff --git a/tests/integration/telemetry/collectors/in_memory.py b/tests/integration/telemetry/collectors/in_memory.py index f431ed94d..7127b3816 100644 --- a/tests/integration/telemetry/collectors/in_memory.py +++ b/tests/integration/telemetry/collectors/in_memory.py @@ -28,6 +28,7 @@ class InMemoryTelemetryCollector(BaseTelemetryCollector): """ def __init__(self, span_exporter: InMemorySpanExporter, metric_reader: InMemoryMetricReader) -> None: + super().__init__() self._span_exporter = span_exporter self._metric_reader = metric_reader diff --git a/tests/integration/telemetry/collectors/otlp.py b/tests/integration/telemetry/collectors/otlp.py index 750ca6a55..21702e447 100644 --- a/tests/integration/telemetry/collectors/otlp.py +++ b/tests/integration/telemetry/collectors/otlp.py @@ -21,6 +21,7 @@ from .base import BaseTelemetryCollector, MetricStub, SpanStub, attributes_to_di class OtlpHttpTestCollector(BaseTelemetryCollector): def __init__(self) -> None: + super().__init__() self._spans: list[SpanStub] = [] self._metrics: list[MetricStub] = [] self._lock = threading.Lock() @@ -60,9 +61,9 @@ class OtlpHttpTestCollector(BaseTelemetryCollector): for resource_metrics in request.resource_metrics: for scope_metrics in resource_metrics.scope_metrics: for metric in scope_metrics.metrics: - metric_stub = self._create_metric_stub_from_protobuf(metric) - if metric_stub: - new_metrics.append(metric_stub) + # Handle multiple data points per metric (e.g., different attribute sets) + metric_stubs = self._create_metric_stubs_from_protobuf(metric) + new_metrics.extend(metric_stubs) if not new_metrics: return