fix(pr): cleanup and linting

Emilio Garcia 2025-10-03 12:47:29 -04:00
parent 2b7a765d02
commit 0c45c9830c
3 changed files with 58 additions and 520 deletions
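Net effect: the provider-level get_tracer()/get_meter() wrappers are removed and call sites use the global OpenTelemetry API directly (mirroring the in-file switch from self.get_meter(...) to metrics.get_meter(...)). A minimal sketch of the resulting pattern; the tracer/meter names and attributes below are illustrative, not taken from the changed files:

# Illustrative sketch only; not code from this commit.
from opentelemetry import metrics, trace

# Before: tracer = provider.get_tracer(...); meter = provider.get_meter(...)
# After: go straight to the global OpenTelemetry API.
tracer = trace.get_tracer("llama_stack.inference")
meter = metrics.get_meter("llama_stack.http.server")

request_counter = meter.create_counter(
    "llama.requests.total", unit="requests", description="Total requests"
)

with tracer.start_as_current_span("inference.request") as span:
    span.set_attribute("model.id", "llama-3.2-1b")
    request_counter.add(1, {"endpoint": "/chat"})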

View file

@@ -6,11 +6,6 @@
from abc import abstractmethod
from fastapi import FastAPI
from opentelemetry.metrics import Meter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Attributes
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace import Tracer
from pydantic import BaseModel
from sqlalchemy import Engine
@@ -33,31 +28,3 @@ class TelemetryProvider(BaseModel):
Injects SQLAlchemy instrumentation that instruments the application for telemetry.
"""
...
@abstractmethod
def get_tracer(
self,
instrumenting_module_name: str,
instrumenting_library_version: str | None = None,
tracer_provider: TracerProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None,
) -> Tracer:
"""
Gets a tracer.
"""
...
@abstractmethod
def get_meter(
self,
name: str,
version: str = "",
meter_provider: MeterProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None,
) -> Meter:
"""
Gets a meter.
"""
...
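For reference, a hedged reconstruction of the trimmed base class after this hunk; the signatures are inferred from the imports and call sites visible elsewhere in the diff, not copied from the file:

# Reconstruction under assumptions: real signatures, return types, and
# docstrings may differ; only fastapi_middleware and sqlalchemy_instrumentation
# are known to remain from the visible context.
from abc import abstractmethod

from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import Engine


class TelemetryProvider(BaseModel):
    @abstractmethod
    def fastapi_middleware(self, app: FastAPI) -> None:
        """Injects FastAPI instrumentation for telemetry."""
        ...

    @abstractmethod
    def sqlalchemy_instrumentation(self, engine: Engine | None = None) -> None:
        """Injects SQLAlchemy instrumentation for telemetry."""
        ...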

View file

@@ -11,12 +11,10 @@ from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.metrics import Meter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Attributes, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from opentelemetry.trace import Tracer
from sqlalchemy import Engine
from llama_stack.core.telemetry.telemetry import TelemetryProvider
@@ -88,7 +86,7 @@ class OTelTelemetryProvider(TelemetryProvider):
FastAPIInstrumentor.instrument_app(app)
# Add custom middleware for HTTP metrics
meter = self.get_meter("llama_stack.http.server")
meter = metrics.get_meter("llama_stack.http.server")
# Create HTTP metrics following semantic conventions
# https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
@@ -153,31 +151,3 @@ class OTelTelemetryProvider(TelemetryProvider):
if engine:
kwargs["engine"] = engine
SQLAlchemyInstrumentor().instrument(**kwargs)
def get_tracer(
self,
instrumenting_module_name: str,
instrumenting_library_version: str | None = None,
tracer_provider: TracerProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None,
) -> Tracer:
return trace.get_tracer(
instrumenting_module_name=instrumenting_module_name,
instrumenting_library_version=instrumenting_library_version,
tracer_provider=tracer_provider,
schema_url=schema_url,
attributes=attributes,
)
def get_meter(
self,
name: str,
version: str = "",
meter_provider: MeterProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None,
) -> Meter:
return metrics.get_meter(
name=name, version=version, meter_provider=meter_provider, schema_url=schema_url, attributes=attributes
)
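The fastapi_middleware hunk above keeps the custom HTTP-metrics path but now sources the meter from the global API. A rough sketch of that pattern, with instrument names modeled on the OTel HTTP server semantic conventions referenced in the comment; the actual instruments created in the file are not shown in this diff:

# Sketch under assumptions: instrument names, units, and attributes are
# illustrative, patterned on the HTTP semantic conventions.
from opentelemetry import metrics

meter = metrics.get_meter("llama_stack.http.server")

request_duration = meter.create_histogram(
    "http.server.request.duration", unit="s", description="Duration of HTTP server requests"
)
active_requests = meter.create_up_down_counter(
    "http.server.active_requests", unit="{request}", description="In-flight HTTP server requests"
)

# A middleware hook would record roughly like this per request:
attrs = {"http.request.method": "GET", "http.route": "/v1/chat"}
active_requests.add(1, attrs)
request_duration.record(0.123, {**attrs, "http.response.status_code": 200})
active_requests.add(-1, attrs)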

View file

@@ -4,13 +4,19 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import concurrent.futures
"""
Unit tests for OTel Telemetry Provider.
These tests focus on the provider's functionality:
- Initialization and configuration
- FastAPI middleware setup
- SQLAlchemy instrumentation
- Environment variable handling
"""
from unittest.mock import MagicMock
import pytest
from opentelemetry import trace
from opentelemetry.metrics import Meter
from opentelemetry.trace import Tracer
from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig
from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider
@@ -29,29 +35,21 @@ def otel_config():
@pytest.fixture
def otel_provider(otel_config, monkeypatch):
"""Fixture providing an OTelTelemetryProvider instance with mocked environment."""
# Set required environment variables to avoid warnings
"""Fixture providing an OTelTelemetryProvider instance."""
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
return OTelTelemetryProvider(config=otel_config)
class TestOTelTelemetryProviderInitialization:
"""Tests for OTelTelemetryProvider initialization."""
class TestOTelProviderInitialization:
"""Tests for OTel provider initialization and configuration."""
def test_initialization_with_valid_config(self, otel_config, monkeypatch):
def test_provider_initializes_with_valid_config(self, otel_config, monkeypatch):
"""Test that provider initializes correctly with valid configuration."""
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
provider = OTelTelemetryProvider(config=otel_config)
assert provider.config == otel_config
def test_initialization_sets_service_attributes(self, otel_config, monkeypatch):
"""Test that service attributes are properly configured."""
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
provider = OTelTelemetryProvider(config=otel_config)
assert provider.config.service_name == "test-service"
assert provider.config.service_version == "1.0.0"
assert provider.config.deployment_environment == "test"
@@ -72,7 +70,6 @@ class TestOTelTelemetryProviderInitialization:
def test_warns_when_endpoints_missing(self, otel_config, monkeypatch, caplog):
"""Test that warnings are issued when OTLP endpoints are not set."""
# Remove all endpoint environment variables
monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False)
monkeypatch.delenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", raising=False)
monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
@@ -84,459 +81,63 @@ class TestOTelTelemetryProviderInitialization:
assert any("Metrics will not be exported" in record.message for record in caplog.records)
class TestOTelTelemetryProviderTracerAPI:
"""Tests for the get_tracer() API."""
class TestOTelProviderMiddleware:
"""Tests for FastAPI and SQLAlchemy instrumentation."""
def test_get_tracer_returns_tracer(self, otel_provider):
"""Test that get_tracer returns a valid Tracer instance."""
tracer = otel_provider.get_tracer("test.module")
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_get_tracer_with_version(self, otel_provider):
"""Test that get_tracer works with version parameter."""
tracer = otel_provider.get_tracer(
instrumenting_module_name="test.module", instrumenting_library_version="1.0.0"
)
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_get_tracer_with_attributes(self, otel_provider):
"""Test that get_tracer works with attributes."""
tracer = otel_provider.get_tracer(
instrumenting_module_name="test.module", attributes={"component": "test", "tier": "backend"}
)
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_get_tracer_with_schema_url(self, otel_provider):
"""Test that get_tracer works with schema URL."""
tracer = otel_provider.get_tracer(
instrumenting_module_name="test.module", schema_url="https://example.com/schema"
)
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_tracer_can_create_spans(self, otel_provider):
"""Test that tracer can create spans."""
tracer = otel_provider.get_tracer("test.module")
with tracer.start_as_current_span("test.operation") as span:
assert span is not None
assert span.is_recording()
def test_tracer_can_create_spans_with_attributes(self, otel_provider):
"""Test that tracer can create spans with attributes."""
tracer = otel_provider.get_tracer("test.module")
with tracer.start_as_current_span("test.operation", attributes={"user.id": "123", "request.id": "abc"}) as span:
assert span is not None
assert span.is_recording()
def test_multiple_tracers_can_coexist(self, otel_provider):
"""Test that multiple tracers can be created."""
tracer1 = otel_provider.get_tracer("module.one")
tracer2 = otel_provider.get_tracer("module.two")
assert tracer1 is not None
assert tracer2 is not None
# Tracers with different names might be the same instance or different
# depending on OTel implementation, so just verify both work
with tracer1.start_as_current_span("op1") as span1:
assert span1.is_recording()
with tracer2.start_as_current_span("op2") as span2:
assert span2.is_recording()
class TestOTelTelemetryProviderMeterAPI:
"""Tests for the get_meter() API."""
def test_get_meter_returns_meter(self, otel_provider):
"""Test that get_meter returns a valid Meter instance."""
meter = otel_provider.get_meter("test.meter")
assert meter is not None
assert isinstance(meter, Meter)
def test_get_meter_with_version(self, otel_provider):
"""Test that get_meter works with version parameter."""
meter = otel_provider.get_meter(name="test.meter", version="1.0.0")
assert meter is not None
assert isinstance(meter, Meter)
def test_get_meter_with_attributes(self, otel_provider):
"""Test that get_meter works with attributes."""
meter = otel_provider.get_meter(name="test.meter", attributes={"service": "test", "env": "dev"})
assert meter is not None
assert isinstance(meter, Meter)
def test_get_meter_with_schema_url(self, otel_provider):
"""Test that get_meter works with schema URL."""
meter = otel_provider.get_meter(name="test.meter", schema_url="https://example.com/schema")
assert meter is not None
assert isinstance(meter, Meter)
def test_meter_can_create_counter(self, otel_provider):
"""Test that meter can create counters."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.requests.total", unit="requests", description="Total requests")
assert counter is not None
# Test that counter can be used
counter.add(1, {"endpoint": "/test"})
def test_meter_can_create_histogram(self, otel_provider):
"""Test that meter can create histograms."""
meter = otel_provider.get_meter("test.meter")
histogram = meter.create_histogram("test.request.duration", unit="ms", description="Request duration")
assert histogram is not None
# Test that histogram can be used
histogram.record(42.5, {"method": "GET"})
def test_meter_can_create_up_down_counter(self, otel_provider):
"""Test that meter can create up/down counters."""
meter = otel_provider.get_meter("test.meter")
up_down_counter = meter.create_up_down_counter(
"test.active.connections", unit="connections", description="Active connections"
)
assert up_down_counter is not None
# Test that up/down counter can be used
up_down_counter.add(5)
up_down_counter.add(-2)
def test_meter_can_create_observable_gauge(self, otel_provider):
"""Test that meter can create observable gauges."""
meter = otel_provider.get_meter("test.meter")
def gauge_callback(options):
return [{"attributes": {"host": "localhost"}, "value": 42.0}]
gauge = meter.create_observable_gauge(
"test.memory.usage", callbacks=[gauge_callback], unit="bytes", description="Memory usage"
)
assert gauge is not None
def test_multiple_instruments_from_same_meter(self, otel_provider):
"""Test that a meter can create multiple instruments."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
histogram = meter.create_histogram("test.histogram")
up_down_counter = meter.create_up_down_counter("test.gauge")
assert counter is not None
assert histogram is not None
assert up_down_counter is not None
# Verify they all work
counter.add(1)
histogram.record(10.0)
up_down_counter.add(5)
class TestOTelTelemetryProviderNativeUsage:
"""Tests for native OpenTelemetry usage patterns."""
def test_complete_tracing_workflow(self, otel_provider):
"""Test a complete tracing workflow using native OTel API."""
tracer = otel_provider.get_tracer("llama_stack.inference")
# Create parent span
with tracer.start_as_current_span("inference.request") as parent_span:
parent_span.set_attribute("model", "llama-3.2-1b")
parent_span.set_attribute("user", "test-user")
# Create child span
with tracer.start_as_current_span("model.load") as child_span:
child_span.set_attribute("model.size", "1B")
assert child_span.is_recording()
# Create another child span
with tracer.start_as_current_span("inference.execute") as child_span:
child_span.set_attribute("tokens.input", 25)
child_span.set_attribute("tokens.output", 150)
assert child_span.is_recording()
assert parent_span.is_recording()
def test_complete_metrics_workflow(self, otel_provider):
"""Test a complete metrics workflow using native OTel API."""
meter = otel_provider.get_meter("llama_stack.metrics")
# Create various instruments
request_counter = meter.create_counter("llama.requests.total", unit="requests", description="Total requests")
latency_histogram = meter.create_histogram(
"llama.inference.duration", unit="ms", description="Inference duration"
)
active_sessions = meter.create_up_down_counter(
"llama.sessions.active", unit="sessions", description="Active sessions"
)
# Use the instruments
request_counter.add(1, {"endpoint": "/chat", "status": "success"})
latency_histogram.record(123.45, {"model": "llama-3.2-1b"})
active_sessions.add(1)
active_sessions.add(-1)
# No exceptions means success
def test_concurrent_tracer_usage(self, otel_provider):
"""Test that multiple threads can use tracers concurrently."""
def create_spans(thread_id):
tracer = otel_provider.get_tracer(f"test.module.{thread_id}")
for i in range(10):
with tracer.start_as_current_span(f"operation.{i}") as span:
span.set_attribute("thread.id", thread_id)
span.set_attribute("iteration", i)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(create_spans, i) for i in range(10)]
concurrent.futures.wait(futures)
# If we get here without exceptions, thread safety is working
def test_concurrent_meter_usage(self, otel_provider):
"""Test that multiple threads can use meters concurrently."""
def record_metrics(thread_id):
meter = otel_provider.get_meter(f"test.meter.{thread_id}")
counter = meter.create_counter(f"test.counter.{thread_id}")
histogram = meter.create_histogram(f"test.histogram.{thread_id}")
for i in range(10):
counter.add(1, {"thread": str(thread_id)})
histogram.record(float(i * 10), {"thread": str(thread_id)})
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(record_metrics, i) for i in range(10)]
concurrent.futures.wait(futures)
# If we get here without exceptions, thread safety is working
def test_mixed_tracing_and_metrics(self, otel_provider):
"""Test using both tracing and metrics together."""
tracer = otel_provider.get_tracer("test.module")
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("operations.count")
histogram = meter.create_histogram("operation.duration")
# Trace an operation while recording metrics
with tracer.start_as_current_span("test.operation") as span:
counter.add(1)
span.set_attribute("step", "start")
histogram.record(50.0)
span.set_attribute("step", "processing")
counter.add(1)
span.set_attribute("step", "complete")
# No exceptions means success
class TestOTelTelemetryProviderFastAPIMiddleware:
"""Tests for FastAPI middleware functionality."""
def test_fastapi_middleware(self, otel_provider):
"""Test that fastapi_middleware can be called."""
def test_fastapi_middleware_can_be_applied(self, otel_provider):
"""Test that fastapi_middleware can be called without errors."""
mock_app = MagicMock()
# Should not raise an exception
otel_provider.fastapi_middleware(mock_app)
def test_fastapi_middleware_is_idempotent(self, otel_provider):
"""Test that calling fastapi_middleware multiple times is safe."""
mock_app = MagicMock()
# Verify FastAPIInstrumentor was called (it patches the app)
# The actual instrumentation is tested in E2E tests
# Should be able to call multiple times without error
otel_provider.fastapi_middleware(mock_app)
# Note: Second call might warn but shouldn't fail
# otel_provider.fastapi_middleware(mock_app)
class TestOTelTelemetryProviderEdgeCases:
"""Tests for edge cases and error conditions."""
def test_tracer_with_empty_module_name(self, otel_provider):
"""Test that get_tracer works with empty module name."""
tracer = otel_provider.get_tracer("")
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_meter_with_empty_name(self, otel_provider):
"""Test that get_meter works with empty name."""
meter = otel_provider.get_meter("")
assert meter is not None
assert isinstance(meter, Meter)
def test_meter_instruments_with_special_characters(self, otel_provider):
"""Test that metric names with dots, underscores, and hyphens work."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter_name-special")
histogram = meter.create_histogram("test.histogram_name-special")
assert counter is not None
assert histogram is not None
# Verify they can be used
counter.add(1)
histogram.record(10.0)
def test_meter_counter_with_zero_value(self, otel_provider):
"""Test that counters work with zero value."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
def test_sqlalchemy_instrumentation_without_engine(self, otel_provider):
"""
Test that sqlalchemy_instrumentation can be called.
Note: Testing with a real engine would require SQLAlchemy setup.
The actual instrumentation is tested when used with real databases.
"""
# Should not raise an exception
counter.add(0.0)
def test_meter_histogram_with_negative_value(self, otel_provider):
"""Test that histograms accept negative values."""
meter = otel_provider.get_meter("test.meter")
histogram = meter.create_histogram("test.histogram")
# Should not raise an exception
histogram.record(-10.0)
def test_meter_up_down_counter_with_negative_value(self, otel_provider):
"""Test that up/down counters work with negative values."""
meter = otel_provider.get_meter("test.meter")
up_down_counter = meter.create_up_down_counter("test.updown")
# Should not raise an exception
up_down_counter.add(-5.0)
def test_meter_instruments_with_empty_attributes(self, otel_provider):
"""Test that empty attributes dict is handled correctly."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
# Should not raise an exception
counter.add(1.0, attributes={})
def test_meter_instruments_with_none_attributes(self, otel_provider):
"""Test that None attributes are handled correctly."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
# Should not raise an exception
counter.add(1.0, attributes=None)
otel_provider.sqlalchemy_instrumentation()
class TestOTelTelemetryProviderRealisticScenarios:
"""Tests simulating realistic usage scenarios."""
class TestOTelProviderConfiguration:
"""Tests for configuration and environment variable handling."""
def test_inference_request_telemetry(self, otel_provider):
"""Simulate telemetry for a complete inference request."""
tracer = otel_provider.get_tracer("llama_stack.inference")
meter = otel_provider.get_meter("llama_stack.metrics")
def test_service_metadata_configuration(self, otel_provider):
"""Test that service metadata is properly configured."""
assert otel_provider.config.service_name == "test-service"
assert otel_provider.config.service_version == "1.0.0"
assert otel_provider.config.deployment_environment == "test"
# Create instruments
request_counter = meter.create_counter("llama.requests.total")
token_counter = meter.create_counter("llama.tokens.total")
latency_histogram = meter.create_histogram("llama.request.duration_ms")
in_flight_gauge = meter.create_up_down_counter("llama.requests.in_flight")
def test_span_processor_configuration(self, monkeypatch):
"""Test different span processor configurations."""
monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
# Simulate request
with tracer.start_as_current_span("inference.request") as request_span:
request_span.set_attribute("model.id", "llama-3.2-1b")
request_span.set_attribute("user.id", "test-user")
# Test simple processor
config_simple = OTelTelemetryConfig(
service_name="test",
span_processor="simple",
)
provider_simple = OTelTelemetryProvider(config=config_simple)
assert provider_simple.config.span_processor == "simple"
request_counter.add(1, {"model": "llama-3.2-1b"})
in_flight_gauge.add(1)
# Test batch processor
config_batch = OTelTelemetryConfig(
service_name="test",
span_processor="batch",
)
provider_batch = OTelTelemetryProvider(config=config_batch)
assert provider_batch.config.span_processor == "batch"
# Simulate token counting
token_counter.add(25, {"type": "input", "model": "llama-3.2-1b"})
token_counter.add(150, {"type": "output", "model": "llama-3.2-1b"})
def test_sample_run_config_generation(self):
"""Test that sample_run_config generates valid configuration."""
sample_config = OTelTelemetryConfig.sample_run_config()
# Simulate latency
latency_histogram.record(125.5, {"model": "llama-3.2-1b"})
in_flight_gauge.add(-1)
request_span.set_attribute("tokens.input", 25)
request_span.set_attribute("tokens.output", 150)
def test_multi_step_workflow_with_nested_spans(self, otel_provider):
"""Simulate a multi-step workflow with nested spans."""
tracer = otel_provider.get_tracer("llama_stack.workflow")
meter = otel_provider.get_meter("llama_stack.workflow.metrics")
step_counter = meter.create_counter("workflow.steps.completed")
with tracer.start_as_current_span("workflow.execute") as root_span:
root_span.set_attribute("workflow.id", "wf-123")
# Step 1: Validate
with tracer.start_as_current_span("step.validate") as span:
span.set_attribute("validation.result", "pass")
step_counter.add(1, {"step": "validate", "status": "success"})
# Step 2: Process
with tracer.start_as_current_span("step.process") as span:
span.set_attribute("items.processed", 100)
step_counter.add(1, {"step": "process", "status": "success"})
# Step 3: Finalize
with tracer.start_as_current_span("step.finalize") as span:
span.set_attribute("output.size", 1024)
step_counter.add(1, {"step": "finalize", "status": "success"})
root_span.set_attribute("workflow.status", "completed")
def test_error_handling_with_telemetry(self, otel_provider):
"""Test telemetry when errors occur."""
tracer = otel_provider.get_tracer("llama_stack.errors")
meter = otel_provider.get_meter("llama_stack.errors.metrics")
error_counter = meter.create_counter("llama.errors.total")
with tracer.start_as_current_span("operation.with.error") as span:
try:
span.set_attribute("step", "processing")
# Simulate an error
raise ValueError("Test error")
except ValueError as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
error_counter.add(1, {"error.type": "ValueError"})
# Should not raise - error was handled
def test_batch_operations_telemetry(self, otel_provider):
"""Test telemetry for batch operations."""
tracer = otel_provider.get_tracer("llama_stack.batch")
meter = otel_provider.get_meter("llama_stack.batch.metrics")
batch_counter = meter.create_counter("llama.batch.items.processed")
batch_duration = meter.create_histogram("llama.batch.duration_ms")
with tracer.start_as_current_span("batch.process") as batch_span:
batch_span.set_attribute("batch.size", 100)
for i in range(100):
with tracer.start_as_current_span(f"item.{i}") as item_span:
item_span.set_attribute("item.index", i)
batch_counter.add(1, {"status": "success"})
batch_duration.record(5000.0, {"batch.size": "100"})
batch_span.set_attribute("batch.status", "completed")
assert "service_name" in sample_config
assert "span_processor" in sample_config
assert "${env.OTEL_SERVICE_NAME" in sample_config["service_name"]