diff --git a/llama_stack/core/library_client.py b/llama_stack/core/library_client.py
index e722e4de6..9f8f35bdf 100644
--- a/llama_stack/core/library_client.py
+++ b/llama_stack/core/library_client.py
@@ -48,12 +48,7 @@ from llama_stack.core.utils.config import redact_sensitive_fields
 from llama_stack.core.utils.context import preserve_contexts_async_generator
 from llama_stack.core.utils.exec import in_notebook
 from llama_stack.log import get_logger
-from llama_stack.providers.utils.telemetry.tracing import (
-    CURRENT_TRACE_CONTEXT,
-    end_trace,
-    setup_logger,
-    start_trace,
-)
+

 logger = get_logger(name=__name__, category="core")

@@ -293,8 +288,6 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
                 raise _e

         assert self.impls is not None
-        if Api.telemetry in self.impls:
-            setup_logger(self.impls[Api.telemetry])

         if not os.environ.get("PYTEST_CURRENT_TEST"):
             console = Console()
@@ -380,13 +373,7 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
             body, field_names = self._handle_file_uploads(options, body)

         body = self._convert_body(path, options.method, body, exclude_params=set(field_names))
-
-        trace_path = webmethod.descriptive_name or route_path
-        await start_trace(trace_path, {"__location__": "library_client"})
-        try:
-            result = await matched_func(**body)
-        finally:
-            await end_trace()
+        result = await matched_func(**body)

         # Handle FastAPI Response objects (e.g., from file content retrieval)
         if isinstance(result, FastAPIResponse):
@@ -444,9 +431,6 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):

         body = self._convert_body(path, options.method, body)

-        trace_path = webmethod.descriptive_name or route_path
-        await start_trace(trace_path, {"__location__": "library_client"})
-
         async def gen():
             try:
                 async for chunk in await func(**body):
@@ -454,9 +438,9 @@ class AsyncLlamaStackAsLibraryClient(AsyncLlamaStackClient):
                     sse_event = f"data: {data}\n\n"
                     yield sse_event.encode("utf-8")
             finally:
-                await end_trace()
+                pass

-        wrapped_gen = preserve_contexts_async_generator(gen(), [CURRENT_TRACE_CONTEXT, PROVIDER_DATA_VAR])
+        wrapped_gen = preserve_contexts_async_generator(gen(), [PROVIDER_DATA_VAR])

         mock_response = httpx.Response(
             status_code=httpx.codes.OK,
diff --git a/llama_stack/core/server/server.py b/llama_stack/core/server/server.py
index c58a39e83..6f39eb2d7 100644
--- a/llama_stack/core/server/server.py
+++ b/llama_stack/core/server/server.py
@@ -74,6 +74,8 @@ logger = get_logger(name=__name__, category="core::server")

 def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
     log = file if hasattr(file, "write") else sys.stderr
+    if log is None:
+        return
     traceback.print_stack(file=log)
     log.write(warnings.formatwarning(message, category, filename, lineno, line))

diff --git a/llama_stack/core/telemetry/__init__.py b/llama_stack/core/telemetry/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/llama_stack/core/telemetry/telemetry.py b/llama_stack/core/telemetry/telemetry.py
new file mode 100644
index 000000000..fafe7cce5
--- /dev/null
+++ b/llama_stack/core/telemetry/telemetry.py
@@ -0,0 +1,49 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
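+
+"""Provider-agnostic telemetry interface.
+
+Concrete backends (such as the inline OTel provider in this change) subclass
+TelemetryProvider and implement these hooks."""
+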
+from abc import abstractmethod
+from fastapi import FastAPI
+from pydantic import BaseModel
+from typing import Any
+
+
+class TelemetryProvider(BaseModel):
+    """
+    TelemetryProvider standardizes how telemetry is provided to the application.
+    """
+    @abstractmethod
+    def fastapi_middleware(self, app: FastAPI, *args, **kwargs):
+        """
+        Injects FastAPI middleware that instruments the application for telemetry.
+        """
+        ...
+
+    @abstractmethod
+    def custom_trace(self, name: str, *args, **kwargs) -> Any:
+        """
+        Creates a custom trace.
+        """
+        ...
+
+    @abstractmethod
+    def record_count(self, name: str, *args, **kwargs):
+        """
+        Increments a counter metric.
+        """
+        ...
+
+    @abstractmethod
+    def record_histogram(self, name: str, *args, **kwargs):
+        """
+        Records a histogram metric.
+        """
+        ...
+
+    @abstractmethod
+    def record_up_down_counter(self, name: str, *args, **kwargs):
+        """
+        Records an up/down counter metric.
+        """
+        ...
diff --git a/llama_stack/providers/inline/telemetry/otel/README.md b/llama_stack/providers/inline/telemetry/otel/README.md
index 51fddf6c1..73089dd04 100644
--- a/llama_stack/providers/inline/telemetry/otel/README.md
+++ b/llama_stack/providers/inline/telemetry/otel/README.md
@@ -10,17 +10,23 @@ First, bootstrap and install all necessary libraries for open telemtry:
 uv run opentelemetry-bootstrap -a requirements | uv pip install --requirement -
 ```

-Then, run with automatic code injection:
-
+Make sure you export the required environment variables for OpenTelemetry:
 ```
-uv run opentelemetry-instrument llama stack run --config myconfig.yaml
+export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
+export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318"
 ```

-### Excluded Fast API URLs
+If you want certain endpoints to be excluded from the FastAPI telemetry, set the following environment variable:

 ```
 export OTEL_PYTHON_FASTAPI_EXCLUDED_URLS="client/.*/info,healthcheck"
 ```

-#### Environment Variables
+Finally, run Llama Stack with automatic code injection:
+
+```
+uv run opentelemetry-instrument llama stack run --config myconfig.yaml
+```
+
+#### OpenTelemetry Configuration Environment Variables

 Environment Variables: https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/
diff --git a/llama_stack/providers/inline/telemetry/otel/config.py b/llama_stack/providers/inline/telemetry/otel/config.py
index 0d24f8440..e1ff2f1b0 100644
--- a/llama_stack/providers/inline/telemetry/otel/config.py
+++ b/llama_stack/providers/inline/telemetry/otel/config.py
@@ -6,6 +6,7 @@ from pydantic import BaseModel, Field

 type BatchSpanProcessor = Literal["batch"]
 type SimpleSpanProcessor = Literal["simple"]

+
 class OTelTelemetryConfig(BaseModel):
     """
     The configuration for the OpenTelemetry telemetry provider.
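Reviewer note: a minimal sketch of a concrete backend against the TelemetryProvider interface above, to make the contract concrete. The import path assumes the llama_stack/core/telemetry/telemetry.py module added in this diff; the print-based backend itself is illustrative and not part of the change:

```python
from typing import Any

from fastapi import FastAPI

from llama_stack.core.telemetry.telemetry import TelemetryProvider


class LogTelemetryProvider(TelemetryProvider):
    """Toy backend that logs telemetry calls instead of exporting them."""

    def fastapi_middleware(self, app: FastAPI, *args, **kwargs):
        # A real backend would install instrumentation middleware here.
        pass

    def custom_trace(self, name: str, *args, **kwargs) -> Any:
        print(f"trace: {name}")

    def record_count(self, name: str, amount: float = 1.0, **kwargs):
        print(f"count: {name} += {amount}")

    def record_histogram(self, name: str, value: float = 0.0, **kwargs):
        print(f"histogram: {name} = {value}")

    def record_up_down_counter(self, name: str, value: float = 0.0, **kwargs):
        print(f"up_down_counter: {name} += {value}")
```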
diff --git a/llama_stack/providers/inline/telemetry/otel/otel.py b/llama_stack/providers/inline/telemetry/otel/otel.py
index dcffa9f6d..1d2e2e4ab 100644
--- a/llama_stack/providers/inline/telemetry/otel/otel.py
+++ b/llama_stack/providers/inline/telemetry/otel/otel.py
@@ -1,12 +1,18 @@
 import os
+import threading

 from opentelemetry import trace, metrics
+from opentelemetry.context.context import Context
 from opentelemetry.sdk.resources import Attributes, Resource
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.metrics import Counter, UpDownCounter, Histogram, ObservableGauge
 from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
+from opentelemetry.trace import Span, SpanKind, _Links
+from typing import Sequence
+from pydantic import PrivateAttr

 from llama_stack.core.telemetry.tracing import TelemetryProvider
 from llama_stack.log import get_logger
@@ -22,8 +28,17 @@ class OTelTelemetryProvider(TelemetryProvider):
     """
     A simple Open Telemetry native telemetry provider.
     """
-    def __init__(self, config: OTelTelemetryConfig):
-        self.config = config
+    config: OTelTelemetryConfig
+    _counters: dict[str, Counter] = PrivateAttr(default_factory=dict)
+    _up_down_counters: dict[str, UpDownCounter] = PrivateAttr(default_factory=dict)
+    _histograms: dict[str, Histogram] = PrivateAttr(default_factory=dict)
+    _gauges: dict[str, ObservableGauge] = PrivateAttr(default_factory=dict)
+
+
+    def model_post_init(self, __context):
+        """Initialize provider state after Pydantic validation."""
+        self._lock = threading.Lock()
+
         attributes: Attributes = {
             key: value
             for key, value in {
@@ -52,7 +67,7 @@ class OTelTelemetryProvider(TelemetryProvider):
         meter_provider = MeterProvider(resource=resource)
         metrics.set_meter_provider(meter_provider)

-        # Do not fail the application, but warn the user if the endpoints are not set properly
+        # Do not fail the application, but warn the user if the endpoints are not set properly.
         if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
             if not os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"):
                 logger.warning("OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is not set. Traces will not be exported.")
@@ -61,3 +76,66 @@ class OTelTelemetryProvider(TelemetryProvider):

     def fastapi_middleware(self, app: FastAPI):
         FastAPIInstrumentor.instrument_app(app)
+
+    def custom_trace(self,
+        name: str,
+        context: Context | None = None,
+        kind: SpanKind = SpanKind.INTERNAL,
+        attributes: Attributes = {},
+        links: _Links = None,
+        start_time: int | None = None,
+        record_exception: bool = True,
+        set_status_on_exception: bool = True) -> Span:
+        """
+        Creates a custom tracing span using the OpenTelemetry SDK.
+        """
+        tracer = trace.get_tracer(__name__)
+        return tracer.start_span(name, context, kind, attributes, links, start_time, record_exception, set_status_on_exception)
+
+
+    def record_count(self, name: str, amount: int | float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = ""):
+        """
+        Increments a counter metric using the OpenTelemetry SDK. Counters are cached and indexed by name.
+        This function is designed to be compatible with the design patterns of other popular telemetry
+        providers, like Datadog and New Relic.
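+        Instrument creation is guarded by a lock, so concurrent callers share a single counter per name.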
+ """ + meter = metrics.get_meter(__name__) + + with self._lock: + if name not in self._counters: + self._counters[name] = meter.create_counter(name, unit=unit, description=description) + counter = self._counters[name] + + counter.add(amount, attributes=attributes, context=context) + + + def record_histogram(self, name: str, value: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = "", explicit_bucket_boundaries_advisory: Sequence[float] | None = None): + """ + Records a histogram metric using the Open Telemetry SDK that are indexed by the meter name. + This function is designed to be compatible with other popular telemetry providers design patterns, + like Datadog and New Relic. + """ + meter = metrics.get_meter(__name__) + + with self._lock: + if name not in self._histograms: + self._histograms[name] = meter.create_histogram(name, unit=unit, description=description, explicit_bucket_boundaries_advisory=explicit_bucket_boundaries_advisory) + histogram = self._histograms[name] + + histogram.record(value, attributes=attributes, context=context) + + + def record_up_down_counter(self, name: str, value: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = ""): + """ + Records an up/down counter metric using the Open Telemetry SDK that are indexed by the meter name. + This function is designed to be compatible with other popular telemetry providers design patterns, + like Datadog and New Relic. + """ + meter = metrics.get_meter(__name__) + + with self._lock: + if name not in self._up_down_counters: + self._up_down_counters[name] = meter.create_up_down_counter(name, unit=unit, description=description) + up_down_counter = self._up_down_counters[name] + + up_down_counter.add(value, attributes=attributes, context=context) diff --git a/llama_stack/providers/utils/telemetry/tracing.py b/llama_stack/providers/utils/telemetry/tracing.py index 62cceb13e..8f47c6b44 100644 --- a/llama_stack/providers/utils/telemetry/tracing.py +++ b/llama_stack/providers/utils/telemetry/tracing.py @@ -3,6 +3,8 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +# +# Deprecated. Use the Open Telemetry SDK instead. import asyncio import contextvars diff --git a/scripts/setup_telemetry.sh b/scripts/setup_telemetry.sh index cf235ab9d..7577ff44e 100755 --- a/scripts/setup_telemetry.sh +++ b/scripts/setup_telemetry.sh @@ -53,7 +53,7 @@ $CONTAINER_RUNTIME run -d --name otel-collector \ -p 4317:4317 \ -p 9464:9464 \ -p 13133:13133 \ - -v $(pwd)/otel-collector-config.yaml:/etc/otel-collector-config.yaml:Z \ + -v $(pwd)/otel-collector-config.yaml:/etc/otel-collector-config.yaml:${OTEL_COLLECTOR_CONFIG_PATH}:Z \ docker.io/otel/opentelemetry-collector-contrib:latest \ --config /etc/otel-collector-config.yaml @@ -62,7 +62,7 @@ echo "📈 Starting Prometheus..." 
 $CONTAINER_RUNTIME run -d --name prometheus \
   --network llama-telemetry \
   -p 9090:9090 \
-  -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml:Z \
+  -v ${PROMETHEUS_CONFIG_PATH:-$(pwd)/prometheus.yml}:/etc/prometheus/prometheus.yml:Z \
   docker.io/prom/prometheus:latest \
   --config.file=/etc/prometheus/prometheus.yml \
   --storage.tsdb.path=/prometheus \
diff --git a/tests/integration/telemetry/test_otel_provider.py b/tests/integration/telemetry/test_otel_provider.py
new file mode 100644
index 000000000..249dd6fb3
--- /dev/null
+++ b/tests/integration/telemetry/test_otel_provider.py
@@ -0,0 +1,532 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+"""
+Integration tests for OpenTelemetry provider.
+
+These tests verify that the OTel provider correctly:
+- Initializes within the Llama Stack
+- Captures expected metrics (counters, histograms, up/down counters)
+- Captures expected spans/traces
+- Exports telemetry data to an OTLP collector (in-memory for testing)
+
+Tests use in-memory exporters to avoid external dependencies and can run in GitHub Actions.
+"""
+
+import os
+import time
+from unittest.mock import patch
+
+import pytest
+from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
+
+from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig
+from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider
+
+
+@pytest.fixture(scope="module")
+def in_memory_span_exporter():
+    """Create an in-memory span exporter to capture traces."""
+    return InMemorySpanExporter()
+
+
+@pytest.fixture(scope="module")
+def in_memory_metric_reader():
+    """Create an in-memory metric reader to capture metrics."""
+    return InMemoryMetricReader()
+
+
+@pytest.fixture(scope="module")
+def otel_provider_with_memory_exporters(in_memory_span_exporter, in_memory_metric_reader):
+    """
+    Create an OTelTelemetryProvider configured with in-memory exporters.
+
+    This allows us to capture and verify telemetry data without external services.
+    Returns a dict with 'provider', 'span_exporter', and 'metric_reader'.
+ """ + # Set mock environment to avoid warnings + os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318" + + config = OTelTelemetryConfig( + service_name="test-llama-stack-otel", + service_version="1.0.0-test", + deployment_environment="ci-test", + span_processor="simple", + ) + + # Patch the provider to use in-memory exporters + with patch.object( + OTelTelemetryProvider, + 'model_post_init', + lambda self, _: _init_with_memory_exporters( + self, config, in_memory_span_exporter, in_memory_metric_reader + ) + ): + provider = OTelTelemetryProvider(config=config) + yield { + 'provider': provider, + 'span_exporter': in_memory_span_exporter, + 'metric_reader': in_memory_metric_reader + } + + +def _init_with_memory_exporters(provider, config, span_exporter, metric_reader): + """Helper to initialize provider with in-memory exporters.""" + import threading + from opentelemetry import metrics, trace + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.resources import Attributes, Resource + from opentelemetry.sdk.trace import TracerProvider + + # Initialize pydantic private attributes + if provider.__pydantic_private__ is None: + provider.__pydantic_private__ = {} + + provider._lock = threading.Lock() + provider._counters = {} + provider._up_down_counters = {} + provider._histograms = {} + provider._gauges = {} + + # Create resource attributes + attributes: Attributes = { + key: value + for key, value in { + "service.name": config.service_name, + "service.version": config.service_version, + "deployment.environment": config.deployment_environment, + }.items() + if value is not None + } + + resource = Resource.create(attributes) + + # Configure tracer provider with in-memory exporter + tracer_provider = TracerProvider(resource=resource) + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + trace.set_tracer_provider(tracer_provider) + + # Configure meter provider with in-memory reader + meter_provider = MeterProvider( + resource=resource, + metric_readers=[metric_reader] + ) + metrics.set_meter_provider(meter_provider) + + +class TestOTelProviderInitialization: + """Test OTel provider initialization within Llama Stack.""" + + def test_provider_initializes_successfully(self, otel_provider_with_memory_exporters): + """Test that the OTel provider initializes without errors.""" + provider = otel_provider_with_memory_exporters['provider'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + assert provider is not None + assert provider.config.service_name == "test-llama-stack-otel" + assert provider.config.service_version == "1.0.0-test" + assert provider.config.deployment_environment == "ci-test" + + def test_provider_has_thread_safety_mechanisms(self, otel_provider_with_memory_exporters): + """Test that the provider has thread-safety mechanisms in place.""" + provider = otel_provider_with_memory_exporters['provider'] + + assert hasattr(provider, "_lock") + assert provider._lock is not None + assert hasattr(provider, "_counters") + assert hasattr(provider, "_histograms") + assert hasattr(provider, "_up_down_counters") + + +class TestOTelMetricsCapture: + """Test that OTel provider captures expected metrics.""" + + def test_counter_metric_is_captured(self, otel_provider_with_memory_exporters): + """Test that counter metrics are captured.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + + # Record counter metrics + 
provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"}) + provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"}) + provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/embeddings"}) + + # Force metric collection - collect() triggers the reader to gather metrics + metric_reader.collect() + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + + # Verify metrics were captured + assert metrics_data is not None + assert len(metrics_data.resource_metrics) > 0 + + # Find our counter metric + found_counter = False + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + if metric.name == "llama.requests.total": + found_counter = True + # Verify it's a counter with data points + assert hasattr(metric.data, "data_points") + assert len(metric.data.data_points) > 0 + + assert found_counter, "Counter metric 'llama.requests.total' was not captured" + + def test_histogram_metric_is_captured(self, otel_provider_with_memory_exporters): + """Test that histogram metrics are captured.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + + # Record histogram metrics with various values + latencies = [10.5, 25.3, 50.1, 100.7, 250.2] + for latency in latencies: + provider.record_histogram( + "llama.inference.latency", + latency, + attributes={"model": "llama-3.2"} + ) + + # Force metric collection + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + + # Find our histogram metric + found_histogram = False + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + if metric.name == "llama.inference.latency": + found_histogram = True + # Verify it's a histogram + assert hasattr(metric.data, "data_points") + data_point = metric.data.data_points[0] + # Histograms should have count and sum + assert hasattr(data_point, "count") + assert data_point.count == len(latencies) + + assert found_histogram, "Histogram metric 'llama.inference.latency' was not captured" + + def test_up_down_counter_metric_is_captured(self, otel_provider_with_memory_exporters): + """Test that up/down counter metrics are captured.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + + # Record up/down counter metrics + provider.record_up_down_counter("llama.active.sessions", 5) + provider.record_up_down_counter("llama.active.sessions", 3) + provider.record_up_down_counter("llama.active.sessions", -2) + + # Force metric collection + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + + # Find our up/down counter metric + found_updown = False + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + if metric.name == "llama.active.sessions": + found_updown = True + assert hasattr(metric.data, "data_points") + assert len(metric.data.data_points) > 0 + + assert found_updown, "Up/Down counter metric 'llama.active.sessions' was not captured" + + def test_metrics_with_attributes_are_captured(self, otel_provider_with_memory_exporters): + """Test that metric attributes/labels are preserved.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = 
otel_provider_with_memory_exporters['metric_reader'] + + # Record metrics with different attributes + provider.record_count("llama.tokens.generated", 150.0, attributes={ + "model": "llama-3.2-1b", + "user": "test-user" + }) + + # Force metric collection + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + + # Verify attributes are preserved + found_with_attributes = False + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + if metric.name == "llama.tokens.generated": + data_point = metric.data.data_points[0] + # Check attributes - they're already a dict in the SDK + attrs = data_point.attributes if isinstance(data_point.attributes, dict) else {} + if "model" in attrs and "user" in attrs: + found_with_attributes = True + assert attrs["model"] == "llama-3.2-1b" + assert attrs["user"] == "test-user" + + assert found_with_attributes, "Metrics with attributes were not properly captured" + + def test_multiple_metric_types_coexist(self, otel_provider_with_memory_exporters): + """Test that different metric types can coexist.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + + # Record various metric types + provider.record_count("test.counter", 1.0) + provider.record_histogram("test.histogram", 42.0) + provider.record_up_down_counter("test.gauge", 10) + + # Force metric collection + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + + # Count unique metrics + metric_names = set() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + metric_names.add(metric.name) + + # Should have all three metrics + assert "test.counter" in metric_names + assert "test.histogram" in metric_names + assert "test.gauge" in metric_names + + +class TestOTelSpansCapture: + """Test that OTel provider captures expected spans/traces.""" + + def test_basic_span_is_captured(self, otel_provider_with_memory_exporters): + """Test that basic spans are captured.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + # Create a span + span = provider.custom_trace("llama.inference.request") + span.end() + + # Get captured spans + spans = span_exporter.get_finished_spans() + + assert len(spans) > 0 + assert any(span.name == "llama.inference.request" for span in spans) + + def test_span_with_attributes_is_captured(self, otel_provider_with_memory_exporters): + """Test that span attributes are preserved.""" + provider = otel_provider_with_memory_exporters['provider'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + # Create a span with attributes + span = provider.custom_trace( + "llama.chat.completion", + attributes={ + "model.id": "llama-3.2-1b", + "user.id": "test-user-123", + "request.id": "req-abc-123" + } + ) + span.end() + + # Get captured spans + spans = span_exporter.get_finished_spans() + + # Find our span + our_span = None + for s in spans: + if s.name == "llama.chat.completion": + our_span = s + break + + assert our_span is not None, "Span 'llama.chat.completion' was not captured" + + # Verify attributes + attrs = dict(our_span.attributes) + assert attrs.get("model.id") == "llama-3.2-1b" + assert attrs.get("user.id") == 
"test-user-123" + assert attrs.get("request.id") == "req-abc-123" + + def test_multiple_spans_are_captured(self, otel_provider_with_memory_exporters): + """Test that multiple spans are captured.""" + provider = otel_provider_with_memory_exporters['provider'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + # Create multiple spans + span_names = [ + "llama.request.validate", + "llama.model.load", + "llama.inference.execute", + "llama.response.format" + ] + + for name in span_names: + span = provider.custom_trace(name) + time.sleep(0.01) # Small delay to ensure ordering + span.end() + + # Get captured spans + spans = span_exporter.get_finished_spans() + captured_names = {span.name for span in spans} + + # Verify all spans were captured + for expected_name in span_names: + assert expected_name in captured_names, f"Span '{expected_name}' was not captured" + + def test_span_has_service_metadata(self, otel_provider_with_memory_exporters): + """Test that spans include service metadata.""" + provider = otel_provider_with_memory_exporters['provider'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + # Create a span + span = provider.custom_trace("test.span") + span.end() + + # Get captured spans + spans = span_exporter.get_finished_spans() + + assert len(spans) > 0 + + # Check resource attributes + span = spans[0] + resource_attrs = dict(span.resource.attributes) + + assert resource_attrs.get("service.name") == "test-llama-stack-otel" + assert resource_attrs.get("service.version") == "1.0.0-test" + assert resource_attrs.get("deployment.environment") == "ci-test" + + +class TestOTelDataExport: + """Test that telemetry data can be exported to OTLP collector.""" + + def test_metrics_are_exportable(self, otel_provider_with_memory_exporters): + """Test that metrics can be exported.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + + # Record metrics + provider.record_count("export.test.counter", 5.0) + provider.record_histogram("export.test.histogram", 123.45) + + # Force export + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + + # Verify data structure is exportable + assert metrics_data is not None + assert hasattr(metrics_data, "resource_metrics") + assert len(metrics_data.resource_metrics) > 0 + + # Verify resource attributes are present (needed for OTLP export) + resource = metrics_data.resource_metrics[0].resource + assert resource is not None + assert len(resource.attributes) > 0 + + def test_spans_are_exportable(self, otel_provider_with_memory_exporters): + """Test that spans can be exported.""" + provider = otel_provider_with_memory_exporters['provider'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + # Create spans + span1 = provider.custom_trace("export.test.span1") + span1.end() + + span2 = provider.custom_trace("export.test.span2") + span2.end() + + # Get exported spans + spans = span_exporter.get_finished_spans() + + # Verify spans have required OTLP fields + assert len(spans) >= 2 + for span in spans: + assert span.name is not None + assert span.context is not None + assert span.context.trace_id is not None + assert span.context.span_id is not None + assert span.resource is not None + + def test_concurrent_export_is_safe(self, otel_provider_with_memory_exporters): + """Test that concurrent metric/span recording doesn't break export.""" + import concurrent.futures + provider = 
otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + def record_data(thread_id): + for i in range(10): + provider.record_count(f"concurrent.counter.{thread_id}", 1.0) + span = provider.custom_trace(f"concurrent.span.{thread_id}.{i}") + span.end() + + # Record from multiple threads + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(record_data, i) for i in range(5)] + concurrent.futures.wait(futures) + + # Verify export still works + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + spans = span_exporter.get_finished_spans() + + assert metrics_data is not None + assert len(spans) >= 50 # 5 threads * 10 spans each + + +@pytest.mark.integration +class TestOTelProviderIntegration: + """End-to-end integration tests simulating real usage.""" + + def test_complete_inference_workflow_telemetry(self, otel_provider_with_memory_exporters): + """Simulate a complete inference workflow with telemetry.""" + provider = otel_provider_with_memory_exporters['provider'] + metric_reader = otel_provider_with_memory_exporters['metric_reader'] + span_exporter = otel_provider_with_memory_exporters['span_exporter'] + + # Simulate inference workflow + request_span = provider.custom_trace( + "llama.inference.request", + attributes={"model": "llama-3.2-1b", "user": "test"} + ) + + # Track metrics during inference + provider.record_count("llama.requests.received", 1.0) + provider.record_up_down_counter("llama.requests.in_flight", 1) + + # Simulate processing time + time.sleep(0.01) + provider.record_histogram("llama.request.duration_ms", 10.5) + + # Track tokens + provider.record_count("llama.tokens.input", 25.0) + provider.record_count("llama.tokens.output", 150.0) + + # End request + provider.record_up_down_counter("llama.requests.in_flight", -1) + provider.record_count("llama.requests.completed", 1.0) + request_span.end() + + # Verify all telemetry was captured + metric_reader.collect() + metrics_data = metric_reader.get_metrics_data() + spans = span_exporter.get_finished_spans() + + # Check metrics exist + metric_names = set() + for rm in metrics_data.resource_metrics: + for sm in rm.scope_metrics: + for m in sm.metrics: + metric_names.add(m.name) + + assert "llama.requests.received" in metric_names + assert "llama.requests.in_flight" in metric_names + assert "llama.request.duration_ms" in metric_names + assert "llama.tokens.input" in metric_names + assert "llama.tokens.output" in metric_names + + # Check span exists + assert any(s.name == "llama.inference.request" for s in spans) + diff --git a/tests/unit/providers/telemetry/test_otel.py b/tests/unit/providers/telemetry/test_otel.py new file mode 100644 index 000000000..b2c509648 --- /dev/null +++ b/tests/unit/providers/telemetry/test_otel.py @@ -0,0 +1,368 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
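+
+"""Unit tests for the OTel telemetry provider: initialization, metric instrument
+caching, thread safety, tracing, and edge cases."""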
+ +import concurrent.futures +import threading +from unittest.mock import MagicMock + +import pytest + +from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig +from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider + + +@pytest.fixture +def otel_config(): + """Fixture providing a basic OTelTelemetryConfig.""" + return OTelTelemetryConfig( + service_name="test-service", + service_version="1.0.0", + deployment_environment="test", + span_processor="simple", + ) + + +@pytest.fixture +def otel_provider(otel_config, monkeypatch): + """Fixture providing an OTelTelemetryProvider instance with mocked environment.""" + # Set required environment variables to avoid warnings + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + return OTelTelemetryProvider(config=otel_config) + + +class TestOTelTelemetryProviderInitialization: + """Tests for OTelTelemetryProvider initialization.""" + + def test_initialization_with_valid_config(self, otel_config, monkeypatch): + """Test that provider initializes correctly with valid configuration.""" + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + + provider = OTelTelemetryProvider(config=otel_config) + + assert provider.config == otel_config + assert hasattr(provider, "_lock") + assert provider._lock is not None + assert isinstance(provider._counters, dict) + assert isinstance(provider._histograms, dict) + assert isinstance(provider._up_down_counters, dict) + assert isinstance(provider._gauges, dict) + + def test_initialization_sets_service_attributes(self, otel_config, monkeypatch): + """Test that service attributes are properly configured.""" + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + + provider = OTelTelemetryProvider(config=otel_config) + + assert provider.config.service_name == "test-service" + assert provider.config.service_version == "1.0.0" + assert provider.config.deployment_environment == "test" + + def test_initialization_with_batch_processor(self, monkeypatch): + """Test initialization with batch span processor.""" + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + config = OTelTelemetryConfig( + service_name="test-service", + service_version="1.0.0", + deployment_environment="test", + span_processor="batch", + ) + + provider = OTelTelemetryProvider(config=config) + + assert provider.config.span_processor == "batch" + + def test_warns_when_endpoints_missing(self, otel_config, monkeypatch, caplog): + """Test that warnings are issued when OTLP endpoints are not set.""" + # Remove all endpoint environment variables + monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False) + monkeypatch.delenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", raising=False) + monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False) + + OTelTelemetryProvider(config=otel_config) + + # Check that warnings were logged + assert any("Traces will not be exported" in record.message for record in caplog.records) + assert any("Metrics will not be exported" in record.message for record in caplog.records) + + +class TestOTelTelemetryProviderMetrics: + """Tests for metric recording functionality.""" + + def test_record_count_creates_counter(self, otel_provider): + """Test that record_count creates a counter on first call.""" + assert "test_counter" not in otel_provider._counters + + otel_provider.record_count("test_counter", 1.0) + + assert "test_counter" in otel_provider._counters + assert 
otel_provider._counters["test_counter"] is not None + + def test_record_count_reuses_counter(self, otel_provider): + """Test that record_count reuses existing counter.""" + otel_provider.record_count("test_counter", 1.0) + first_counter = otel_provider._counters["test_counter"] + + otel_provider.record_count("test_counter", 2.0) + second_counter = otel_provider._counters["test_counter"] + + assert first_counter is second_counter + assert len(otel_provider._counters) == 1 + + def test_record_count_with_attributes(self, otel_provider): + """Test that record_count works with attributes.""" + otel_provider.record_count( + "test_counter", + 1.0, + attributes={"key": "value", "env": "test"} + ) + + assert "test_counter" in otel_provider._counters + + def test_record_histogram_creates_histogram(self, otel_provider): + """Test that record_histogram creates a histogram on first call.""" + assert "test_histogram" not in otel_provider._histograms + + otel_provider.record_histogram("test_histogram", 42.5) + + assert "test_histogram" in otel_provider._histograms + assert otel_provider._histograms["test_histogram"] is not None + + def test_record_histogram_reuses_histogram(self, otel_provider): + """Test that record_histogram reuses existing histogram.""" + otel_provider.record_histogram("test_histogram", 10.0) + first_histogram = otel_provider._histograms["test_histogram"] + + otel_provider.record_histogram("test_histogram", 20.0) + second_histogram = otel_provider._histograms["test_histogram"] + + assert first_histogram is second_histogram + assert len(otel_provider._histograms) == 1 + + def test_record_histogram_with_bucket_boundaries(self, otel_provider): + """Test that record_histogram works with explicit bucket boundaries.""" + boundaries = [0.0, 10.0, 50.0, 100.0] + + otel_provider.record_histogram( + "test_histogram", + 25.0, + explicit_bucket_boundaries_advisory=boundaries + ) + + assert "test_histogram" in otel_provider._histograms + + def test_record_up_down_counter_creates_counter(self, otel_provider): + """Test that record_up_down_counter creates a counter on first call.""" + assert "test_updown" not in otel_provider._up_down_counters + + otel_provider.record_up_down_counter("test_updown", 1.0) + + assert "test_updown" in otel_provider._up_down_counters + assert otel_provider._up_down_counters["test_updown"] is not None + + def test_record_up_down_counter_reuses_counter(self, otel_provider): + """Test that record_up_down_counter reuses existing counter.""" + otel_provider.record_up_down_counter("test_updown", 5.0) + first_counter = otel_provider._up_down_counters["test_updown"] + + otel_provider.record_up_down_counter("test_updown", -3.0) + second_counter = otel_provider._up_down_counters["test_updown"] + + assert first_counter is second_counter + assert len(otel_provider._up_down_counters) == 1 + + def test_multiple_metrics_with_different_names(self, otel_provider): + """Test that multiple metrics with different names are cached separately.""" + otel_provider.record_count("counter1", 1.0) + otel_provider.record_count("counter2", 2.0) + otel_provider.record_histogram("histogram1", 10.0) + otel_provider.record_up_down_counter("updown1", 5.0) + + assert len(otel_provider._counters) == 2 + assert len(otel_provider._histograms) == 1 + assert len(otel_provider._up_down_counters) == 1 + + +class TestOTelTelemetryProviderThreadSafety: + """Tests for thread safety of metric operations.""" + + def test_concurrent_counter_creation_same_name(self, otel_provider): + """Test that concurrent calls to 
record_count with same name are thread-safe.""" + num_threads = 50 + counter_name = "concurrent_counter" + + def record_metric(): + otel_provider.record_count(counter_name, 1.0) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(record_metric) for _ in range(num_threads)] + concurrent.futures.wait(futures) + + # Should have exactly one counter created despite concurrent access + assert len(otel_provider._counters) == 1 + assert counter_name in otel_provider._counters + + def test_concurrent_histogram_creation_same_name(self, otel_provider): + """Test that concurrent calls to record_histogram with same name are thread-safe.""" + num_threads = 50 + histogram_name = "concurrent_histogram" + + def record_metric(): + thread_id = threading.current_thread().ident or 0 + otel_provider.record_histogram(histogram_name, float(thread_id % 100)) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(record_metric) for _ in range(num_threads)] + concurrent.futures.wait(futures) + + # Should have exactly one histogram created despite concurrent access + assert len(otel_provider._histograms) == 1 + assert histogram_name in otel_provider._histograms + + def test_concurrent_up_down_counter_creation_same_name(self, otel_provider): + """Test that concurrent calls to record_up_down_counter with same name are thread-safe.""" + num_threads = 50 + counter_name = "concurrent_updown" + + def record_metric(): + otel_provider.record_up_down_counter(counter_name, 1.0) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(record_metric) for _ in range(num_threads)] + concurrent.futures.wait(futures) + + # Should have exactly one counter created despite concurrent access + assert len(otel_provider._up_down_counters) == 1 + assert counter_name in otel_provider._up_down_counters + + def test_concurrent_mixed_metrics_different_names(self, otel_provider): + """Test concurrent creation of different metric types with different names.""" + num_threads = 30 + + def record_counters(thread_id): + otel_provider.record_count(f"counter_{thread_id}", 1.0) + + def record_histograms(thread_id): + otel_provider.record_histogram(f"histogram_{thread_id}", float(thread_id)) + + def record_up_down_counters(thread_id): + otel_provider.record_up_down_counter(f"updown_{thread_id}", float(thread_id)) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads * 3) as executor: + futures = [] + for i in range(num_threads): + futures.append(executor.submit(record_counters, i)) + futures.append(executor.submit(record_histograms, i)) + futures.append(executor.submit(record_up_down_counters, i)) + + concurrent.futures.wait(futures) + + # Each thread should have created its own metric + assert len(otel_provider._counters) == num_threads + assert len(otel_provider._histograms) == num_threads + assert len(otel_provider._up_down_counters) == num_threads + + def test_concurrent_access_existing_and_new_metrics(self, otel_provider): + """Test concurrent access mixing existing and new metric creation.""" + # Pre-create some metrics + otel_provider.record_count("existing_counter", 1.0) + otel_provider.record_histogram("existing_histogram", 10.0) + + num_threads = 40 + + def mixed_operations(thread_id): + # Half the threads use existing metrics, half create new ones + if thread_id % 2 == 0: + otel_provider.record_count("existing_counter", 1.0) + 
otel_provider.record_histogram("existing_histogram", float(thread_id)) + else: + otel_provider.record_count(f"new_counter_{thread_id}", 1.0) + otel_provider.record_histogram(f"new_histogram_{thread_id}", float(thread_id)) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(mixed_operations, i) for i in range(num_threads)] + concurrent.futures.wait(futures) + + # Should have existing metrics plus half of num_threads new ones + expected_new_counters = num_threads // 2 + expected_new_histograms = num_threads // 2 + + assert len(otel_provider._counters) == 1 + expected_new_counters + assert len(otel_provider._histograms) == 1 + expected_new_histograms + + +class TestOTelTelemetryProviderTracing: + """Tests for tracing functionality.""" + + def test_custom_trace_creates_span(self, otel_provider): + """Test that custom_trace creates a span.""" + span = otel_provider.custom_trace("test_span") + + assert span is not None + assert hasattr(span, "get_span_context") + + def test_custom_trace_with_attributes(self, otel_provider): + """Test that custom_trace works with attributes.""" + attributes = {"key": "value", "operation": "test"} + + span = otel_provider.custom_trace("test_span", attributes=attributes) + + assert span is not None + + def test_fastapi_middleware(self, otel_provider): + """Test that fastapi_middleware can be called.""" + mock_app = MagicMock() + + # Should not raise an exception + otel_provider.fastapi_middleware(mock_app) + + +class TestOTelTelemetryProviderEdgeCases: + """Tests for edge cases and error conditions.""" + + def test_record_count_with_zero(self, otel_provider): + """Test that record_count works with zero value.""" + otel_provider.record_count("zero_counter", 0.0) + + assert "zero_counter" in otel_provider._counters + + def test_record_count_with_large_value(self, otel_provider): + """Test that record_count works with large values.""" + otel_provider.record_count("large_counter", 1_000_000.0) + + assert "large_counter" in otel_provider._counters + + def test_record_histogram_with_negative_value(self, otel_provider): + """Test that record_histogram works with negative values.""" + otel_provider.record_histogram("negative_histogram", -10.0) + + assert "negative_histogram" in otel_provider._histograms + + def test_record_up_down_counter_with_negative_value(self, otel_provider): + """Test that record_up_down_counter works with negative values.""" + otel_provider.record_up_down_counter("negative_updown", -5.0) + + assert "negative_updown" in otel_provider._up_down_counters + + def test_metric_names_with_special_characters(self, otel_provider): + """Test that metric names with dots and underscores work.""" + otel_provider.record_count("test.counter_name-special", 1.0) + otel_provider.record_histogram("test.histogram_name-special", 10.0) + + assert "test.counter_name-special" in otel_provider._counters + assert "test.histogram_name-special" in otel_provider._histograms + + def test_empty_attributes_dict(self, otel_provider): + """Test that empty attributes dict is handled correctly.""" + otel_provider.record_count("test_counter", 1.0, attributes={}) + + assert "test_counter" in otel_provider._counters + + def test_none_attributes(self, otel_provider): + """Test that None attributes are handled correctly.""" + otel_provider.record_count("test_counter", 1.0, attributes=None) + + assert "test_counter" in otel_provider._counters +
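Reviewer note: a minimal usage sketch of the OTel provider API exercised by the tests above. The endpoint value is illustrative; without it the provider logs a warning and skips export:

```python
import os

from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig
from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider

# Point the SDK at a local OTLP collector, e.g. the one started by scripts/setup_telemetry.sh.
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"

provider = OTelTelemetryProvider(
    config=OTelTelemetryConfig(
        service_name="my-llama-stack",
        service_version="1.0.0",
        deployment_environment="dev",
        span_processor="batch",
    )
)

# Instruments are created on first use and cached by name (creation is lock-guarded).
provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"})
provider.record_histogram("llama.inference.latency", 42.5, unit="ms")
provider.record_up_down_counter("llama.active.sessions", 1)

# Spans returned by custom_trace() must be ended explicitly.
span = provider.custom_trace("llama.inference.request", attributes={"model": "llama-3.2-1b"})
span.end()
```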