feat(telemetry:major): End-to-End Testing, Metric Capture, SQLAlchemy Instrumentation

This commit is contained in:
Emilio Garcia 2025-10-03 12:17:41 -04:00
parent 9a0294ab4f
commit 4aa2dc110d
19 changed files with 1854 additions and 881 deletions

View file

@@ -424,6 +424,7 @@ def create_app(
if Api.telemetry in impls:
impls[Api.telemetry].fastapi_middleware(app)
impls[Api.telemetry].sqlalchemy_instrumentation()
# Load external APIs if configured
external_apis = load_external_apis(config)

View file

@@ -6,7 +6,13 @@
from abc import abstractmethod
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Any
from opentelemetry.trace import Tracer
from opentelemetry.metrics import Meter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Attributes
from sqlalchemy import Engine
class TelemetryProvider(BaseModel):
@@ -21,29 +27,32 @@ class TelemetryProvider(BaseModel):
...
@abstractmethod
def custom_trace(self, name: str, *args, **kwargs) -> Any:
def sqlalchemy_instrumentation(self, engine: Engine | None = None):
"""
Creates a custom trace.
Injects SQLAlchemy instrumentation so that database activity is captured as telemetry.
"""
...
@abstractmethod
def record_count(self, name: str, *args, **kwargs):
def get_tracer(self,
instrumenting_module_name: str,
instrumenting_library_version: str | None = None,
tracer_provider: TracerProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None
) -> Tracer:
"""
Increments a counter metric.
Gets a tracer.
"""
...
@abstractmethod
def record_histogram(self, name: str, *args, **kwargs):
def get_meter(self, name: str,
version: str = "",
meter_provider: MeterProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None) -> Meter:
"""
Records a histogram metric.
"""
...
@abstractmethod
def record_up_down_counter(self, name: str, *args, **kwargs):
"""
Records an up/down counter metric.
Gets a meter.
"""
...
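Taken together, a short usage sketch of the interface above; the `provider` instance, module name, and metric names are illustrative assumptions, not part of the diff:

```python
# Sketch: consuming the TelemetryProvider interface.
# `provider` is assumed to be any concrete TelemetryProvider implementation.
tracer = provider.get_tracer("llama_stack.inference")
meter = provider.get_meter("llama_stack.inference")

request_counter = meter.create_counter(
    "inference.requests", unit="requests", description="Inference calls served"
)

with tracer.start_as_current_span("chat_completion"):
    request_counter.add(1, {"model": "example-model"})
```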

View file

@@ -1,20 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from abc import abstractmethod
from fastapi import FastAPI
from pydantic import BaseModel
class TelemetryProvider(BaseModel):
"""
TelemetryProvider standardizes how telemetry is provided to the application.
"""
@abstractmethod
def fastapi_middleware(self, app: FastAPI, *args, **kwargs):
"""
Injects FastAPI middleware that instruments the application for telemetry.
"""
...

View file

@@ -0,0 +1,24 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .config import OTelTelemetryConfig
__all__ = ["OTelTelemetryConfig"]
async def get_provider_impl(config: OTelTelemetryConfig, deps):
"""
Get the OTel telemetry provider implementation.
This function is called by the Llama Stack registry to instantiate
the provider.
"""
from .otel import OTelTelemetryProvider
# The provider is synchronously initialized via Pydantic model_post_init
# No async initialization needed
return OTelTelemetryProvider(config=config)
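For context, a minimal sketch of exercising this registry entry point directly; the empty `deps` dict is an assumption for illustration:

```python
# Sketch: instantiating the provider via the registry entry point.
import asyncio

from llama_stack.providers.inline.telemetry.otel import get_provider_impl
from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig

config = OTelTelemetryConfig(service_name="llama-stack")
provider = asyncio.run(get_provider_impl(config, deps={}))
```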

View file

@@ -1,4 +1,4 @@
from typing import Literal
from typing import Any, Literal
from pydantic import BaseModel, Field
@@ -11,17 +11,19 @@ class OTelTelemetryConfig(BaseModel):
"""
The configuration for the OpenTelemetry telemetry provider.
Most configuration is set using environment variables.
See https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/ for more information.
See https://opentelemetry.io/docs/specs/otel/configuration/sdk-configuration-variables/ for more information.
"""
service_name: str = Field(
description="""The name of the service to be monitored.
Is overridden by the OTEL_SERVICE_NAME or OTEL_RESOURCE_ATTRIBUTES environment variables.""",
)
service_version: str | None = Field(
default=None,
description="""The version of the service to be monitored.
Is overridden by the OTEL_RESOURCE_ATTRIBUTES environment variable."""
)
deployment_environment: str | None = Field(
default=None,
description="""The name of the environment of the service to be monitored.
Is overridden by the OTEL_RESOURCE_ATTRIBUTES environment variable."""
)
@@ -30,3 +32,13 @@
Is overridden by the OTEL_SPAN_PROCESSOR environment variable.""",
default="batch"
)
@classmethod
def sample_run_config(cls, __distro_dir__: str = "") -> dict[str, Any]:
"""Sample configuration for use in distributions."""
return {
"service_name": "${env.OTEL_SERVICE_NAME:=llama-stack}",
"service_version": "${env.OTEL_SERVICE_VERSION:=}",
"deployment_environment": "${env.OTEL_DEPLOYMENT_ENVIRONMENT:=}",
"span_processor": "${env.OTEL_SPAN_PROCESSOR:=batch}",
}
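The `${env.VAR:=default}` placeholders are resolved by the stack's config loader: the environment variable wins when set, otherwise the default applies. A hedged illustration of the resolved values when no `OTEL_*` variables are set:

```python
# Sketch: what sample_run_config resolves to with no OTEL_* env vars set
# (resolution is performed by the stack's config loader; shown here by hand).
resolved = {
    "service_name": "llama-stack",  # from ${env.OTEL_SERVICE_NAME:=llama-stack}
    "service_version": "",          # from ${env.OTEL_SERVICE_VERSION:=}
    "deployment_environment": "",   # from ${env.OTEL_DEPLOYMENT_ENVIRONMENT:=}
    "span_processor": "batch",      # from ${env.OTEL_SPAN_PROCESSOR:=batch}
}
```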

View file

@@ -1,22 +1,21 @@
import os
import threading
from opentelemetry import trace, metrics
from opentelemetry.context.context import Context
from opentelemetry.sdk.resources import Attributes, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.metrics import Counter, UpDownCounter, Histogram, ObservableGauge
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.trace import Span, SpanKind, _Links
from typing import Sequence
from pydantic import PrivateAttr
from opentelemetry.trace import Tracer
from opentelemetry.metrics import Meter
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from llama_stack.core.telemetry.tracing import TelemetryProvider
from llama_stack.core.telemetry.telemetry import TelemetryProvider
from llama_stack.log import get_logger
from sqlalchemy import Engine
from .config import OTelTelemetryConfig
from fastapi import FastAPI
@@ -29,15 +28,9 @@ class OTelTelemetryProvider(TelemetryProvider):
A simple OpenTelemetry-native telemetry provider.
"""
config: OTelTelemetryConfig
_counters: dict[str, Counter] = PrivateAttr(default_factory=dict)
_up_down_counters: dict[str, UpDownCounter] = PrivateAttr(default_factory=dict)
_histograms: dict[str, Histogram] = PrivateAttr(default_factory=dict)
_gauges: dict[str, ObservableGauge] = PrivateAttr(default_factory=dict)
def model_post_init(self, __context):
"""Initialize provider after Pydantic validation."""
self._lock = threading.Lock()
attributes: Attributes = {
key: value
@@ -74,68 +67,114 @@ class OTelTelemetryProvider(TelemetryProvider):
if not os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"):
logger.warning("OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_METRICS_ENDPOINT is not set. Metrics will not be exported.")
def fastapi_middleware(self, app: FastAPI):
"""
Instrument FastAPI with OTel for automatic tracing and metrics.
Captures:
- Distributed traces for all HTTP requests (via FastAPIInstrumentor)
- HTTP metrics following semantic conventions (custom middleware)
"""
# Enable automatic tracing
FastAPIInstrumentor.instrument_app(app)
def custom_trace(self,
# Add custom middleware for HTTP metrics
meter = self.get_meter("llama_stack.http.server")
# Create HTTP metrics following semantic conventions
# https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
request_duration = meter.create_histogram(
"http.server.request.duration",
unit="ms",
description="Duration of HTTP server requests"
)
active_requests = meter.create_up_down_counter(
"http.server.active_requests",
unit="requests",
description="Number of active HTTP server requests"
)
request_count = meter.create_counter(
"http.server.request.count",
unit="requests",
description="Total number of HTTP server requests"
)
# Add middleware to record metrics
@app.middleware("http") # type: ignore[misc]
async def http_metrics_middleware(request, call_next):
import time
# Record active request
active_requests.add(1, {
"http.method": request.method,
"http.route": request.url.path,
})
start_time = time.time()
status_code = 500 # Default to error
try:
response = await call_next(request)
status_code = response.status_code
except Exception:
raise
finally:
# Record metrics
duration_ms = (time.time() - start_time) * 1000
attributes = {
"http.method": request.method,
"http.route": request.url.path,
"http.status_code": status_code,
}
request_duration.record(duration_ms, attributes)
request_count.add(1, attributes)
active_requests.add(-1, {
"http.method": request.method,
"http.route": request.url.path,
})
return response
def sqlalchemy_instrumentation(self, engine: Engine | None = None):
kwargs = {}
if engine:
kwargs["engine"] = engine
SQLAlchemyInstrumentor().instrument(**kwargs)
def get_tracer(self,
instrumenting_module_name: str,
instrumenting_library_version: str | None = None,
tracer_provider: TracerProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None
) -> Tracer:
return trace.get_tracer(
instrumenting_module_name=instrumenting_module_name,
instrumenting_library_version=instrumenting_library_version,
tracer_provider=tracer_provider,
schema_url=schema_url,
attributes=attributes
)
def get_meter(self,
name: str,
context: Context | None = None,
kind: SpanKind = SpanKind.INTERNAL,
attributes: Attributes = {},
links: _Links = None,
start_time: int | None = None,
record_exception: bool = True,
set_status_on_exception: bool = True) -> Span:
"""
Creates a custom tracing span using the OpenTelemetry SDK.
"""
tracer = trace.get_tracer(__name__)
return tracer.start_span(name, context, kind, attributes, links, start_time, record_exception, set_status_on_exception)
def record_count(self, name: str, amount: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = ""):
"""
Increments a counter metric, indexed by the meter name, using the OpenTelemetry SDK.
This function is designed to be compatible with the design patterns of other popular
telemetry providers, like Datadog and New Relic.
"""
meter = metrics.get_meter(__name__)
with self._lock:
if name not in self._counters:
self._counters[name] = meter.create_counter(name, unit=unit, description=description)
counter = self._counters[name]
counter.add(amount, attributes=attributes, context=context)
def record_histogram(self, name: str, value: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = "", explicit_bucket_boundaries_advisory: Sequence[float] | None = None):
"""
Records a histogram metric, indexed by the meter name, using the OpenTelemetry SDK.
This function is designed to be compatible with the design patterns of other popular
telemetry providers, like Datadog and New Relic.
"""
meter = metrics.get_meter(__name__)
with self._lock:
if name not in self._histograms:
self._histograms[name] = meter.create_histogram(name, unit=unit, description=description, explicit_bucket_boundaries_advisory=explicit_bucket_boundaries_advisory)
histogram = self._histograms[name]
histogram.record(value, attributes=attributes, context=context)
def record_up_down_counter(self, name: str, value: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = ""):
"""
Records an up/down counter metric, indexed by the meter name, using the OpenTelemetry SDK.
This function is designed to be compatible with the design patterns of other popular
telemetry providers, like Datadog and New Relic.
"""
meter = metrics.get_meter(__name__)
with self._lock:
if name not in self._up_down_counters:
self._up_down_counters[name] = meter.create_up_down_counter(name, unit=unit, description=description)
up_down_counter = self._up_down_counters[name]
up_down_counter.add(value, attributes=attributes, context=context)
version: str = "",
meter_provider: MeterProvider | None = None,
schema_url: str | None = None,
attributes: Attributes | None = None
) -> Meter:
return metrics.get_meter(
name=name,
version=version,
meter_provider=meter_provider,
schema_url=schema_url,
attributes=attributes
)
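Putting the pieces above together, a hedged end-to-end sketch of wiring the provider into an app; the config values and engine URL are illustrative assumptions:

```python
# Sketch: wiring OTelTelemetryProvider into a FastAPI app.
from fastapi import FastAPI
from sqlalchemy import create_engine

app = FastAPI()
provider = OTelTelemetryProvider(
    config=OTelTelemetryConfig(service_name="llama-stack")  # illustrative config
)
provider.fastapi_middleware(app)  # traces + HTTP metrics for every request
provider.sqlalchemy_instrumentation(
    engine=create_engine("sqlite:///example.db")  # hypothetical engine
)

tracer = provider.get_tracer("llama_stack.example")
with tracer.start_as_current_span("startup-check"):
    pass
```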

View file

@@ -26,4 +26,16 @@ def available_providers() -> list[ProviderSpec]:
config_class="llama_stack.providers.inline.telemetry.meta_reference.config.TelemetryConfig",
description="Meta's reference implementation of telemetry and observability using OpenTelemetry.",
),
InlineProviderSpec(
api=Api.telemetry,
provider_type="inline::otel",
pip_packages=[
"opentelemetry-sdk",
"opentelemetry-exporter-otlp-proto-http",
"opentelemetry-instrumentation-fastapi",
],
module="llama_stack.providers.inline.telemetry.otel",
config_class="llama_stack.providers.inline.telemetry.otel.config.OTelTelemetryConfig",
description="Native OpenTelemetry provider with full access to OTel Tracer and Meter APIs for advanced instrumentation.",
),
]

View file

@@ -52,6 +52,7 @@ dependencies = [
"sqlalchemy[asyncio]>=2.0.41", # server - for conversations
"opentelemetry-semantic-conventions>=0.57b0",
"opentelemetry-instrumentation-fastapi>=0.57b0",
"opentelemetry-instrumentation-sqlalchemy>=0.57b0",
]
[project.optional-dependencies]

View file

@@ -0,0 +1,6 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@@ -0,0 +1,148 @@
# Mock Server Infrastructure
This directory contains mock servers for E2E telemetry testing.
## Structure
```
mocking/
├── README.md ← You are here
├── __init__.py ← Module exports
├── mock_base.py ← Pydantic base class for all mocks
├── servers.py ← Mock server implementations
└── harness.py ← Async startup harness
```
## Files
### `mock_base.py` - Base Class
Pydantic base model that all mock servers must inherit from.
**Contract:**
```python
class MockServerBase(BaseModel):
async def await_start(self):
# Start server and wait until ready
...
def stop(self):
# Stop server and cleanup
...
```
### `servers.py` - Mock Implementations
Contains:
- **MockOTLPCollector** - Receives OTLP telemetry (port 4318)
- **MockVLLMServer** - Simulates vLLM inference API (port 8000)
### `harness.py` - Startup Orchestration
Provides:
- **MockServerConfig** - Pydantic config for server registration
- **start_mock_servers_async()** - Starts servers in parallel
- **stop_mock_servers()** - Stops all servers
## Creating a New Mock Server
### Step 1: Implement the Server
Add to `servers.py`:
```python
class MockRedisServer(MockServerBase):
"""Mock Redis server."""
port: int = Field(default=6379)
# Mutable runtime state (excluded from serialization)
server: Any = Field(default=None, exclude=True)
def model_post_init(self, __context):
self.server = None
async def await_start(self):
"""Start Redis mock and wait until ready."""
# Start your server
self.server = create_redis_server(self.port)
self.server.start()
# Wait for port to be listening
for _ in range(10):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if sock.connect_ex(('localhost', self.port)) == 0:
sock.close()
return # Ready!
await asyncio.sleep(0.1)
def stop(self):
if self.server:
self.server.stop()
```
### Step 2: Register in Test
In `test_otel_e2e.py`, add to MOCK_SERVERS list:
```python
MOCK_SERVERS = [
# ... existing servers ...
MockServerConfig(
name="Mock Redis",
server_class=MockRedisServer,
init_kwargs={"port": 6379},
),
]
```
### Step 3: Done!
The harness automatically:
- Creates the server instance
- Calls `await_start()` in parallel with other servers
- Returns when all are ready
- Stops all servers on teardown
## Benefits
- **Parallel Startup** - All servers start simultaneously
- **Type-Safe** - Pydantic validation
- **Simple** - Just implement 2 methods
- **Fast** - No HTTP polling, direct port checking
- **Clean** - Async/await pattern
## Usage in Tests
```python
@pytest.fixture(scope="module")
def mock_servers():
servers = asyncio.run(start_mock_servers_async(MOCK_SERVERS))
yield servers
stop_mock_servers(servers)
# Access specific servers
@pytest.fixture(scope="module")
def mock_redis(mock_servers):
return mock_servers["Mock Redis"]
```
## Key Design Decisions
### Why Pydantic?
- Type safety for server configuration
- Built-in validation
- Clear interface contract
### Why `await_start()` instead of HTTP `/ready`?
- Faster (no HTTP round-trip)
- Simpler (direct port checking)
- More reliable (internal state, not external endpoint)
### Why separate harness?
- Reusable across different test files
- Easy to add new servers
- Centralized error handling
## Examples
See `test_otel_e2e.py` for real-world usage:
- Line ~200: MOCK_SERVERS configuration
- Line ~230: Convenience fixtures
- Line ~240: Using servers in tests

View file

@@ -0,0 +1,29 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Mock server infrastructure for telemetry E2E testing.
This module provides:
- MockServerBase: Pydantic base class for all mock servers
- MockOTLPCollector: Mock OTLP telemetry collector
- MockVLLMServer: Mock vLLM inference server
- Mock server harness for parallel async startup
"""
from .mock_base import MockServerBase
from .servers import MockOTLPCollector, MockVLLMServer
from .harness import MockServerConfig, start_mock_servers_async, stop_mock_servers
__all__ = [
"MockServerBase",
"MockOTLPCollector",
"MockVLLMServer",
"MockServerConfig",
"start_mock_servers_async",
"stop_mock_servers",
]

View file

@@ -0,0 +1,107 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Mock server startup harness for parallel initialization.
HOW TO ADD A NEW MOCK SERVER:
1. Import your mock server class
2. Add it to MOCK_SERVERS list with configuration
3. Done! It will start in parallel with others.
"""
import asyncio
from typing import Any, Dict, List
from pydantic import BaseModel, Field
from .mock_base import MockServerBase
class MockServerConfig(BaseModel):
"""
Configuration for a mock server to start.
**TO ADD A NEW MOCK SERVER:**
Just create a MockServerConfig instance with your server class.
Example:
MockServerConfig(
name="Mock MyService",
server_class=MockMyService,
init_kwargs={"port": 9000, "config_param": "value"},
)
"""
model_config = {"arbitrary_types_allowed": True}
name: str = Field(description="Display name for logging")
server_class: type = Field(description="Mock server class (must inherit from MockServerBase)")
init_kwargs: Dict[str, Any] = Field(default_factory=dict, description="Kwargs to pass to server constructor")
async def start_mock_servers_async(mock_servers_config: List[MockServerConfig]) -> Dict[str, MockServerBase]:
"""
Start all mock servers in parallel and wait for them to be ready.
**HOW IT WORKS:**
1. Creates all server instances
2. Calls await_start() on all servers in parallel
3. Returns when all are ready
**SIMPLE TO USE:**
servers = await start_mock_servers_async([config1, config2, ...])
Args:
mock_servers_config: List of mock server configurations
Returns:
Dict mapping server name to server instance
"""
servers = {}
start_tasks = []
# Create all servers and prepare start tasks
for config in mock_servers_config:
server = config.server_class(**config.init_kwargs)
servers[config.name] = server
start_tasks.append(server.await_start())
# Start all servers in parallel
try:
await asyncio.gather(*start_tasks)
# Print readiness confirmation
for name in servers.keys():
print(f"[INFO] {name} ready")
except Exception as e:
# If any server fails, stop all servers
for server in servers.values():
try:
server.stop()
except Exception:
pass
raise RuntimeError(f"Failed to start mock servers: {e}") from e
return servers
def stop_mock_servers(servers: Dict[str, Any]):
"""
Stop all mock servers.
Args:
servers: Dict of server instances from start_mock_servers_async()
"""
for name, server in servers.items():
try:
if hasattr(server, 'get_request_count'):
print(f"\n[INFO] {name} received {server.get_request_count()} requests")
server.stop()
except Exception as e:
print(f"[WARN] Error stopping {name}: {e}")

View file

@@ -0,0 +1,69 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Base class for mock servers with async startup support.
All mock servers should inherit from MockServerBase and implement await_start().
"""
import asyncio
from abc import abstractmethod
from pydantic import BaseModel, Field
class MockServerBase(BaseModel):
"""
Pydantic base model for mock servers.
**TO CREATE A NEW MOCK SERVER:**
1. Inherit from this class
2. Implement async def await_start(self)
3. Implement def stop(self)
4. Done!
Example:
class MyMockServer(MockServerBase):
port: int = 8080
async def await_start(self):
# Start your server
self.server = create_server()
self.server.start()
# Wait until ready (can check internal state, no HTTP needed)
while not self.server.is_listening():
await asyncio.sleep(0.1)
def stop(self):
if self.server:
self.server.stop()
"""
model_config = {"arbitrary_types_allowed": True}
@abstractmethod
async def await_start(self):
"""
Start the server and wait until it's ready.
This method should:
1. Start the server (synchronous or async)
2. Wait until the server is fully ready to accept requests
3. Return when ready
Subclasses can check internal state directly - no HTTP polling needed!
"""
...
@abstractmethod
def stop(self):
"""
Stop the server and clean up resources.
This method should gracefully shut down the server.
"""
...

View file

@@ -0,0 +1,387 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Mock servers for OpenTelemetry E2E testing.
This module provides mock servers for testing telemetry:
- MockOTLPCollector: Receives and stores OTLP telemetry exports
- MockVLLMServer: Simulates vLLM inference API with valid OpenAI responses
These mocks allow E2E testing without external dependencies.
"""
import asyncio
import http.server
import json
import socket
import threading
import time
from typing import Any, Dict, List
from pydantic import Field
from .mock_base import MockServerBase
class MockOTLPCollector(MockServerBase):
"""
Mock OTLP collector HTTP server.
Receives real OTLP exports from Llama Stack and stores them for verification.
Runs on localhost:4318 (standard OTLP HTTP port).
Usage:
collector = MockOTLPCollector()
await collector.await_start()
# ... run tests ...
print(f"Received {collector.get_trace_count()} traces")
collector.stop()
"""
port: int = Field(default=4318, description="Port to run collector on")
# Mutable runtime state (Pydantic fields excluded from serialization)
traces: List[Dict] = Field(default_factory=list, exclude=True)
metrics: List[Dict] = Field(default_factory=list, exclude=True)
server: Any = Field(default=None, exclude=True)
server_thread: Any = Field(default=None, exclude=True)
def model_post_init(self, __context):
"""Initialize after Pydantic validation."""
self.traces = []
self.metrics = []
self.server = None
self.server_thread = None
def _create_handler_class(self):
"""Create the HTTP handler class for this collector instance."""
collector_self = self
class OTLPHandler(http.server.BaseHTTPRequestHandler):
"""HTTP request handler for OTLP requests."""
def log_message(self, format, *args):
"""Suppress HTTP server logs."""
pass
def do_GET(self):
"""Handle GET requests."""
# No readiness endpoint needed - using await_start() instead
self.send_response(404)
self.end_headers()
def do_POST(self):
"""Handle OTLP POST requests."""
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length) if content_length > 0 else b''
# Store the export request
if '/v1/traces' in self.path:
collector_self.traces.append({
'body': body,
'timestamp': time.time(),
})
elif '/v1/metrics' in self.path:
collector_self.metrics.append({
'body': body,
'timestamp': time.time(),
})
# Always return success (200 OK)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{}')
return OTLPHandler
async def await_start(self):
"""
Start the OTLP collector and wait until ready.
This method is async and can be awaited to ensure the server is ready.
"""
# Create handler and start the HTTP server
handler_class = self._create_handler_class()
self.server = http.server.HTTPServer(('localhost', self.port), handler_class)
self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.server_thread.start()
# Wait for server to be listening on the port
for _ in range(10):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', self.port))
sock.close()
if result == 0:
# Port is listening
return
except OSError:
pass
await asyncio.sleep(0.1)
raise RuntimeError(f"OTLP collector failed to start on port {self.port}")
def stop(self):
"""Stop the OTLP collector server."""
if self.server:
self.server.shutdown()
self.server.server_close()
def clear(self):
"""Clear all captured telemetry data."""
self.traces = []
self.metrics = []
def get_trace_count(self) -> int:
"""Get number of trace export requests received."""
return len(self.traces)
def get_metric_count(self) -> int:
"""Get number of metric export requests received."""
return len(self.metrics)
def get_all_traces(self) -> List[Dict]:
"""Get all captured trace exports."""
return self.traces
def get_all_metrics(self) -> List[Dict]:
"""Get all captured metric exports."""
return self.metrics
class MockVLLMServer(MockServerBase):
"""
Mock vLLM inference server with OpenAI-compatible API.
Returns valid OpenAI Python client response objects for:
- Chat completions (/v1/chat/completions)
- Text completions (/v1/completions)
- Model listing (/v1/models)
Runs on localhost:8000 (standard vLLM port).
Usage:
server = MockVLLMServer(models=["my-model"])
await server.await_start()
# ... make inference calls ...
print(f"Handled {server.get_request_count()} requests")
server.stop()
"""
port: int = Field(default=8000, description="Port to run server on")
models: List[str] = Field(
default_factory=lambda: ["meta-llama/Llama-3.2-1B-Instruct"],
description="List of model IDs to serve"
)
# Mutable runtime state (excluded from serialization)
requests_received: List[Dict] = Field(default_factory=list, exclude=True)
server: Any = Field(default=None, exclude=True)
server_thread: Any = Field(default=None, exclude=True)
def model_post_init(self, __context):
"""Initialize after Pydantic validation."""
self.requests_received = []
self.server = None
self.server_thread = None
def _create_handler_class(self):
"""Create the HTTP handler class for this vLLM instance."""
server_self = self
class VLLMHandler(http.server.BaseHTTPRequestHandler):
"""HTTP request handler for vLLM API."""
def log_message(self, format, *args):
"""Suppress HTTP server logs."""
pass
def log_request(self, code='-', size='-'):
"""Log incoming requests for debugging."""
print(f"[DEBUG] Mock vLLM received: {self.command} {self.path} -> {code}")
def do_GET(self):
"""Handle GET requests (models list, health check)."""
# Log GET requests too
server_self.requests_received.append({
'path': self.path,
'method': 'GET',
'timestamp': time.time(),
})
if self.path == '/v1/models':
response = self._create_models_list_response()
self._send_json_response(200, response)
elif self.path == '/health' or self.path == '/v1/health':
self._send_json_response(200, {"status": "healthy"})
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
"""Handle POST requests (chat/text completions)."""
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length) if content_length > 0 else b'{}'
try:
request_data = json.loads(body)
except ValueError: # invalid or non-UTF-8 JSON body
request_data = {}
# Log the request
server_self.requests_received.append({
'path': self.path,
'body': request_data,
'timestamp': time.time(),
})
# Route to appropriate handler
if '/chat/completions' in self.path:
response = self._create_chat_completion_response(request_data)
self._send_json_response(200, response)
elif '/completions' in self.path:
response = self._create_text_completion_response(request_data)
self._send_json_response(200, response)
else:
self._send_json_response(200, {"status": "ok"})
# ----------------------------------------------------------------
# Response Generators
# **TO MODIFY RESPONSES:** Edit these methods
# ----------------------------------------------------------------
def _create_models_list_response(self) -> Dict:
"""Create OpenAI models list response with configured models."""
return {
"object": "list",
"data": [
{
"id": model_id,
"object": "model",
"created": int(time.time()),
"owned_by": "meta",
}
for model_id in server_self.models
]
}
def _create_chat_completion_response(self, request_data: Dict) -> Dict:
"""
Create OpenAI ChatCompletion response.
Returns a valid response matching openai.types.ChatCompletion
"""
return {
"id": "chatcmpl-test123",
"object": "chat.completion",
"created": int(time.time()),
"model": request_data.get("model", "meta-llama/Llama-3.2-1B-Instruct"),
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "This is a test response from mock vLLM server.",
"tool_calls": None,
},
"logprobs": None,
"finish_reason": "stop",
}],
"usage": {
"prompt_tokens": 25,
"completion_tokens": 15,
"total_tokens": 40,
"completion_tokens_details": None,
},
"system_fingerprint": None,
"service_tier": None,
}
def _create_text_completion_response(self, request_data: Dict) -> Dict:
"""
Create OpenAI Completion response.
Returns a valid response matching openai.types.Completion
"""
return {
"id": "cmpl-test123",
"object": "text_completion",
"created": int(time.time()),
"model": request_data.get("model", "meta-llama/Llama-3.2-1B-Instruct"),
"choices": [{
"text": "This is a test completion.",
"index": 0,
"logprobs": None,
"finish_reason": "stop",
}],
"usage": {
"prompt_tokens": 10,
"completion_tokens": 8,
"total_tokens": 18,
"completion_tokens_details": None,
},
"system_fingerprint": None,
}
def _send_json_response(self, status_code: int, data: Dict):
"""Helper to send JSON response."""
self.send_response(status_code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
return VLLMHandler
async def await_start(self):
"""
Start the vLLM server and wait until ready.
This method is async and can be awaited to ensure the server is ready.
"""
# Create handler and start the HTTP server
handler_class = self._create_handler_class()
self.server = http.server.HTTPServer(('localhost', self.port), handler_class)
self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.server_thread.start()
# Wait for server to be listening on the port
for _ in range(10):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', self.port))
sock.close()
if result == 0:
# Port is listening
return
except OSError:
pass
await asyncio.sleep(0.1)
raise RuntimeError(f"vLLM server failed to start on port {self.port}")
def stop(self):
"""Stop the vLLM server."""
if self.server:
self.server.shutdown()
self.server.server_close()
def clear(self):
"""Clear request history."""
self.requests_received = []
def get_request_count(self) -> int:
"""Get number of requests received."""
return len(self.requests_received)
def get_all_requests(self) -> List[Dict]:
"""Get all received requests with their bodies."""
return self.requests_received
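The collector stores raw protobuf request bodies; tests that need to look inside an export can decode them with the OTLP protobuf bindings. A sketch, assuming the `opentelemetry-proto` package is available:

```python
# Sketch: decoding one captured OTLP trace export into span names.
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
    ExportTraceServiceRequest,
)

def span_names(export: dict) -> list[str]:
    """Extract span names from a single entry of collector.get_all_traces()."""
    request = ExportTraceServiceRequest()
    request.ParseFromString(export["body"])
    return [
        span.name
        for resource_spans in request.resource_spans
        for scope_spans in resource_spans.scope_spans
        for span in scope_spans.spans
    ]
```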

View file

@@ -0,0 +1,455 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
End-to-end integration tests for OpenTelemetry with automatic instrumentation.
HOW THIS WORKS:
1. Starts a mock OTLP collector (HTTP server) to receive telemetry
2. Starts a mock vLLM server to handle inference requests
3. Starts REAL Llama Stack with: opentelemetry-instrument llama stack run
4. Makes REAL API calls to the stack
5. Verifies telemetry was exported to the mock collector
WHERE TO MAKE CHANGES:
- Add test cases: see TEST_CASES list below (line ~70)
- Add mock servers: see MOCK_SERVERS list in mock_servers fixture (line ~200)
- Modify mock behavior: see mocking/servers.py
- Change stack config: see llama_stack_server fixture (line ~250)
- Add assertions: see TestOTelE2E class (line ~370)
RUNNING THE TESTS:
- Quick (mock servers only): pytest test_otel_e2e.py::TestMockServers -v
- Full E2E (slow): pytest test_otel_e2e.py::TestOTelE2E -v -m slow
"""
# ============================================================================
# IMPORTS
# ============================================================================
import os
import socket
import subprocess
import time
from typing import Any, Dict, List
import pytest
import requests
import yaml
from pydantic import BaseModel, Field
# Mock servers are in the mocking/ subdirectory
from .mocking import (
MockOTLPCollector,
MockVLLMServer,
MockServerConfig,
start_mock_servers_async,
stop_mock_servers,
)
# ============================================================================
# DATA MODELS
# ============================================================================
class TelemetryTestCase(BaseModel):
"""
Pydantic model defining expected telemetry for an API call.
**TO ADD A NEW TEST CASE:** Add to TEST_CASES list below.
"""
name: str = Field(description="Unique test case identifier")
http_method: str = Field(description="HTTP method (GET, POST, etc.)")
api_path: str = Field(description="API path (e.g., '/v1/models')")
request_body: Dict[str, Any] | None = Field(default=None)
expected_http_status: int = Field(default=200)
expected_trace_exports: int = Field(default=1, description="Minimum number of trace exports expected")
expected_metric_exports: int = Field(default=0, description="Minimum number of metric exports expected")
should_have_error_span: bool = Field(default=False)
# ============================================================================
# TEST CONFIGURATION
# **TO ADD NEW TESTS:** Add TelemetryTestCase instances here
# ============================================================================
TEST_CASES = [
TelemetryTestCase(
name="models_list",
http_method="GET",
api_path="/v1/models",
expected_trace_exports=1,
expected_metric_exports=1, # HTTP metrics from OTel provider middleware
),
TelemetryTestCase(
name="chat_completion",
http_method="POST",
api_path="/v1/inference/chat_completion",
request_body={
"model": "meta-llama/Llama-3.2-1B-Instruct",
"messages": [{"role": "user", "content": "Hello!"}],
},
expected_trace_exports=2, # Stack request + vLLM backend call
expected_metric_exports=1, # HTTP metrics (duration, count, active_requests)
),
]
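As a hedged illustration of the pattern, one more case could be appended; the API path and request body here are assumptions for illustration, not endpoints verified by this commit:

```python
# Sketch: a hypothetical extra test case following the same pattern.
TEST_CASES.append(
    TelemetryTestCase(
        name="text_completion",
        http_method="POST",
        api_path="/v1/inference/completion",  # assumed endpoint
        request_body={
            "model": "meta-llama/Llama-3.2-1B-Instruct",
            "content": "Hello!",
        },
        expected_trace_exports=2,  # stack request + backend call
        expected_metric_exports=1,
    )
)
```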
# ============================================================================
# TEST INFRASTRUCTURE
# ============================================================================
class TelemetryTestRunner:
"""
Executes TelemetryTestCase instances against real Llama Stack.
**HOW IT WORKS:**
1. Makes real HTTP request to the stack
2. Waits for telemetry export
3. Verifies exports were sent to mock collector
"""
def __init__(self, base_url: str, collector: MockOTLPCollector):
self.base_url = base_url
self.collector = collector
def run_test_case(self, test_case: TelemetryTestCase, verbose: bool = False) -> bool:
"""Execute a single test case and verify telemetry."""
initial_traces = self.collector.get_trace_count()
initial_metrics = self.collector.get_metric_count()
if verbose:
print(f"\n--- {test_case.name} ---")
print(f" {test_case.http_method} {test_case.api_path}")
# Make real HTTP request to Llama Stack
try:
url = f"{self.base_url}{test_case.api_path}"
if test_case.http_method == "GET":
response = requests.get(url, timeout=5)
elif test_case.http_method == "POST":
response = requests.post(url, json=test_case.request_body or {}, timeout=5)
else:
response = requests.request(test_case.http_method, url, timeout=5)
if verbose:
print(f" HTTP Response: {response.status_code}")
status_match = response.status_code == test_case.expected_http_status
except requests.exceptions.RequestException as e:
if verbose:
print(f" Request failed: {e}")
status_match = False
# Wait for automatic instrumentation to export telemetry
# Traces export immediately; metrics export every 500 ms (OTEL_METRIC_EXPORT_INTERVAL, set in the server fixture)
time.sleep(2.0) # Wait for both traces and metrics to export
# Verify traces were exported to mock collector
new_traces = self.collector.get_trace_count() - initial_traces
traces_exported = new_traces >= test_case.expected_trace_exports
# Verify metrics were exported (if expected)
new_metrics = self.collector.get_metric_count() - initial_metrics
metrics_exported = new_metrics >= test_case.expected_metric_exports
if verbose:
print(f" Expected: >={test_case.expected_trace_exports} trace exports, >={test_case.expected_metric_exports} metric exports")
print(f" Actual: {new_traces} trace exports, {new_metrics} metric exports")
result = status_match and traces_exported and metrics_exported
print(f" Result: {'PASS' if result else 'FAIL'}")
return result
def run_all_test_cases(self, test_cases: List[TelemetryTestCase], verbose: bool = True) -> Dict[str, bool]:
"""Run all test cases and return results."""
results = {}
for test_case in test_cases:
results[test_case.name] = self.run_test_case(test_case, verbose=verbose)
return results
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def is_port_available(port: int) -> bool:
"""Check if a TCP port is available for binding."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(('localhost', port))
return True
except OSError:
return False
# ============================================================================
# PYTEST FIXTURES
# ============================================================================
@pytest.fixture(scope="module")
def mock_servers():
"""
Fixture: Start all mock servers in parallel using async harness.
**TO ADD A NEW MOCK SERVER:**
Just add a MockServerConfig to the MOCK_SERVERS list below.
"""
import asyncio
# ========================================================================
# MOCK SERVER CONFIGURATION
# **TO ADD A NEW MOCK:** Just add a MockServerConfig instance below
#
# Example:
# MockServerConfig(
# name="Mock MyService",
# server_class=MockMyService, # Must inherit from MockServerBase
# init_kwargs={"port": 9000, "param": "value"},
# ),
# ========================================================================
MOCK_SERVERS = [
MockServerConfig(
name="Mock OTLP Collector",
server_class=MockOTLPCollector,
init_kwargs={"port": 4318},
),
MockServerConfig(
name="Mock vLLM Server",
server_class=MockVLLMServer,
init_kwargs={
"port": 8000,
"models": ["meta-llama/Llama-3.2-1B-Instruct"],
},
),
# Add more mock servers here - they will start in parallel automatically!
]
# Start all servers in parallel
servers = asyncio.run(start_mock_servers_async(MOCK_SERVERS))
# Verify vLLM models
models_response = requests.get("http://localhost:8000/v1/models", timeout=1)
models_data = models_response.json()
print(f"[INFO] Mock vLLM serving {len(models_data['data'])} models: {[m['id'] for m in models_data['data']]}")
yield servers
# Stop all servers
stop_mock_servers(servers)
@pytest.fixture(scope="module")
def mock_otlp_collector(mock_servers):
"""Convenience fixture to get OTLP collector from mock_servers."""
return mock_servers["Mock OTLP Collector"]
@pytest.fixture(scope="module")
def mock_vllm_server(mock_servers):
"""Convenience fixture to get vLLM server from mock_servers."""
return mock_servers["Mock vLLM Server"]
@pytest.fixture(scope="module")
def llama_stack_server(tmp_path_factory, mock_otlp_collector, mock_vllm_server):
"""
Fixture: Start real Llama Stack server with automatic OTel instrumentation.
**THIS IS THE MAIN FIXTURE** - it runs:
opentelemetry-instrument llama stack run --config run.yaml
**TO MODIFY STACK CONFIG:** Edit run_config dict below
"""
config_dir = tmp_path_factory.mktemp("otel-stack-config")
# Ensure mock vLLM is ready and accessible before starting Llama Stack
print(f"\n[INFO] Verifying mock vLLM is accessible at http://localhost:8000...")
try:
vllm_models = requests.get("http://localhost:8000/v1/models", timeout=2)
print(f"[INFO] Mock vLLM models endpoint response: {vllm_models.status_code}")
except Exception as e:
pytest.fail(f"Mock vLLM not accessible before starting Llama Stack: {e}")
# Create run.yaml with inference provider
# **TO ADD MORE PROVIDERS:** Add to providers dict
run_config = {
"image_name": "test-otel-e2e",
"apis": ["inference"],
"providers": {
"inference": [
{
"provider_id": "vllm",
"provider_type": "remote::vllm",
"config": {
"url": "http://localhost:8000/v1",
},
},
],
},
"models": [
{
"model_id": "meta-llama/Llama-3.2-1B-Instruct",
"provider_id": "vllm",
}
],
}
config_file = config_dir / "run.yaml"
with open(config_file, "w") as f:
yaml.dump(run_config, f)
# Find available port for Llama Stack
port = 5555
while not is_port_available(port) and port < 5600:
port += 1
if port >= 5600:
pytest.skip("No available ports for test server")
# Set environment variables for OTel instrumentation
# NOTE: These only affect the subprocess, not other tests
env = os.environ.copy()
env["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"
env["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf" # Ensure correct protocol
env["OTEL_SERVICE_NAME"] = "llama-stack-e2e-test"
env["LLAMA_STACK_PORT"] = str(port)
env["OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED"] = "true"
# Configure fast metric export for testing (default is 60 seconds)
# This makes metrics export every 500ms instead of every 60 seconds
env["OTEL_METRIC_EXPORT_INTERVAL"] = "500" # milliseconds
env["OTEL_METRIC_EXPORT_TIMEOUT"] = "1000" # milliseconds
# Disable inference recording to ensure real requests to our mock vLLM
# This is critical - without this, Llama Stack replays cached responses
# Safe to remove here as it only affects the subprocess environment
if "LLAMA_STACK_TEST_INFERENCE_MODE" in env:
del env["LLAMA_STACK_TEST_INFERENCE_MODE"]
# Start server with automatic instrumentation
cmd = [
"opentelemetry-instrument", # ← Automatic instrumentation wrapper
"llama", "stack", "run",
str(config_file),
"--port", str(port),
]
print(f"\n[INFO] Starting Llama Stack with OTel instrumentation on port {port}")
print(f"[INFO] Command: {' '.join(cmd)}")
process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait for server to start
max_wait = 30
base_url = f"http://localhost:{port}"
for i in range(max_wait):
try:
response = requests.get(f"{base_url}/v1/health", timeout=1)
if response.status_code == 200:
print(f"[INFO] Server ready at {base_url}")
break
except requests.exceptions.RequestException:
if i == max_wait - 1:
process.terminate()
stdout, stderr = process.communicate(timeout=5)
pytest.fail(f"Server failed to start.\nStdout: {stdout}\nStderr: {stderr}")
time.sleep(1)
yield {
'base_url': base_url,
'port': port,
'collector': mock_otlp_collector,
'vllm_server': mock_vllm_server,
}
# Cleanup
print(f"\n[INFO] Stopping Llama Stack server")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
# ============================================================================
# TESTS: End-to-End with Real Stack
# **THESE RUN SLOW** - marked with @pytest.mark.slow
# **TO ADD NEW E2E TESTS:** Add methods to this class
# ============================================================================
@pytest.mark.slow
class TestOTelE2E:
"""
End-to-end tests with real Llama Stack server.
These tests verify the complete flow:
- Real Llama Stack with opentelemetry-instrument
- Real API calls
- Real automatic instrumentation
- Mock OTLP collector captures exports
"""
def test_server_starts_with_auto_instrumentation(self, llama_stack_server):
"""Verify server starts successfully with opentelemetry-instrument."""
base_url = llama_stack_server['base_url']
# Try different health check endpoints
health_endpoints = ["/health", "/v1/health", "/"]
server_responding = False
for endpoint in health_endpoints:
try:
response = requests.get(f"{base_url}{endpoint}", timeout=5)
print(f"\n[DEBUG] {endpoint} -> {response.status_code}")
if response.status_code == 200:
server_responding = True
break
except Exception as e:
print(f"[DEBUG] {endpoint} failed: {e}")
assert server_responding, f"Server not responding on any endpoint at {base_url}"
print(f"\n[PASS] Llama Stack running with OTel at {base_url}")
def test_all_test_cases_via_runner(self, llama_stack_server):
"""
**MAIN TEST:** Run all TelemetryTestCase instances.
This executes all test cases defined in TEST_CASES list.
**TO ADD MORE TESTS:** Add to TEST_CASES at top of file
"""
base_url = llama_stack_server['base_url']
collector = llama_stack_server['collector']
# Create test runner
runner = TelemetryTestRunner(base_url, collector)
# Execute all test cases
results = runner.run_all_test_cases(TEST_CASES, verbose=True)
# Print summary
print(f"\n{'='*50}")
print(f"TEST CASE SUMMARY")
print(f"{'='*50}")
passed = sum(1 for p in results.values() if p)
total = len(results)
print(f"Passed: {passed}/{total}\n")
for name, result in results.items():
status = "[PASS]" if result else "[FAIL]"
print(f" {status} {name}")
print(f"{'='*50}\n")

View file

@@ -1,532 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Integration tests for OpenTelemetry provider.
These tests verify that the OTel provider correctly:
- Initializes within the Llama Stack
- Captures expected metrics (counters, histograms, up/down counters)
- Captures expected spans/traces
- Exports telemetry data to an OTLP collector (in-memory for testing)
Tests use in-memory exporters to avoid external dependencies and can run in GitHub Actions.
"""
import os
import time
from collections import defaultdict
from unittest.mock import patch
import pytest
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig
from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider
@pytest.fixture(scope="module")
def in_memory_span_exporter():
"""Create an in-memory span exporter to capture traces."""
return InMemorySpanExporter()
@pytest.fixture(scope="module")
def in_memory_metric_reader():
"""Create an in-memory metric reader to capture metrics."""
return InMemoryMetricReader()
@pytest.fixture(scope="module")
def otel_provider_with_memory_exporters(in_memory_span_exporter, in_memory_metric_reader):
"""
Create an OTelTelemetryProvider configured with in-memory exporters.
This allows us to capture and verify telemetry data without external services.
Returns a dict with 'provider', 'span_exporter', and 'metric_reader'.
"""
# Set mock environment to avoid warnings
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"
config = OTelTelemetryConfig(
service_name="test-llama-stack-otel",
service_version="1.0.0-test",
deployment_environment="ci-test",
span_processor="simple",
)
# Patch the provider to use in-memory exporters
with patch.object(
OTelTelemetryProvider,
'model_post_init',
lambda self, _: _init_with_memory_exporters(
self, config, in_memory_span_exporter, in_memory_metric_reader
)
):
provider = OTelTelemetryProvider(config=config)
yield {
'provider': provider,
'span_exporter': in_memory_span_exporter,
'metric_reader': in_memory_metric_reader
}
def _init_with_memory_exporters(provider, config, span_exporter, metric_reader):
"""Helper to initialize provider with in-memory exporters."""
import threading
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Attributes, Resource
from opentelemetry.sdk.trace import TracerProvider
# Initialize pydantic private attributes
if provider.__pydantic_private__ is None:
provider.__pydantic_private__ = {}
provider._lock = threading.Lock()
provider._counters = {}
provider._up_down_counters = {}
provider._histograms = {}
provider._gauges = {}
# Create resource attributes
attributes: Attributes = {
key: value
for key, value in {
"service.name": config.service_name,
"service.version": config.service_version,
"deployment.environment": config.deployment_environment,
}.items()
if value is not None
}
resource = Resource.create(attributes)
# Configure tracer provider with in-memory exporter
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
trace.set_tracer_provider(tracer_provider)
# Configure meter provider with in-memory reader
meter_provider = MeterProvider(
resource=resource,
metric_readers=[metric_reader]
)
metrics.set_meter_provider(meter_provider)
class TestOTelProviderInitialization:
"""Test OTel provider initialization within Llama Stack."""
def test_provider_initializes_successfully(self, otel_provider_with_memory_exporters):
"""Test that the OTel provider initializes without errors."""
provider = otel_provider_with_memory_exporters['provider']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
assert provider is not None
assert provider.config.service_name == "test-llama-stack-otel"
assert provider.config.service_version == "1.0.0-test"
assert provider.config.deployment_environment == "ci-test"
def test_provider_has_thread_safety_mechanisms(self, otel_provider_with_memory_exporters):
"""Test that the provider has thread-safety mechanisms in place."""
provider = otel_provider_with_memory_exporters['provider']
assert hasattr(provider, "_lock")
assert provider._lock is not None
assert hasattr(provider, "_counters")
assert hasattr(provider, "_histograms")
assert hasattr(provider, "_up_down_counters")
class TestOTelMetricsCapture:
"""Test that OTel provider captures expected metrics."""
def test_counter_metric_is_captured(self, otel_provider_with_memory_exporters):
"""Test that counter metrics are captured."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
# Record counter metrics
provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"})
provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"})
provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/embeddings"})
# Force metric collection - collect() triggers the reader to gather metrics
metric_reader.collect()
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
# Verify metrics were captured
assert metrics_data is not None
assert len(metrics_data.resource_metrics) > 0
# Find our counter metric
found_counter = False
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
if metric.name == "llama.requests.total":
found_counter = True
# Verify it's a counter with data points
assert hasattr(metric.data, "data_points")
assert len(metric.data.data_points) > 0
assert found_counter, "Counter metric 'llama.requests.total' was not captured"
def test_histogram_metric_is_captured(self, otel_provider_with_memory_exporters):
"""Test that histogram metrics are captured."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
# Record histogram metrics with various values
latencies = [10.5, 25.3, 50.1, 100.7, 250.2]
for latency in latencies:
provider.record_histogram(
"llama.inference.latency",
latency,
attributes={"model": "llama-3.2"}
)
# Force metric collection
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
# Find our histogram metric
found_histogram = False
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
if metric.name == "llama.inference.latency":
found_histogram = True
# Verify it's a histogram
assert hasattr(metric.data, "data_points")
data_point = metric.data.data_points[0]
# Histograms should have count and sum
assert hasattr(data_point, "count")
assert data_point.count == len(latencies)
assert found_histogram, "Histogram metric 'llama.inference.latency' was not captured"
def test_up_down_counter_metric_is_captured(self, otel_provider_with_memory_exporters):
"""Test that up/down counter metrics are captured."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
# Record up/down counter metrics
provider.record_up_down_counter("llama.active.sessions", 5)
provider.record_up_down_counter("llama.active.sessions", 3)
provider.record_up_down_counter("llama.active.sessions", -2)
# Force metric collection
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
# Find our up/down counter metric
found_updown = False
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
if metric.name == "llama.active.sessions":
found_updown = True
assert hasattr(metric.data, "data_points")
assert len(metric.data.data_points) > 0
assert found_updown, "Up/Down counter metric 'llama.active.sessions' was not captured"
def test_metrics_with_attributes_are_captured(self, otel_provider_with_memory_exporters):
"""Test that metric attributes/labels are preserved."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
# Record metrics with different attributes
provider.record_count("llama.tokens.generated", 150.0, attributes={
"model": "llama-3.2-1b",
"user": "test-user"
})
# Force metric collection
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
# Verify attributes are preserved
found_with_attributes = False
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
if metric.name == "llama.tokens.generated":
data_point = metric.data.data_points[0]
# Check attributes - they're already a dict in the SDK
attrs = data_point.attributes if isinstance(data_point.attributes, dict) else {}
if "model" in attrs and "user" in attrs:
found_with_attributes = True
assert attrs["model"] == "llama-3.2-1b"
assert attrs["user"] == "test-user"
assert found_with_attributes, "Metrics with attributes were not properly captured"
def test_multiple_metric_types_coexist(self, otel_provider_with_memory_exporters):
"""Test that different metric types can coexist."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
# Record various metric types
provider.record_count("test.counter", 1.0)
provider.record_histogram("test.histogram", 42.0)
provider.record_up_down_counter("test.gauge", 10)
# Force metric collection
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
# Count unique metrics
metric_names = set()
for resource_metric in metrics_data.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
metric_names.add(metric.name)
# Should have all three metrics
assert "test.counter" in metric_names
assert "test.histogram" in metric_names
assert "test.gauge" in metric_names
class TestOTelSpansCapture:
"""Test that OTel provider captures expected spans/traces."""
def test_basic_span_is_captured(self, otel_provider_with_memory_exporters):
"""Test that basic spans are captured."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
# Create a span
span = provider.custom_trace("llama.inference.request")
span.end()
# Get captured spans
spans = span_exporter.get_finished_spans()
assert len(spans) > 0
assert any(span.name == "llama.inference.request" for span in spans)
def test_span_with_attributes_is_captured(self, otel_provider_with_memory_exporters):
"""Test that span attributes are preserved."""
provider = otel_provider_with_memory_exporters['provider']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
# Create a span with attributes
span = provider.custom_trace(
"llama.chat.completion",
attributes={
"model.id": "llama-3.2-1b",
"user.id": "test-user-123",
"request.id": "req-abc-123"
}
)
span.end()
# Get captured spans
spans = span_exporter.get_finished_spans()
# Find our span
our_span = None
for s in spans:
if s.name == "llama.chat.completion":
our_span = s
break
assert our_span is not None, "Span 'llama.chat.completion' was not captured"
# Verify attributes
attrs = dict(our_span.attributes)
assert attrs.get("model.id") == "llama-3.2-1b"
assert attrs.get("user.id") == "test-user-123"
assert attrs.get("request.id") == "req-abc-123"
def test_multiple_spans_are_captured(self, otel_provider_with_memory_exporters):
"""Test that multiple spans are captured."""
provider = otel_provider_with_memory_exporters['provider']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
# Create multiple spans
span_names = [
"llama.request.validate",
"llama.model.load",
"llama.inference.execute",
"llama.response.format"
]
for name in span_names:
span = provider.custom_trace(name)
time.sleep(0.01) # Small delay to ensure ordering
span.end()
# Get captured spans
spans = span_exporter.get_finished_spans()
captured_names = {span.name for span in spans}
# Verify all spans were captured
for expected_name in span_names:
assert expected_name in captured_names, f"Span '{expected_name}' was not captured"
def test_span_has_service_metadata(self, otel_provider_with_memory_exporters):
"""Test that spans include service metadata."""
provider = otel_provider_with_memory_exporters['provider']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
# Create a span
span = provider.custom_trace("test.span")
span.end()
# Get captured spans
spans = span_exporter.get_finished_spans()
assert len(spans) > 0
# Check resource attributes
span = spans[0]
resource_attrs = dict(span.resource.attributes)
assert resource_attrs.get("service.name") == "test-llama-stack-otel"
assert resource_attrs.get("service.version") == "1.0.0-test"
assert resource_attrs.get("deployment.environment") == "ci-test"
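For reference, the resource attributes asserted above are attached once when the SDK providers are constructed. A minimal sketch of that wiring, assuming the provider builds its resource from the config values this fixture uses (illustrative only, not the provider's actual source):

from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider

resource = Resource.create({
    "service.name": "test-llama-stack-otel",
    "service.version": "1.0.0-test",
    "deployment.environment": "ci-test",
})
# Every span created under this provider inherits the resource attributes.
tracer_provider = TracerProvider(resource=resource)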
class TestOTelDataExport:
"""Test that telemetry data can be exported to OTLP collector."""
def test_metrics_are_exportable(self, otel_provider_with_memory_exporters):
"""Test that metrics can be exported."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
# Record metrics
provider.record_count("export.test.counter", 5.0)
provider.record_histogram("export.test.histogram", 123.45)
# Force export
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
# Verify data structure is exportable
assert metrics_data is not None
assert hasattr(metrics_data, "resource_metrics")
assert len(metrics_data.resource_metrics) > 0
# Verify resource attributes are present (needed for OTLP export)
resource = metrics_data.resource_metrics[0].resource
assert resource is not None
assert len(resource.attributes) > 0
def test_spans_are_exportable(self, otel_provider_with_memory_exporters):
"""Test that spans can be exported."""
provider = otel_provider_with_memory_exporters['provider']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
# Create spans
span1 = provider.custom_trace("export.test.span1")
span1.end()
span2 = provider.custom_trace("export.test.span2")
span2.end()
# Get exported spans
spans = span_exporter.get_finished_spans()
# Verify spans have required OTLP fields
assert len(spans) >= 2
for span in spans:
assert span.name is not None
assert span.context is not None
assert span.context.trace_id is not None
assert span.context.span_id is not None
assert span.resource is not None
def test_concurrent_export_is_safe(self, otel_provider_with_memory_exporters):
"""Test that concurrent metric/span recording doesn't break export."""
import concurrent.futures
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
def record_data(thread_id):
for i in range(10):
provider.record_count(f"concurrent.counter.{thread_id}", 1.0)
span = provider.custom_trace(f"concurrent.span.{thread_id}.{i}")
span.end()
# Record from multiple threads
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(record_data, i) for i in range(5)]
concurrent.futures.wait(futures)
# Verify export still works
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
spans = span_exporter.get_finished_spans()
assert metrics_data is not None
assert len(spans) >= 50 # 5 threads * 10 spans each
@pytest.mark.integration
class TestOTelProviderIntegration:
"""End-to-end integration tests simulating real usage."""
def test_complete_inference_workflow_telemetry(self, otel_provider_with_memory_exporters):
"""Simulate a complete inference workflow with telemetry."""
provider = otel_provider_with_memory_exporters['provider']
metric_reader = otel_provider_with_memory_exporters['metric_reader']
span_exporter = otel_provider_with_memory_exporters['span_exporter']
# Simulate inference workflow
request_span = provider.custom_trace(
"llama.inference.request",
attributes={"model": "llama-3.2-1b", "user": "test"}
)
# Track metrics during inference
provider.record_count("llama.requests.received", 1.0)
provider.record_up_down_counter("llama.requests.in_flight", 1)
# Simulate processing time
time.sleep(0.01)
provider.record_histogram("llama.request.duration_ms", 10.5)
# Track tokens
provider.record_count("llama.tokens.input", 25.0)
provider.record_count("llama.tokens.output", 150.0)
# End request
provider.record_up_down_counter("llama.requests.in_flight", -1)
provider.record_count("llama.requests.completed", 1.0)
request_span.end()
# Verify all telemetry was captured
metric_reader.collect()
metrics_data = metric_reader.get_metrics_data()
spans = span_exporter.get_finished_spans()
# Check metrics exist
metric_names = set()
for rm in metrics_data.resource_metrics:
for sm in rm.scope_metrics:
for m in sm.metrics:
metric_names.add(m.name)
assert "llama.requests.received" in metric_names
assert "llama.requests.in_flight" in metric_names
assert "llama.request.duration_ms" in metric_names
assert "llama.tokens.input" in metric_names
assert "llama.tokens.output" in metric_names
# Check span exists
assert any(s.name == "llama.inference.request" for s in spans)
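These tests all consume an otel_provider_with_memory_exporters fixture defined elsewhere in the test module. As a rough sketch, such a fixture can be built from the SDK's in-memory exporters; the wiring into OTelTelemetryProvider below is an assumption for illustration (make_test_provider is a hypothetical helper, not a real API):

import pytest
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

@pytest.fixture
def otel_provider_with_memory_exporters():
    span_exporter = InMemorySpanExporter()
    metric_reader = InMemoryMetricReader()
    # Hypothetical: construct the provider, then route its SDK pipelines to the
    # in-memory exporter/reader instead of the OTLP endpoint.
    provider = make_test_provider(span_exporter=span_exporter, metric_reader=metric_reader)
    return {"provider": provider, "span_exporter": span_exporter, "metric_reader": metric_reader}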

View file

@ -5,10 +5,12 @@
# the root directory of this source tree.
import concurrent.futures
import threading
from unittest.mock import MagicMock
import pytest
from opentelemetry import trace
from opentelemetry.metrics import Meter, Observation
from opentelemetry.trace import Tracer
from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig
from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider
@ -43,12 +45,6 @@ class TestOTelTelemetryProviderInitialization:
provider = OTelTelemetryProvider(config=otel_config)
assert provider.config == otel_config
assert hasattr(provider, "_lock")
assert provider._lock is not None
assert isinstance(provider._counters, dict)
assert isinstance(provider._histograms, dict)
assert isinstance(provider._up_down_counters, dict)
assert isinstance(provider._gauges, dict)
def test_initialization_sets_service_attributes(self, otel_config, monkeypatch):
"""Test that service attributes are properly configured."""
@ -88,228 +84,309 @@ class TestOTelTelemetryProviderInitialization:
assert any("Metrics will not be exported" in record.message for record in caplog.records)
class TestOTelTelemetryProviderMetrics:
"""Tests for metric recording functionality."""
def test_record_count_creates_counter(self, otel_provider):
"""Test that record_count creates a counter on first call."""
assert "test_counter" not in otel_provider._counters
otel_provider.record_count("test_counter", 1.0)
assert "test_counter" in otel_provider._counters
assert otel_provider._counters["test_counter"] is not None
def test_record_count_reuses_counter(self, otel_provider):
"""Test that record_count reuses existing counter."""
otel_provider.record_count("test_counter", 1.0)
first_counter = otel_provider._counters["test_counter"]
otel_provider.record_count("test_counter", 2.0)
second_counter = otel_provider._counters["test_counter"]
assert first_counter is second_counter
assert len(otel_provider._counters) == 1
def test_record_count_with_attributes(self, otel_provider):
"""Test that record_count works with attributes."""
otel_provider.record_count(
"test_counter",
1.0,
attributes={"key": "value", "env": "test"}
)
assert "test_counter" in otel_provider._counters
def test_record_histogram_creates_histogram(self, otel_provider):
"""Test that record_histogram creates a histogram on first call."""
assert "test_histogram" not in otel_provider._histograms
otel_provider.record_histogram("test_histogram", 42.5)
assert "test_histogram" in otel_provider._histograms
assert otel_provider._histograms["test_histogram"] is not None
def test_record_histogram_reuses_histogram(self, otel_provider):
"""Test that record_histogram reuses existing histogram."""
otel_provider.record_histogram("test_histogram", 10.0)
first_histogram = otel_provider._histograms["test_histogram"]
otel_provider.record_histogram("test_histogram", 20.0)
second_histogram = otel_provider._histograms["test_histogram"]
assert first_histogram is second_histogram
assert len(otel_provider._histograms) == 1
def test_record_histogram_with_bucket_boundaries(self, otel_provider):
"""Test that record_histogram works with explicit bucket boundaries."""
boundaries = [0.0, 10.0, 50.0, 100.0]
otel_provider.record_histogram(
"test_histogram",
25.0,
explicit_bucket_boundaries_advisory=boundaries
)
assert "test_histogram" in otel_provider._histograms
def test_record_up_down_counter_creates_counter(self, otel_provider):
"""Test that record_up_down_counter creates a counter on first call."""
assert "test_updown" not in otel_provider._up_down_counters
otel_provider.record_up_down_counter("test_updown", 1.0)
assert "test_updown" in otel_provider._up_down_counters
assert otel_provider._up_down_counters["test_updown"] is not None
def test_record_up_down_counter_reuses_counter(self, otel_provider):
"""Test that record_up_down_counter reuses existing counter."""
otel_provider.record_up_down_counter("test_updown", 5.0)
first_counter = otel_provider._up_down_counters["test_updown"]
otel_provider.record_up_down_counter("test_updown", -3.0)
second_counter = otel_provider._up_down_counters["test_updown"]
assert first_counter is second_counter
assert len(otel_provider._up_down_counters) == 1
def test_multiple_metrics_with_different_names(self, otel_provider):
"""Test that multiple metrics with different names are cached separately."""
otel_provider.record_count("counter1", 1.0)
otel_provider.record_count("counter2", 2.0)
otel_provider.record_histogram("histogram1", 10.0)
otel_provider.record_up_down_counter("updown1", 5.0)
assert len(otel_provider._counters) == 2
assert len(otel_provider._histograms) == 1
assert len(otel_provider._up_down_counters) == 1
class TestOTelTelemetryProviderTracerAPI:
"""Tests for the get_tracer() API."""
def test_get_tracer_returns_tracer(self, otel_provider):
"""Test that get_tracer returns a valid Tracer instance."""
tracer = otel_provider.get_tracer("test.module")
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_get_tracer_with_version(self, otel_provider):
"""Test that get_tracer works with version parameter."""
tracer = otel_provider.get_tracer(
instrumenting_module_name="test.module",
instrumenting_library_version="1.0.0"
)
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_get_tracer_with_attributes(self, otel_provider):
"""Test that get_tracer works with attributes."""
tracer = otel_provider.get_tracer(
instrumenting_module_name="test.module",
attributes={"component": "test", "tier": "backend"}
)
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_get_tracer_with_schema_url(self, otel_provider):
"""Test that get_tracer works with schema URL."""
tracer = otel_provider.get_tracer(
instrumenting_module_name="test.module",
schema_url="https://example.com/schema"
)
assert tracer is not None
assert isinstance(tracer, Tracer)
class TestOTelTelemetryProviderThreadSafety:
"""Tests for thread safety of metric operations."""
def test_concurrent_counter_creation_same_name(self, otel_provider):
"""Test that concurrent calls to record_count with same name are thread-safe."""
num_threads = 50
counter_name = "concurrent_counter"
def record_metric():
otel_provider.record_count(counter_name, 1.0)
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(record_metric) for _ in range(num_threads)]
concurrent.futures.wait(futures)
# Should have exactly one counter created despite concurrent access
assert len(otel_provider._counters) == 1
assert counter_name in otel_provider._counters
def test_concurrent_histogram_creation_same_name(self, otel_provider):
"""Test that concurrent calls to record_histogram with same name are thread-safe."""
num_threads = 50
histogram_name = "concurrent_histogram"
def record_metric():
thread_id = threading.current_thread().ident or 0
otel_provider.record_histogram(histogram_name, float(thread_id % 100))
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(record_metric) for _ in range(num_threads)]
concurrent.futures.wait(futures)
# Should have exactly one histogram created despite concurrent access
assert len(otel_provider._histograms) == 1
assert histogram_name in otel_provider._histograms
def test_concurrent_up_down_counter_creation_same_name(self, otel_provider):
"""Test that concurrent calls to record_up_down_counter with same name are thread-safe."""
num_threads = 50
counter_name = "concurrent_updown"
def record_metric():
otel_provider.record_up_down_counter(counter_name, 1.0)
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(record_metric) for _ in range(num_threads)]
concurrent.futures.wait(futures)
# Should have exactly one counter created despite concurrent access
assert len(otel_provider._up_down_counters) == 1
assert counter_name in otel_provider._up_down_counters
def test_concurrent_mixed_metrics_different_names(self, otel_provider):
"""Test concurrent creation of different metric types with different names."""
num_threads = 30
def record_counters(thread_id):
otel_provider.record_count(f"counter_{thread_id}", 1.0)
def record_histograms(thread_id):
otel_provider.record_histogram(f"histogram_{thread_id}", float(thread_id))
def record_up_down_counters(thread_id):
otel_provider.record_up_down_counter(f"updown_{thread_id}", float(thread_id))
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads * 3) as executor:
futures = []
for i in range(num_threads):
futures.append(executor.submit(record_counters, i))
futures.append(executor.submit(record_histograms, i))
futures.append(executor.submit(record_up_down_counters, i))
concurrent.futures.wait(futures)
# Each thread should have created its own metric
assert len(otel_provider._counters) == num_threads
assert len(otel_provider._histograms) == num_threads
assert len(otel_provider._up_down_counters) == num_threads
def test_concurrent_access_existing_and_new_metrics(self, otel_provider):
"""Test concurrent access mixing existing and new metric creation."""
# Pre-create some metrics
otel_provider.record_count("existing_counter", 1.0)
otel_provider.record_histogram("existing_histogram", 10.0)
num_threads = 40
def mixed_operations(thread_id):
# Half the threads use existing metrics, half create new ones
if thread_id % 2 == 0:
otel_provider.record_count("existing_counter", 1.0)
otel_provider.record_histogram("existing_histogram", float(thread_id))
else:
otel_provider.record_count(f"new_counter_{thread_id}", 1.0)
otel_provider.record_histogram(f"new_histogram_{thread_id}", float(thread_id))
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(mixed_operations, i) for i in range(num_threads)]
concurrent.futures.wait(futures)
# Should have existing metrics plus half of num_threads new ones
expected_new_counters = num_threads // 2
expected_new_histograms = num_threads // 2
assert len(otel_provider._counters) == 1 + expected_new_counters
assert len(otel_provider._histograms) == 1 + expected_new_histograms
class TestOTelTelemetryProviderTracing:
"""Tests for tracing functionality."""
def test_custom_trace_creates_span(self, otel_provider):
"""Test that custom_trace creates a span."""
span = otel_provider.custom_trace("test_span")
assert span is not None
assert hasattr(span, "get_span_context")
def test_custom_trace_with_attributes(self, otel_provider):
"""Test that custom_trace works with attributes."""
attributes = {"key": "value", "operation": "test"}
span = otel_provider.custom_trace("test_span", attributes=attributes)
assert span is not None
def test_tracer_can_create_spans(self, otel_provider):
"""Test that tracer can create spans."""
tracer = otel_provider.get_tracer("test.module")
with tracer.start_as_current_span("test.operation") as span:
assert span is not None
assert span.is_recording()
def test_tracer_can_create_spans_with_attributes(self, otel_provider):
"""Test that tracer can create spans with attributes."""
tracer = otel_provider.get_tracer("test.module")
with tracer.start_as_current_span(
"test.operation",
attributes={"user.id": "123", "request.id": "abc"}
) as span:
assert span is not None
assert span.is_recording()
def test_multiple_tracers_can_coexist(self, otel_provider):
"""Test that multiple tracers can be created."""
tracer1 = otel_provider.get_tracer("module.one")
tracer2 = otel_provider.get_tracer("module.two")
assert tracer1 is not None
assert tracer2 is not None
# Tracers with different names might be the same instance or different
# depending on OTel implementation, so just verify both work
with tracer1.start_as_current_span("op1") as span1:
assert span1.is_recording()
with tracer2.start_as_current_span("op2") as span2:
assert span2.is_recording()
class TestOTelTelemetryProviderMeterAPI:
"""Tests for the get_meter() API."""
def test_get_meter_returns_meter(self, otel_provider):
"""Test that get_meter returns a valid Meter instance."""
meter = otel_provider.get_meter("test.meter")
assert meter is not None
assert isinstance(meter, Meter)
def test_get_meter_with_version(self, otel_provider):
"""Test that get_meter works with version parameter."""
meter = otel_provider.get_meter(
name="test.meter",
version="1.0.0"
)
assert meter is not None
assert isinstance(meter, Meter)
def test_get_meter_with_attributes(self, otel_provider):
"""Test that get_meter works with attributes."""
meter = otel_provider.get_meter(
name="test.meter",
attributes={"service": "test", "env": "dev"}
)
assert meter is not None
assert isinstance(meter, Meter)
def test_get_meter_with_schema_url(self, otel_provider):
"""Test that get_meter works with schema URL."""
meter = otel_provider.get_meter(
name="test.meter",
schema_url="https://example.com/schema"
)
assert meter is not None
assert isinstance(meter, Meter)
def test_meter_can_create_counter(self, otel_provider):
"""Test that meter can create counters."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter(
"test.requests.total",
unit="requests",
description="Total requests"
)
assert counter is not None
# Test that counter can be used
counter.add(1, {"endpoint": "/test"})
def test_meter_can_create_histogram(self, otel_provider):
"""Test that meter can create histograms."""
meter = otel_provider.get_meter("test.meter")
histogram = meter.create_histogram(
"test.request.duration",
unit="ms",
description="Request duration"
)
assert histogram is not None
# Test that histogram can be used
histogram.record(42.5, {"method": "GET"})
def test_meter_can_create_up_down_counter(self, otel_provider):
"""Test that meter can create up/down counters."""
meter = otel_provider.get_meter("test.meter")
up_down_counter = meter.create_up_down_counter(
"test.active.connections",
unit="connections",
description="Active connections"
)
assert up_down_counter is not None
# Test that up/down counter can be used
up_down_counter.add(5)
up_down_counter.add(-2)
def test_meter_can_create_observable_gauge(self, otel_provider):
"""Test that meter can create observable gauges."""
meter = otel_provider.get_meter("test.meter")
def gauge_callback(options):
# Callbacks must return an iterable of Observation objects
return [Observation(42.0, {"host": "localhost"})]
gauge = meter.create_observable_gauge(
"test.memory.usage",
callbacks=[gauge_callback],
unit="bytes",
description="Memory usage"
)
assert gauge is not None
def test_multiple_instruments_from_same_meter(self, otel_provider):
"""Test that a meter can create multiple instruments."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
histogram = meter.create_histogram("test.histogram")
up_down_counter = meter.create_up_down_counter("test.gauge")
assert counter is not None
assert histogram is not None
assert up_down_counter is not None
# Verify they all work
counter.add(1)
histogram.record(10.0)
up_down_counter.add(5)
class TestOTelTelemetryProviderNativeUsage:
"""Tests for native OpenTelemetry usage patterns."""
def test_complete_tracing_workflow(self, otel_provider):
"""Test a complete tracing workflow using native OTel API."""
tracer = otel_provider.get_tracer("llama_stack.inference")
# Create parent span
with tracer.start_as_current_span("inference.request") as parent_span:
parent_span.set_attribute("model", "llama-3.2-1b")
parent_span.set_attribute("user", "test-user")
# Create child span
with tracer.start_as_current_span("model.load") as child_span:
child_span.set_attribute("model.size", "1B")
assert child_span.is_recording()
# Create another child span
with tracer.start_as_current_span("inference.execute") as child_span:
child_span.set_attribute("tokens.input", 25)
child_span.set_attribute("tokens.output", 150)
assert child_span.is_recording()
assert parent_span.is_recording()
def test_complete_metrics_workflow(self, otel_provider):
"""Test a complete metrics workflow using native OTel API."""
meter = otel_provider.get_meter("llama_stack.metrics")
# Create various instruments
request_counter = meter.create_counter(
"llama.requests.total",
unit="requests",
description="Total requests"
)
latency_histogram = meter.create_histogram(
"llama.inference.duration",
unit="ms",
description="Inference duration"
)
active_sessions = meter.create_up_down_counter(
"llama.sessions.active",
unit="sessions",
description="Active sessions"
)
# Use the instruments
request_counter.add(1, {"endpoint": "/chat", "status": "success"})
latency_histogram.record(123.45, {"model": "llama-3.2-1b"})
active_sessions.add(1)
active_sessions.add(-1)
# No exceptions means success
def test_concurrent_tracer_usage(self, otel_provider):
"""Test that multiple threads can use tracers concurrently."""
def create_spans(thread_id):
tracer = otel_provider.get_tracer(f"test.module.{thread_id}")
for i in range(10):
with tracer.start_as_current_span(f"operation.{i}") as span:
span.set_attribute("thread.id", thread_id)
span.set_attribute("iteration", i)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(create_spans, i) for i in range(10)]
concurrent.futures.wait(futures)
# If we get here without exceptions, thread safety is working
def test_concurrent_meter_usage(self, otel_provider):
"""Test that multiple threads can use meters concurrently."""
def record_metrics(thread_id):
meter = otel_provider.get_meter(f"test.meter.{thread_id}")
counter = meter.create_counter(f"test.counter.{thread_id}")
histogram = meter.create_histogram(f"test.histogram.{thread_id}")
for i in range(10):
counter.add(1, {"thread": str(thread_id)})
histogram.record(float(i * 10), {"thread": str(thread_id)})
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(record_metrics, i) for i in range(10)]
concurrent.futures.wait(futures)
# If we get here without exceptions, thread safety is working
def test_mixed_tracing_and_metrics(self, otel_provider):
"""Test using both tracing and metrics together."""
tracer = otel_provider.get_tracer("test.module")
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("operations.count")
histogram = meter.create_histogram("operation.duration")
# Trace an operation while recording metrics
with tracer.start_as_current_span("test.operation") as span:
counter.add(1)
span.set_attribute("step", "start")
histogram.record(50.0)
span.set_attribute("step", "processing")
counter.add(1)
span.set_attribute("step", "complete")
# No exceptions means success
class TestOTelTelemetryProviderFastAPIMiddleware:
"""Tests for FastAPI middleware functionality."""
def test_fastapi_middleware(self, otel_provider):
"""Test that fastapi_middleware can be called."""
@ -318,51 +395,182 @@ class TestOTelTelemetryProviderTracing:
# Should not raise an exception
otel_provider.fastapi_middleware(mock_app)
def test_fastapi_middleware_is_idempotent(self, otel_provider):
"""Test that calling fastapi_middleware multiple times is safe."""
mock_app = MagicMock()
# Should be able to call multiple times without error
otel_provider.fastapi_middleware(mock_app)
# Note: Second call might warn but shouldn't fail
# otel_provider.fastapi_middleware(mock_app)
class TestOTelTelemetryProviderEdgeCases:
"""Tests for edge cases and error conditions."""
def test_record_count_with_zero(self, otel_provider):
"""Test that record_count works with zero value."""
otel_provider.record_count("zero_counter", 0.0)
assert "zero_counter" in otel_provider._counters
def test_record_count_with_large_value(self, otel_provider):
"""Test that record_count works with large values."""
otel_provider.record_count("large_counter", 1_000_000.0)
assert "large_counter" in otel_provider._counters
def test_record_histogram_with_negative_value(self, otel_provider):
"""Test that record_histogram works with negative values."""
otel_provider.record_histogram("negative_histogram", -10.0)
assert "negative_histogram" in otel_provider._histograms
def test_record_up_down_counter_with_negative_value(self, otel_provider):
"""Test that record_up_down_counter works with negative values."""
otel_provider.record_up_down_counter("negative_updown", -5.0)
assert "negative_updown" in otel_provider._up_down_counters
def test_metric_names_with_special_characters(self, otel_provider):
"""Test that metric names with dots and underscores work."""
otel_provider.record_count("test.counter_name-special", 1.0)
otel_provider.record_histogram("test.histogram_name-special", 10.0)
assert "test.counter_name-special" in otel_provider._counters
assert "test.histogram_name-special" in otel_provider._histograms
def test_empty_attributes_dict(self, otel_provider):
"""Test that empty attributes dict is handled correctly."""
otel_provider.record_count("test_counter", 1.0, attributes={})
assert "test_counter" in otel_provider._counters
def test_none_attributes(self, otel_provider):
"""Test that None attributes are handled correctly."""
otel_provider.record_count("test_counter", 1.0, attributes=None)
assert "test_counter" in otel_provider._counters
def test_tracer_with_empty_module_name(self, otel_provider):
"""Test that get_tracer works with empty module name."""
tracer = otel_provider.get_tracer("")
assert tracer is not None
assert isinstance(tracer, Tracer)
def test_meter_with_empty_name(self, otel_provider):
"""Test that get_meter works with empty name."""
meter = otel_provider.get_meter("")
assert meter is not None
assert isinstance(meter, Meter)
def test_meter_instruments_with_special_characters(self, otel_provider):
"""Test that metric names with dots, underscores, and hyphens work."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter_name-special")
histogram = meter.create_histogram("test.histogram_name-special")
assert counter is not None
assert histogram is not None
# Verify they can be used
counter.add(1)
histogram.record(10.0)
def test_meter_counter_with_zero_value(self, otel_provider):
"""Test that counters work with zero value."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
# Should not raise an exception
counter.add(0.0)
def test_meter_histogram_with_negative_value(self, otel_provider):
"""Test that histograms accept negative values."""
meter = otel_provider.get_meter("test.meter")
histogram = meter.create_histogram("test.histogram")
# Should not raise an exception
histogram.record(-10.0)
def test_meter_up_down_counter_with_negative_value(self, otel_provider):
"""Test that up/down counters work with negative values."""
meter = otel_provider.get_meter("test.meter")
up_down_counter = meter.create_up_down_counter("test.updown")
# Should not raise an exception
up_down_counter.add(-5.0)
def test_meter_instruments_with_empty_attributes(self, otel_provider):
"""Test that empty attributes dict is handled correctly."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
# Should not raise an exception
counter.add(1.0, attributes={})
def test_meter_instruments_with_none_attributes(self, otel_provider):
"""Test that None attributes are handled correctly."""
meter = otel_provider.get_meter("test.meter")
counter = meter.create_counter("test.counter")
# Should not raise an exception
counter.add(1.0, attributes=None)
class TestOTelTelemetryProviderRealisticScenarios:
"""Tests simulating realistic usage scenarios."""
def test_inference_request_telemetry(self, otel_provider):
"""Simulate telemetry for a complete inference request."""
tracer = otel_provider.get_tracer("llama_stack.inference")
meter = otel_provider.get_meter("llama_stack.metrics")
# Create instruments
request_counter = meter.create_counter("llama.requests.total")
token_counter = meter.create_counter("llama.tokens.total")
latency_histogram = meter.create_histogram("llama.request.duration_ms")
in_flight_gauge = meter.create_up_down_counter("llama.requests.in_flight")
# Simulate request
with tracer.start_as_current_span("inference.request") as request_span:
request_span.set_attribute("model.id", "llama-3.2-1b")
request_span.set_attribute("user.id", "test-user")
request_counter.add(1, {"model": "llama-3.2-1b"})
in_flight_gauge.add(1)
# Simulate token counting
token_counter.add(25, {"type": "input", "model": "llama-3.2-1b"})
token_counter.add(150, {"type": "output", "model": "llama-3.2-1b"})
# Simulate latency
latency_histogram.record(125.5, {"model": "llama-3.2-1b"})
in_flight_gauge.add(-1)
request_span.set_attribute("tokens.input", 25)
request_span.set_attribute("tokens.output", 150)
def test_multi_step_workflow_with_nested_spans(self, otel_provider):
"""Simulate a multi-step workflow with nested spans."""
tracer = otel_provider.get_tracer("llama_stack.workflow")
meter = otel_provider.get_meter("llama_stack.workflow.metrics")
step_counter = meter.create_counter("workflow.steps.completed")
with tracer.start_as_current_span("workflow.execute") as root_span:
root_span.set_attribute("workflow.id", "wf-123")
# Step 1: Validate
with tracer.start_as_current_span("step.validate") as span:
span.set_attribute("validation.result", "pass")
step_counter.add(1, {"step": "validate", "status": "success"})
# Step 2: Process
with tracer.start_as_current_span("step.process") as span:
span.set_attribute("items.processed", 100)
step_counter.add(1, {"step": "process", "status": "success"})
# Step 3: Finalize
with tracer.start_as_current_span("step.finalize") as span:
span.set_attribute("output.size", 1024)
step_counter.add(1, {"step": "finalize", "status": "success"})
root_span.set_attribute("workflow.status", "completed")
def test_error_handling_with_telemetry(self, otel_provider):
"""Test telemetry when errors occur."""
tracer = otel_provider.get_tracer("llama_stack.errors")
meter = otel_provider.get_meter("llama_stack.errors.metrics")
error_counter = meter.create_counter("llama.errors.total")
with tracer.start_as_current_span("operation.with.error") as span:
try:
span.set_attribute("step", "processing")
# Simulate an error
raise ValueError("Test error")
except ValueError as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
error_counter.add(1, {"error.type": "ValueError"})
# Should not raise - error was handled
def test_batch_operations_telemetry(self, otel_provider):
"""Test telemetry for batch operations."""
tracer = otel_provider.get_tracer("llama_stack.batch")
meter = otel_provider.get_meter("llama_stack.batch.metrics")
batch_counter = meter.create_counter("llama.batch.items.processed")
batch_duration = meter.create_histogram("llama.batch.duration_ms")
with tracer.start_as_current_span("batch.process") as batch_span:
batch_span.set_attribute("batch.size", 100)
for i in range(100):
with tracer.start_as_current_span(f"item.{i}") as item_span:
item_span.set_attribute("item.index", i)
batch_counter.add(1, {"status": "success"})
batch_duration.record(5000.0, {"batch.size": "100"})
batch_span.set_attribute("batch.status", "completed")
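The lockfile changes below add opentelemetry-instrumentation-sqlalchemy, which backs the provider's new sqlalchemy_instrumentation() hook. As a minimal sketch, such a hook is typically a thin wrapper over SQLAlchemyInstrumentor; the body shown here is an assumption for illustration, not the provider's actual implementation:

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import Engine

def sqlalchemy_instrumentation(self, engine: Engine | None = None):
    # Instrument a specific engine if given, otherwise instrument globally so
    # engines created later are traced as well.
    if engine is not None:
        SQLAlchemyInstrumentor().instrument(engine=engine)
    else:
        SQLAlchemyInstrumentor().instrument()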

uv.lock generated
View file

@ -1775,6 +1775,7 @@ dependencies = [
{ name = "openai" },
{ name = "opentelemetry-exporter-otlp-proto-http" },
{ name = "opentelemetry-instrumentation-fastapi" },
{ name = "opentelemetry-instrumentation-sqlalchemy" },
{ name = "opentelemetry-sdk" },
{ name = "opentelemetry-semantic-conventions" },
{ name = "pillow" },
@ -1902,6 +1903,7 @@ requires-dist = [
{ name = "openai", specifier = ">=1.107" },
{ name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.30.0" },
{ name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.57b0" },
{ name = "opentelemetry-instrumentation-sqlalchemy", specifier = ">=0.57b0" },
{ name = "opentelemetry-sdk", specifier = ">=1.30.0" },
{ name = "opentelemetry-semantic-conventions", specifier = ">=0.57b0" },
{ name = "pandas", marker = "extra == 'ui'" },
@ -2756,6 +2758,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/df/f20fc21c88c7af5311bfefc15fc4e606bab5edb7c193aa8c73c354904c35/opentelemetry_instrumentation_fastapi-0.57b0-py3-none-any.whl", hash = "sha256:61e6402749ffe0bfec582e58155e0d81dd38723cd9bc4562bca1acca80334006", size = 12712, upload-time = "2025-07-29T15:42:03.332Z" },
]
[[package]]
name = "opentelemetry-instrumentation-sqlalchemy"
version = "0.57b0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "opentelemetry-api" },
{ name = "opentelemetry-instrumentation" },
{ name = "opentelemetry-semantic-conventions" },
{ name = "packaging" },
{ name = "wrapt" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9c/18/ee1460dcb044b25aaedd6cfd063304d84ae641dddb8fb9287959f7644100/opentelemetry_instrumentation_sqlalchemy-0.57b0.tar.gz", hash = "sha256:95667326b7cc22bb4bc9941f98ca22dd177679f9a4d277646cc21074c0d732ff", size = 14962, upload-time = "2025-07-29T15:43:12.426Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/18/af35650eb029d771b8d281bea770727f1e2f662c422c5ab1a0c2b7afc152/opentelemetry_instrumentation_sqlalchemy-0.57b0-py3-none-any.whl", hash = "sha256:8a1a815331cb04fc95aa7c50e9c681cdccfb12e1fa4522f079fe4b24753ae106", size = 14202, upload-time = "2025-07-29T15:42:25.828Z" },
]
[[package]]
name = "opentelemetry-proto"
version = "1.36.0"
@ -4821,9 +4839,9 @@ dependencies = [
{ name = "typing-extensions", marker = "sys_platform == 'darwin'" },
]
wheels = [
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp312-none-macosx_11_0_arm64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-cp313t-macosx_14_0_arm64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-none-macosx_11_0_arm64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:a47b7986bee3f61ad217d8a8ce24605809ab425baf349f97de758815edd2ef54" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-cp313t-macosx_14_0_arm64.whl", hash = "sha256:fbe2e149c5174ef90d29a5f84a554dfaf28e003cb4f61fa2c8c024c17ec7ca58" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-none-macosx_11_0_arm64.whl", hash = "sha256:057efd30a6778d2ee5e2374cd63a63f63311aa6f33321e627c655df60abdd390" },
]
[[package]]
@ -4846,19 +4864,19 @@ dependencies = [
{ name = "typing-extensions", marker = "sys_platform != 'darwin'" },
]
wheels = [
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-linux_s390x.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_amd64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_arm64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-linux_s390x.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_amd64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_arm64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-win_amd64.whl" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-linux_s390x.whl", hash = "sha256:0e34e276722ab7dd0dffa9e12fe2135a9b34a0e300c456ed7ad6430229404eb5" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:610f600c102386e581327d5efc18c0d6edecb9820b4140d26163354a99cd800d" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:cb9a8ba8137ab24e36bf1742cb79a1294bd374db570f09fc15a5e1318160db4e" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_amd64.whl", hash = "sha256:2be20b2c05a0cce10430cc25f32b689259640d273232b2de357c35729132256d" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_arm64.whl", hash = "sha256:99fc421a5d234580e45957a7b02effbf3e1c884a5dd077afc85352c77bf41434" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-linux_s390x.whl", hash = "sha256:8b5882276633cf91fe3d2d7246c743b94d44a7e660b27f1308007fdb1bb89f7d" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:a5064b5e23772c8d164068cc7c12e01a75faf7b948ecd95a0d4007d7487e5f25" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:8f81dedb4c6076ec325acc3b47525f9c550e5284a18eae1d9061c543f7b6e7de" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_amd64.whl", hash = "sha256:e1ee1b2346ade3ea90306dfbec7e8ff17bc220d344109d189ae09078333b0856" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_arm64.whl", hash = "sha256:64c187345509f2b1bb334feed4666e2c781ca381874bde589182f81247e61f88" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:af81283ac671f434b1b25c95ba295f270e72db1fad48831eb5e4748ff9840041" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:a9dbb6f64f63258bc811e2c0c99640a81e5af93c531ad96e95c5ec777ea46dab" },
{ url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-win_amd64.whl", hash = "sha256:6d93a7165419bc4b2b907e859ccab0dea5deeab261448ae9a5ec5431f14c0e64" },
]
[[package]]