From 0e0bc8aba7280464a475735caf302dd3d43f28c6 Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Wed, 29 Oct 2025 13:54:41 -0400 Subject: [PATCH] Fix protobuf metric parsing in OTLP collector - Add _create_metric_stub_from_protobuf method to correctly parse protobuf metrics - Add _extract_attributes_from_data_point helper method - Change metric handling to use protobuf-specific parsing instead of OpenTelemetry native parsing - Add missing typing import - Add OTEL_METRIC_EXPORT_INTERVAL environment variable for test configuration This fixes the CI failure where metrics were not being properly extracted from protobuf data in server mode tests. --- scripts/integration-tests.sh | 3 + ...34a95f56931b792d5939f4cebc57-826d44c3.json | 89 +++++++++++++++++++ .../integration/telemetry/collectors/base.py | 49 +++++----- .../integration/telemetry/collectors/otlp.py | 51 ++++++++++- .../integration/telemetry/test_completions.py | 45 ++++++++-- 5 files changed, 208 insertions(+), 29 deletions(-) create mode 100644 tests/integration/common/recordings/models-64a2277c90f0f42576f60c1030e3a020403d34a95f56931b792d5939f4cebc57-826d44c3.json diff --git a/scripts/integration-tests.sh b/scripts/integration-tests.sh index cdd3e736f..6d2b272b4 100755 --- a/scripts/integration-tests.sh +++ b/scripts/integration-tests.sh @@ -337,6 +337,9 @@ if [[ "$STACK_CONFIG" == *"docker:"* && "$COLLECT_ONLY" == false ]]; then DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e LLAMA_STACK_TEST_INFERENCE_MODE=$INFERENCE_MODE" DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e LLAMA_STACK_TEST_STACK_CONFIG_TYPE=server" DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:${COLLECTOR_PORT}" + DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e OTEL_METRIC_EXPORT_INTERVAL=200" + DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e OTEL_BSP_SCHEDULE_DELAY=200" + DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e OTEL_BSP_EXPORT_TIMEOUT=2000" # Pass through API keys if they exist [ -n "${TOGETHER_API_KEY:-}" ] && DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e TOGETHER_API_KEY=$TOGETHER_API_KEY" diff --git a/tests/integration/common/recordings/models-64a2277c90f0f42576f60c1030e3a020403d34a95f56931b792d5939f4cebc57-826d44c3.json b/tests/integration/common/recordings/models-64a2277c90f0f42576f60c1030e3a020403d34a95f56931b792d5939f4cebc57-826d44c3.json new file mode 100644 index 000000000..878fcc650 --- /dev/null +++ b/tests/integration/common/recordings/models-64a2277c90f0f42576f60c1030e3a020403d34a95f56931b792d5939f4cebc57-826d44c3.json @@ -0,0 +1,89 @@ +{ + "test_id": null, + "request": { + "method": "POST", + "url": "http://0.0.0.0:11434/v1/v1/models", + "headers": {}, + "body": {}, + "endpoint": "/v1/models", + "model": "" + }, + "response": { + "body": [ + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "llama3.2:3b-instruct-fp16", + "created": 1760453641, + "object": "model", + "owned_by": "library" + } + }, + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "qwen3:4b", + "created": 1757615302, + "object": "model", + "owned_by": "library" + } + }, + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "gpt-oss:latest", + "created": 1756395223, + "object": "model", + "owned_by": "library" + } + }, + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "nomic-embed-text:latest", + "created": 1756318548, + "object": "model", + "owned_by": "library" + } + }, + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "llama3.2:3b", + "created": 1755191039, + "object": "model", + "owned_by": "library" + } + }, + { 
+ "__type__": "openai.types.model.Model", + "__data__": { + "id": "all-minilm:l6-v2", + "created": 1753968177, + "object": "model", + "owned_by": "library" + } + }, + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "llama3.2:1b", + "created": 1746124735, + "object": "model", + "owned_by": "library" + } + }, + { + "__type__": "openai.types.model.Model", + "__data__": { + "id": "llama3.2:latest", + "created": 1746044170, + "object": "model", + "owned_by": "library" + } + } + ], + "is_streaming": false + }, + "id_normalization_mapping": {} +} diff --git a/tests/integration/telemetry/collectors/base.py b/tests/integration/telemetry/collectors/base.py index 45c1ef439..f88ab37bf 100644 --- a/tests/integration/telemetry/collectors/base.py +++ b/tests/integration/telemetry/collectors/base.py @@ -181,32 +181,39 @@ class BaseTelemetryCollector: last_len = len(spans) time.sleep(poll_interval) - def get_metrics(self) -> tuple[MetricStub, ...] | None: - return self._snapshot_metrics() + def get_metrics( + self, + expected_count: int | None = None, + timeout: float = 5.0, + poll_interval: float = 0.05, + ) -> dict[str, MetricStub]: + """Get metrics with polling until metrics are available or timeout is reached.""" + import time - def get_metrics_dict(self) -> dict[str, Any]: - """Get metrics as a simple name->value dictionary for easy lookup. + deadline = time.time() + timeout + min_count = expected_count if expected_count is not None else 1 + accumulated_metrics = {} - This method works with MetricStub objects for consistent interface - across both in-memory and OTLP collectors. - """ - metrics = self._snapshot_metrics() - if not metrics: - return {} + while time.time() < deadline: + current_metrics = self._snapshot_metrics() + if current_metrics: + # Accumulate new metrics without losing existing ones + for metric in current_metrics: + metric_name = metric.get_name() + if metric_name not in accumulated_metrics: + accumulated_metrics[metric_name] = metric + else: + # If we already have this metric, keep the latest one + # (in case metrics are updated with new values) + accumulated_metrics[metric_name] = metric - return {metric.get_name(): metric.get_value() for metric in metrics} + # Check if we have enough metrics + if len(accumulated_metrics) >= min_count: + return accumulated_metrics - def get_metric_value(self, name: str) -> Any | None: - """Get a specific metric value by name.""" - return self.get_metrics_dict().get(name) + time.sleep(poll_interval) - def has_metric(self, name: str) -> bool: - """Check if a metric with the given name exists.""" - return name in self.get_metrics_dict() - - def get_metric_names(self) -> list[str]: - """Get all available metric names.""" - return list(self.get_metrics_dict().keys()) + return accumulated_metrics @staticmethod def _convert_attributes_to_dict(attrs: Any) -> dict[str, Any]: diff --git a/tests/integration/telemetry/collectors/otlp.py b/tests/integration/telemetry/collectors/otlp.py index d8c1f2318..a3535f818 100644 --- a/tests/integration/telemetry/collectors/otlp.py +++ b/tests/integration/telemetry/collectors/otlp.py @@ -11,6 +11,7 @@ import os import threading from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn +from typing import Any from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceRequest from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest @@ -59,7 +60,7 @@ class 
OtlpHttpTestCollector(BaseTelemetryCollector): for resource_metrics in request.resource_metrics: for scope_metrics in resource_metrics.scope_metrics: for metric in scope_metrics.metrics: - metric_stub = self._extract_metric_from_opentelemetry(metric) + metric_stub = self._create_metric_stub_from_protobuf(metric) if metric_stub: new_metrics.append(metric_stub) @@ -82,6 +83,54 @@ class OtlpHttpTestCollector(BaseTelemetryCollector): self._spans.clear() self._metrics.clear() + def _create_metric_stub_from_protobuf(self, metric: Any) -> MetricStub | None: + """Create MetricStub from protobuf metric object. + + Protobuf metrics have a different structure than OpenTelemetry metrics. + They can have sum, gauge, or histogram data. + """ + if not hasattr(metric, "name"): + return None + + # Try to extract value from different metric types + for metric_type in ["sum", "gauge", "histogram"]: + if hasattr(metric, metric_type): + metric_data = getattr(metric, metric_type) + if metric_data and hasattr(metric_data, "data_points"): + data_points = metric_data.data_points + if data_points and len(data_points) > 0: + data_point = data_points[0] + + # Extract value based on metric type + if metric_type == "sum": + value = data_point.as_int + elif metric_type == "gauge": + value = data_point.as_double + else: # histogram + value = data_point.count + + # Extract attributes if available + attributes = self._extract_attributes_from_data_point(data_point) + + return MetricStub( + name=metric.name, + value=value, + attributes=attributes if attributes else None, + ) + + return None + + def _extract_attributes_from_data_point(self, data_point: Any) -> dict[str, Any]: + """Extract attributes from a protobuf data point.""" + if not hasattr(data_point, "attributes"): + return {} + + attrs = data_point.attributes + if not attrs: + return {} + + return {kv.key: kv.value.string_value or kv.value.int_value or kv.value.double_value for kv in attrs} + def shutdown(self) -> None: self._server.shutdown() self._server.server_close() diff --git a/tests/integration/telemetry/test_completions.py b/tests/integration/telemetry/test_completions.py index b61ef7821..d72f9e660 100644 --- a/tests/integration/telemetry/test_completions.py +++ b/tests/integration/telemetry/test_completions.py @@ -49,6 +49,7 @@ def test_streaming_chunk_count(mock_otlp_collector, llama_stack_client, text_mod def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client, text_model_id): """Comprehensive validation of telemetry data format including spans and metrics.""" + response = llama_stack_client.chat.completions.create( model=text_model_id, messages=[{"role": "user", "content": "Test trace openai with temperature 0.7"}], @@ -106,15 +107,45 @@ def test_telemetry_format_completeness(mock_otlp_collector, llama_stack_client, # At least one span should capture the fully qualified model ID assert text_model_id in logged_model_ids, f"Expected to find {text_model_id} in spans, but got {logged_model_ids}" - # Verify token usage metrics in response - # Verify expected metrics are present + # Verify token usage metrics in response using polling expected_metrics = ["completion_tokens", "total_tokens", "prompt_tokens"] + metrics = mock_otlp_collector.get_metrics(expected_count=len(expected_metrics)) + assert len(metrics) > 0, "No metrics found within timeout" + + # Filter metrics to only those from the specific model used in the request + # This prevents issues when multiple metrics with the same name exist from different models + # (e.g., when 
safety models like llama-guard are also called) + model_metrics = {} + all_model_ids = set() + + for name, metric in metrics.items(): + if name in expected_metrics: + model_id = metric.get_attribute("model_id") + all_model_ids.add(model_id) + # Only include metrics from the specific model used in the test request + if model_id == text_model_id: + model_metrics[name] = metric + + # Provide helpful error message if we have metrics from multiple models + if len(all_model_ids) > 1: + print(f"Note: Found metrics from multiple models: {sorted(all_model_ids)}") + print(f"Filtering to only metrics from test model: {text_model_id}") + + # Verify expected metrics are present for our specific model for metric_name in expected_metrics: - assert mock_otlp_collector.has_metric(metric_name), ( - f"Expected metric {metric_name} not found in {mock_otlp_collector.get_metric_names()}" + assert metric_name in model_metrics, ( + f"Expected metric {metric_name} for model {text_model_id} not found. " + f"Available models: {sorted(all_model_ids)}, " + f"Available metrics for {text_model_id}: {list(model_metrics.keys())}" ) # Verify metric values match usage data - assert mock_otlp_collector.get_metric_value("completion_tokens") == usage["completion_tokens"] - assert mock_otlp_collector.get_metric_value("total_tokens") == usage["total_tokens"] - assert mock_otlp_collector.get_metric_value("prompt_tokens") == usage["prompt_tokens"] + assert model_metrics["completion_tokens"].get_value() == usage["completion_tokens"], ( + f"Expected {usage['completion_tokens']} for completion_tokens, but got {model_metrics['completion_tokens'].get_value()}" + ) + assert model_metrics["total_tokens"].get_value() == usage["total_tokens"], ( + f"Expected {usage['total_tokens']} for total_tokens, but got {model_metrics['total_tokens'].get_value()}" + ) + assert model_metrics["prompt_tokens"].get_value() == usage["prompt_tokens"], ( + f"Expected {usage['prompt_tokens']} for prompt_tokens, but got {model_metrics['prompt_tokens'].get_value()}" + )
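
For context on the three OTEL_* variables added to scripts/integration-tests.sh: they are standard OpenTelemetry SDK environment variables, all in milliseconds. OTEL_METRIC_EXPORT_INTERVAL controls how often the periodic metric reader pushes to the collector, and OTEL_BSP_SCHEDULE_DELAY / OTEL_BSP_EXPORT_TIMEOUT tune the batch span processor. A rough Python sketch of what the SDK configures when it reads these values (the endpoint and wiring below are illustrative, not taken from the stack code):

    from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    # OTEL_METRIC_EXPORT_INTERVAL=200: flush metrics every 200 ms instead of the 60 s default,
    # so the test collector sees token metrics well within its polling window.
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://localhost:4318/v1/metrics"),
        export_interval_millis=200,
    )
    meter_provider = MeterProvider(metric_readers=[reader])

    # OTEL_BSP_SCHEDULE_DELAY=200 / OTEL_BSP_EXPORT_TIMEOUT=2000: batch spans for at most
    # 200 ms and give each export up to 2 s before timing out.
    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces"),
            schedule_delay_millis=200,
            export_timeout_millis=2000,
        )
    )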
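
_create_metric_stub_from_protobuf reads sum data points via as_int and gauge data points via as_double, which matches the integer token counters exercised here. In OTLP, however, a NumberDataPoint stores its value in a oneof, and hasattr() is always true for declared protobuf fields, so a counter recorded as a double would read back as 0. A more general reading of the same opentelemetry-proto messages could look like the following sketch (the helper name is illustrative and not part of the patch):

    from typing import Any

    from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric


    def first_data_point_value(metric: Metric) -> Any | None:
        """Return the first data point value of a protobuf Metric, whichever kind it is."""
        # hasattr() is always True for declared protobuf fields, so ask the oneof instead.
        data_kind = metric.WhichOneof("data")  # "sum", "gauge", "histogram", ...
        if data_kind is None:
            return None
        data_points = getattr(metric, data_kind).data_points
        if not data_points:
            return None
        point = data_points[0]
        if data_kind in ("sum", "gauge"):
            # NumberDataPoint carries either as_int or as_double, never both.
            value_field = point.WhichOneof("value")
            return getattr(point, value_field) if value_field else None
        if data_kind == "histogram":
            return point.count
        return None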
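
Similarly, _extract_attributes_from_data_point chains string_value or int_value or double_value, which collapses legitimate falsy values (empty string, 0, 0.0) and skips booleans. KeyValue attributes are also a oneof, so the same WhichOneof pattern applies; a stricter variant, assuming the same opentelemetry-proto classes, might be:

    from collections.abc import Iterable
    from typing import Any

    from opentelemetry.proto.common.v1.common_pb2 import KeyValue


    def attributes_to_dict(attributes: Iterable[KeyValue]) -> dict[str, Any]:
        """Convert repeated protobuf KeyValue attributes into a plain dict."""
        result: dict[str, Any] = {}
        for kv in attributes:
            # AnyValue is a oneof: string_value, bool_value, int_value, double_value, ...
            kind = kv.value.WhichOneof("value")
            if kind is not None:
                result[kv.key] = getattr(kv.value, kind)
        return result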