Mirror of https://github.com/meta-llama/llama-stack.git, synced 2025-12-04 10:10:36 +00:00
fix(telemetry): remove telemetry tests :(
This commit is contained in:
parent 420d267364
commit aca1d6352c
8 changed files with 0 additions and 4752 deletions
@@ -1,146 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

"""OTLP HTTP telemetry collector used for server-mode tests."""

import gzip
import os
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn

from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceRequest
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest

from .base import BaseTelemetryCollector, MetricStub, SpanStub, attributes_to_dict


class OtlpHttpTestCollector(BaseTelemetryCollector):
    def __init__(self) -> None:
        super().__init__()
        self._spans: list[SpanStub] = []
        self._metrics: list[MetricStub] = []
        self._lock = threading.Lock()

        class _ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
            daemon_threads = True
            allow_reuse_address = True

        # Port 0 lets the OS pick a free port unless a fixed one is requested via the env var.
        configured_port = int(os.environ.get("LLAMA_STACK_TEST_COLLECTOR_PORT", "0"))

        self._server = _ThreadingHTTPServer(("127.0.0.1", configured_port), _CollectorHandler)
        # Back-reference so the request handler can hand decoded payloads to this collector.
        self._server.collector = self  # type: ignore[attr-defined]
        port = self._server.server_address[1]
        self.endpoint = f"http://127.0.0.1:{port}"

        self._thread = threading.Thread(target=self._server.serve_forever, name="otel-test-collector", daemon=True)
        self._thread.start()

    def _handle_traces(self, request: ExportTraceServiceRequest) -> None:
        new_spans: list[SpanStub] = []

        for resource_spans in request.resource_spans:
            resource_attrs = attributes_to_dict(resource_spans.resource.attributes)

            for scope_spans in resource_spans.scope_spans:
                for span in scope_spans.spans:
                    new_spans.append(self._create_span_stub_from_protobuf(span, resource_attrs or None))

        if not new_spans:
            return

        with self._lock:
            self._spans.extend(new_spans)

    def _handle_metrics(self, request: ExportMetricsServiceRequest) -> None:
        new_metrics: list[MetricStub] = []
        for resource_metrics in request.resource_metrics:
            for scope_metrics in resource_metrics.scope_metrics:
                for metric in scope_metrics.metrics:
                    # Handle multiple data points per metric (e.g., different attribute sets)
                    metric_stubs = self._create_metric_stubs_from_protobuf(metric)
                    new_metrics.extend(metric_stubs)

        if not new_metrics:
            return

        with self._lock:
            self._metrics.extend(new_metrics)

    def _snapshot_spans(self) -> tuple[SpanStub, ...]:
        with self._lock:
            return tuple(self._spans)

    def _snapshot_metrics(self) -> tuple[MetricStub, ...] | None:
        with self._lock:
            return tuple(self._metrics) if self._metrics else None

    def _clear_impl(self) -> None:
        """Clear telemetry over a period of time to prevent race conditions between tests."""
        with self._lock:
            self._spans.clear()
            self._metrics.clear()

        # Prevent race conditions where telemetry arrives after clear() but before
        # the test starts, causing contamination between tests
        deadline = time.time() + 2.0  # Maximum wait time
        last_span_count = 0
        last_metric_count = 0
        stable_iterations = 0

        while time.time() < deadline:
            with self._lock:
                current_span_count = len(self._spans)
                current_metric_count = len(self._metrics)

            if current_span_count == last_span_count and current_metric_count == last_metric_count:
                stable_iterations += 1
                if stable_iterations >= 4:  # 4 * 50ms = 200ms of stability
                    break
            else:
                stable_iterations = 0
                last_span_count = current_span_count
                last_metric_count = current_metric_count

            time.sleep(0.05)

        # Final clear to remove any telemetry that arrived during stabilization
        with self._lock:
            self._spans.clear()
            self._metrics.clear()

    def shutdown(self) -> None:
        self._server.shutdown()
        self._server.server_close()
        self._thread.join(timeout=1)


class _CollectorHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:  # noqa: N802 Function name `do_POST` should be lowercase
        collector: OtlpHttpTestCollector = self.server.collector  # type: ignore[attr-defined]
        length = int(self.headers.get("content-length", "0"))
        body = self.rfile.read(length)
        if self.headers.get("content-encoding") == "gzip":
            body = gzip.decompress(body)

        if self.path == "/v1/traces":
            request = ExportTraceServiceRequest()
            request.ParseFromString(body)
            collector._handle_traces(request)
            self._respond_ok()
        elif self.path == "/v1/metrics":
            request = ExportMetricsServiceRequest()
            request.ParseFromString(body)
            collector._handle_metrics(request)
            self._respond_ok()
        else:
            self.send_response(404)
            self.end_headers()

    def _respond_ok(self) -> None:
        self.send_response(200)
        self.end_headers()
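
As a quick sanity check outside the removed test suite, the collector can be exercised directly with the OpenTelemetry SDK's OTLP/HTTP span exporter pointed at its endpoint. This is a minimal sketch, not code from the deleted file; it assumes opentelemetry-sdk and opentelemetry-exporter-otlp-proto-http are installed, and the final assertion assumes SpanStub (defined in the sibling base module, not shown here) exposes the span name as .name.

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

collector = OtlpHttpTestCollector()

# Send one span straight to the collector's /v1/traces endpoint.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint=f"{collector.endpoint}/v1/traces")))
tracer = provider.get_tracer("collector-smoke-test")
with tracer.start_as_current_span("ping"):
    pass
provider.force_flush()

# Assumption: SpanStub carries the span name in a `name` attribute.
assert any(span.name == "ping" for span in collector._snapshot_spans())
collector.shutdown()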
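
In server-mode tests, the docstring and the LLAMA_STACK_TEST_COLLECTOR_PORT variable above suggest the collector is started first and the stack server's exporters are pointed at collector.endpoint. Below is a sketch of that wiring as a pytest fixture, assuming the server under test honors the standard OTEL_EXPORTER_OTLP_ENDPOINT variable; the fixture name and environment handling are illustrative, not the deleted tests' actual setup.

import os

import pytest


@pytest.fixture(scope="session")
def otlp_collector():
    collector = OtlpHttpTestCollector()
    # Assumption: the server process reads the standard OTLP env var and exports
    # traces/metrics over OTLP/HTTP to this address.
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = collector.endpoint
    try:
        yield collector
    finally:
        collector.shutdown()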
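
Because OTLP export from a separate server process is asynchronous, assertions against the collector are normally wrapped in a short poll rather than a fixed sleep, mirroring the stabilization loop in _clear_impl. Here is a hypothetical helper along those lines; the predicate-based API and the use of _snapshot_spans() as the accessor are assumptions for illustration, not something this file defined.

import time


def wait_for_spans(collector: OtlpHttpTestCollector, predicate, timeout: float = 5.0, interval: float = 0.05):
    """Poll the collector until predicate(spans) is truthy or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        spans = collector._snapshot_spans()
        if predicate(spans):
            return spans
        time.sleep(interval)
    raise AssertionError(f"no matching spans within {timeout}s ({len(collector._snapshot_spans())} spans seen)")


# Example: wait until at least one span has arrived before asserting on its contents.
# spans = wait_for_spans(otlp_collector, lambda s: len(s) > 0)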