fix(tests): deduplicate code and simplify user experience for telemetry test code

This commit is contained in:
Emilio Garcia 2025-10-29 12:53:38 -04:00
parent e4e8a59325
commit 52a7784847
4 changed files with 180 additions and 79 deletions

View file

@ -11,6 +11,31 @@ from dataclasses import dataclass
from typing import Any
@dataclass
class MetricStub:
"""Unified metric interface for both in-memory and OTLP collectors."""
name: str
value: Any
attributes: dict[str, Any] | None = None
def get_value(self) -> Any:
"""Get the metric value."""
return self.value
def get_name(self) -> str:
"""Get the metric name."""
return self.name
def get_attributes(self) -> dict[str, Any]:
"""Get metric attributes as a dictionary."""
return self.attributes or {}
def get_attribute(self, key: str) -> Any:
"""Get a specific attribute value by key."""
return self.get_attributes().get(key)
@dataclass
class SpanStub:
"""Unified span interface for both in-memory and OTLP collectors."""
@ -35,18 +60,7 @@ class SpanStub:
Handles different attribute types (mapping, dict, etc.) and returns
a consistent dictionary format.
"""
attrs = self.attributes
if attrs is None:
return {}
# Handle mapping-like objects (e.g., mappingproxy)
try:
return dict(attrs.items()) # type: ignore[attr-defined]
except AttributeError:
try:
return dict(attrs)
except TypeError:
return dict(attrs) if attrs else {}
return BaseTelemetryCollector._convert_attributes_to_dict(self.attributes)
def get_attribute(self, key: str) -> Any:
"""Get a specific attribute value by key."""
@ -167,16 +181,142 @@ class BaseTelemetryCollector:
last_len = len(spans)
time.sleep(poll_interval)
def get_metrics(self) -> Any | None:
def get_metrics(self) -> tuple[MetricStub, ...] | None:
return self._snapshot_metrics()
def get_metrics_dict(self) -> dict[str, Any]:
"""Get metrics as a simple name->value dictionary for easy lookup.
This method works with MetricStub objects for consistent interface
across both in-memory and OTLP collectors.
"""
metrics = self._snapshot_metrics()
if not metrics:
return {}
return {metric.get_name(): metric.get_value() for metric in metrics}
def get_metric_value(self, name: str) -> Any | None:
"""Get a specific metric value by name."""
return self.get_metrics_dict().get(name)
def has_metric(self, name: str) -> bool:
"""Check if a metric with the given name exists."""
return name in self.get_metrics_dict()
def get_metric_names(self) -> list[str]:
"""Get all available metric names."""
return list(self.get_metrics_dict().keys())
@staticmethod
def _convert_attributes_to_dict(attrs: Any) -> dict[str, Any]:
"""Convert various attribute types to a consistent dictionary format.
Handles mappingproxy, dict, and other attribute types.
"""
if attrs is None:
return {}
try:
return dict(attrs.items()) # type: ignore[attr-defined]
except AttributeError:
try:
return dict(attrs)
except TypeError:
return dict(attrs) if attrs else {}
@staticmethod
def _extract_trace_span_ids(span: Any) -> tuple[str | None, str | None]:
"""Extract trace_id and span_id from OpenTelemetry span object.
Handles both context-based and direct attribute access.
"""
trace_id = None
span_id = None
context = getattr(span, "context", None)
if context:
trace_id = f"{context.trace_id:032x}"
span_id = f"{context.span_id:016x}"
else:
trace_id = getattr(span, "trace_id", None)
span_id = getattr(span, "span_id", None)
return trace_id, span_id
@staticmethod
def _create_span_stub_from_opentelemetry(span: Any) -> SpanStub:
"""Create SpanStub from OpenTelemetry span object.
This helper reduces code duplication between collectors.
"""
trace_id, span_id = BaseTelemetryCollector._extract_trace_span_ids(span)
attributes = BaseTelemetryCollector._convert_attributes_to_dict(span.attributes)
return SpanStub(
name=span.name,
attributes=attributes,
trace_id=trace_id,
span_id=span_id,
)
@staticmethod
def _create_span_stub_from_protobuf(span: Any, resource_attrs: dict[str, Any] | None = None) -> SpanStub:
"""Create SpanStub from protobuf span object.
This helper handles the different structure of protobuf spans.
"""
attributes = attributes_to_dict(span.attributes)
events = events_to_list(span.events) if span.events else None
trace_id = span.trace_id.hex() if span.trace_id else None
span_id = span.span_id.hex() if span.span_id else None
return SpanStub(
name=span.name,
attributes=attributes,
resource_attributes=resource_attrs,
events=events,
trace_id=trace_id,
span_id=span_id,
)
@staticmethod
def _extract_metric_from_opentelemetry(metric: Any) -> MetricStub | None:
"""Extract MetricStub from OpenTelemetry metric object.
This helper reduces code duplication between collectors.
"""
if not (hasattr(metric, "name") and hasattr(metric, "data") and hasattr(metric.data, "data_points")):
return None
if not (metric.data.data_points and len(metric.data.data_points) > 0):
return None
# Get the value from the first data point
value = metric.data.data_points[0].value
# Extract attributes if available
attributes = {}
if hasattr(metric.data.data_points[0], "attributes"):
attrs = metric.data.data_points[0].attributes
if attrs is not None and hasattr(attrs, "items"):
attributes = dict(attrs.items())
elif attrs is not None and not isinstance(attrs, dict):
attributes = dict(attrs)
return MetricStub(
name=metric.name,
value=value,
attributes=attributes if attributes else None,
)
def clear(self) -> None:
self._clear_impl()
def _snapshot_spans(self) -> tuple[SpanStub, ...]: # pragma: no cover - interface hook
raise NotImplementedError
def _snapshot_metrics(self) -> Any | None: # pragma: no cover - interface hook
def _snapshot_metrics(self) -> tuple[MetricStub, ...] | None: # pragma: no cover - interface hook
raise NotImplementedError
def _clear_impl(self) -> None: # pragma: no cover - interface hook