diff --git a/llama_stack/core/server/server.py b/llama_stack/core/server/server.py index 6f39eb2d7..ab051cb2e 100644 --- a/llama_stack/core/server/server.py +++ b/llama_stack/core/server/server.py @@ -424,6 +424,7 @@ def create_app( if Api.telemetry in impls: impls[Api.telemetry].fastapi_middleware(app) + impls[Api.telemetry].sqlalchemy_instrumentation() # Load external APIs if configured external_apis = load_external_apis(config) diff --git a/llama_stack/core/telemetry/__initi__.py b/llama_stack/core/telemetry/__initi__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/llama_stack/core/telemetry/telemetry.py b/llama_stack/core/telemetry/telemetry.py index fafe7cce5..ef2ddd6a2 100644 --- a/llama_stack/core/telemetry/telemetry.py +++ b/llama_stack/core/telemetry/telemetry.py @@ -6,7 +6,13 @@ from abc import abstractmethod from fastapi import FastAPI from pydantic import BaseModel -from typing import Any + +from opentelemetry.trace import Tracer +from opentelemetry.metrics import Meter +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Attributes +from sqlalchemy import Engine class TelemetryProvider(BaseModel): @@ -19,31 +25,34 @@ class TelemetryProvider(BaseModel): Injects FastAPI middleware that instruments the application for telemetry. """ ... - + @abstractmethod - def custom_trace(self, name: str, *args, **kwargs) -> Any: + def sqlalchemy_instrumentation(self, engine: Engine | None = None): """ - Creates a custom trace. + Injects SQLAlchemy instrumentation that instruments the application for telemetry. """ ... @abstractmethod - def record_count(self, name: str, *args, **kwargs): + def get_tracer(self, + instrumenting_module_name: str, + instrumenting_library_version: str | None = None, + tracer_provider: TracerProvider | None = None, + schema_url: str | None = None, + attributes: Attributes | None = None + ) -> Tracer: """ - Increments a counter metric. + Gets a tracer. """ ... @abstractmethod - def record_histogram(self, name: str, *args, **kwargs): + def get_meter(self, name: str, + version: str = "", + meter_provider: MeterProvider | None = None, + schema_url: str | None = None, + attributes: Attributes | None = None) -> Meter: """ - Records a histogram metric. - """ - ... - - @abstractmethod - def record_up_down_counter(self, name: str, *args, **kwargs): - """ - Records an up/down counter metric. + Gets a meter. """ ... diff --git a/llama_stack/core/telemetry/tracing.py b/llama_stack/core/telemetry/tracing.py deleted file mode 100644 index c19900a89..000000000 --- a/llama_stack/core/telemetry/tracing.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. -from abc import abstractmethod -from fastapi import FastAPI -from pydantic import BaseModel - - -class TelemetryProvider(BaseModel): - """ - TelemetryProvider standardizes how telemetry is provided to the application. - """ - @abstractmethod - def fastapi_middleware(self, app: FastAPI, *args, **kwargs): - """ - Injects FastAPI middleware that instruments the application for telemetry. - """ - ... 
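A minimal usage sketch of the reworked `TelemetryProvider` interface above (not part of this patch): it constructs the OTel provider directly, assumes `OTEL_EXPORTER_OTLP_ENDPOINT` points at a collector, and uses illustrative names (`llama_stack.example`, `example.requests`). The point is that callers now work with standard OTel `Tracer`/`Meter` objects obtained via `get_tracer()`/`get_meter()` instead of the removed `record_count`/`record_histogram`/`record_up_down_counter` helpers.

```python
# Hypothetical sketch only -- not part of the diff. Assumes the OTLP exporter
# environment variables are set; otherwise the provider logs a warning and
# nothing is exported.
from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig
from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider

provider = OTelTelemetryProvider(
    config=OTelTelemetryConfig(
        service_name="llama-stack",
        service_version="0.1.0",
        deployment_environment="dev",
        span_processor="batch",
    )
)

# Callers fetch stock OTel objects from the provider.
tracer = provider.get_tracer("llama_stack.example")
meter = provider.get_meter("llama_stack.example")

# Instruments are created explicitly by the caller, rather than being cached
# behind name-keyed helper methods as in the old interface.
request_counter = meter.create_counter(
    "example.requests", unit="requests", description="Requests handled"
)

with tracer.start_as_current_span("example.request") as span:
    span.set_attribute("example.route", "/v1/models")
    request_counter.add(1, {"example.route": "/v1/models"})
```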
diff --git a/llama_stack/providers/inline/telemetry/otel/__init__.py b/llama_stack/providers/inline/telemetry/otel/__init__.py index e69de29bb..f432d3364 100644 --- a/llama_stack/providers/inline/telemetry/otel/__init__.py +++ b/llama_stack/providers/inline/telemetry/otel/__init__.py @@ -0,0 +1,24 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .config import OTelTelemetryConfig + +__all__ = ["OTelTelemetryConfig"] + + +async def get_provider_impl(config: OTelTelemetryConfig, deps): + """ + Get the OTel telemetry provider implementation. + + This function is called by the Llama Stack registry to instantiate + the provider. + """ + from .otel import OTelTelemetryProvider + + # The provider is synchronously initialized via Pydantic model_post_init + # No async initialization needed + return OTelTelemetryProvider(config=config) + diff --git a/llama_stack/providers/inline/telemetry/otel/config.py b/llama_stack/providers/inline/telemetry/otel/config.py index e1ff2f1b0..ad4982716 100644 --- a/llama_stack/providers/inline/telemetry/otel/config.py +++ b/llama_stack/providers/inline/telemetry/otel/config.py @@ -1,4 +1,4 @@ -from typing import Literal +from typing import Any, Literal from pydantic import BaseModel, Field @@ -11,17 +11,19 @@ class OTelTelemetryConfig(BaseModel): """ The configuration for the OpenTelemetry telemetry provider. Most configuration is set using environment variables. - See https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/ for more information. + See https://opentelemetry.io/docs/specs/otel/configuration/sdk-configuration-variables/ for more information. """ service_name: str = Field( description="""The name of the service to be monitored. Is overridden by the OTEL_SERVICE_NAME or OTEL_RESOURCE_ATTRIBUTES environment variables.""", ) service_version: str | None = Field( + default=None, description="""The version of the service to be monitored. Is overriden by the OTEL_RESOURCE_ATTRIBUTES environment variable.""" ) deployment_environment: str | None = Field( + default=None, description="""The name of the environment of the service to be monitored. 
Is overriden by the OTEL_RESOURCE_ATTRIBUTES environment variable.""" ) @@ -30,3 +32,13 @@ class OTelTelemetryConfig(BaseModel): Is overriden by the OTEL_SPAN_PROCESSOR environment variable.""", default="batch" ) + + @classmethod + def sample_run_config(cls, __distro_dir__: str = "") -> dict[str, Any]: + """Sample configuration for use in distributions.""" + return { + "service_name": "${env.OTEL_SERVICE_NAME:=llama-stack}", + "service_version": "${env.OTEL_SERVICE_VERSION:=}", + "deployment_environment": "${env.OTEL_DEPLOYMENT_ENVIRONMENT:=}", + "span_processor": "${env.OTEL_SPAN_PROCESSOR:=batch}", + } diff --git a/llama_stack/providers/inline/telemetry/otel/otel.py b/llama_stack/providers/inline/telemetry/otel/otel.py index 1d2e2e4ab..08a2c9a63 100644 --- a/llama_stack/providers/inline/telemetry/otel/otel.py +++ b/llama_stack/providers/inline/telemetry/otel/otel.py @@ -1,22 +1,21 @@ import os -import threading from opentelemetry import trace, metrics -from opentelemetry.context.context import Context from opentelemetry.sdk.resources import Attributes, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.metrics import Counter, UpDownCounter, Histogram, ObservableGauge from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.trace import Span, SpanKind, _Links -from typing import Sequence -from pydantic import PrivateAttr +from opentelemetry.trace import Tracer +from opentelemetry.metrics import Meter +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor -from llama_stack.core.telemetry.tracing import TelemetryProvider +from llama_stack.core.telemetry.telemetry import TelemetryProvider from llama_stack.log import get_logger +from sqlalchemy import Engine + from .config import OTelTelemetryConfig from fastapi import FastAPI @@ -29,15 +28,9 @@ class OTelTelemetryProvider(TelemetryProvider): A simple Open Telemetry native telemetry provider. """ config: OTelTelemetryConfig - _counters: dict[str, Counter] = PrivateAttr(default_factory=dict) - _up_down_counters: dict[str, UpDownCounter] = PrivateAttr(default_factory=dict) - _histograms: dict[str, Histogram] = PrivateAttr(default_factory=dict) - _gauges: dict[str, ObservableGauge] = PrivateAttr(default_factory=dict) - def model_post_init(self, __context): """Initialize provider after Pydantic validation.""" - self._lock = threading.Lock() attributes: Attributes = { key: value @@ -74,68 +67,114 @@ class OTelTelemetryProvider(TelemetryProvider): if not os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"): logger.warning("OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_METRICS_ENDPOINT is not set. Metrics will not be exported.") + def fastapi_middleware(self, app: FastAPI): + """ + Instrument FastAPI with OTel for automatic tracing and metrics. 
+ + Captures: + - Distributed traces for all HTTP requests (via FastAPIInstrumentor) + - HTTP metrics following semantic conventions (custom middleware) + """ + # Enable automatic tracing FastAPIInstrumentor.instrument_app(app) + + # Add custom middleware for HTTP metrics + meter = self.get_meter("llama_stack.http.server") + + # Create HTTP metrics following semantic conventions + # https://opentelemetry.io/docs/specs/semconv/http/http-metrics/ + request_duration = meter.create_histogram( + "http.server.request.duration", + unit="ms", + description="Duration of HTTP server requests" + ) + + active_requests = meter.create_up_down_counter( + "http.server.active_requests", + unit="requests", + description="Number of active HTTP server requests" + ) + + request_count = meter.create_counter( + "http.server.request.count", + unit="requests", + description="Total number of HTTP server requests" + ) + + # Add middleware to record metrics + @app.middleware("http") # type: ignore[misc] + async def http_metrics_middleware(request, call_next): + import time + + # Record active request + active_requests.add(1, { + "http.method": request.method, + "http.route": request.url.path, + }) + + start_time = time.time() + status_code = 500 # Default to error + + try: + response = await call_next(request) + status_code = response.status_code + except Exception: + raise + finally: + # Record metrics + duration_ms = (time.time() - start_time) * 1000 + + attributes = { + "http.method": request.method, + "http.route": request.url.path, + "http.status_code": status_code, + } + + request_duration.record(duration_ms, attributes) + request_count.add(1, attributes) + active_requests.add(-1, { + "http.method": request.method, + "http.route": request.url.path, + }) + + return response - def custom_trace(self, + + def sqlalchemy_instrumentation(self, engine: Engine | None = None): + kwargs = {} + if engine: + kwargs["engine"] = engine + SQLAlchemyInstrumentor().instrument(**kwargs) + + + def get_tracer(self, + instrumenting_module_name: str, + instrumenting_library_version: str | None = None, + tracer_provider: TracerProvider | None = None, + schema_url: str | None = None, + attributes: Attributes | None = None + ) -> Tracer: + return trace.get_tracer( + instrumenting_module_name=instrumenting_module_name, + instrumenting_library_version=instrumenting_library_version, + tracer_provider=tracer_provider, + schema_url=schema_url, + attributes=attributes + ) + + + def get_meter(self, name: str, - context: Context | None = None, - kind: SpanKind = SpanKind.INTERNAL, - attributes: Attributes = {}, - links: _Links = None, - start_time: int | None = None, - record_exception: bool = True, - set_status_on_exception: bool = True) -> Span: - """ - Creates a custom tracing span using the Open Telemetry SDK. - """ - tracer = trace.get_tracer(__name__) - return tracer.start_span(name, context, kind, attributes, links, start_time, record_exception, set_status_on_exception) - - - def record_count(self, name: str, amount: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = ""): - """ - Increments a counter metric using the Open Telemetry SDK that are indexed by the meter name. - This function is designed to be compatible with other popular telemetry providers design patterns, - like Datadog and New Relic. 
- """ - meter = metrics.get_meter(__name__) - - with self._lock: - if name not in self._counters: - self._counters[name] = meter.create_counter(name, unit=unit, description=description) - counter = self._counters[name] - - counter.add(amount, attributes=attributes, context=context) - - - def record_histogram(self, name: str, value: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = "", explicit_bucket_boundaries_advisory: Sequence[float] | None = None): - """ - Records a histogram metric using the Open Telemetry SDK that are indexed by the meter name. - This function is designed to be compatible with other popular telemetry providers design patterns, - like Datadog and New Relic. - """ - meter = metrics.get_meter(__name__) - - with self._lock: - if name not in self._histograms: - self._histograms[name] = meter.create_histogram(name, unit=unit, description=description, explicit_bucket_boundaries_advisory=explicit_bucket_boundaries_advisory) - histogram = self._histograms[name] - - histogram.record(value, attributes=attributes, context=context) - - - def record_up_down_counter(self, name: str, value: int|float, context: Context | None = None, attributes: dict[str, str] | None = None, unit: str = "", description: str = ""): - """ - Records an up/down counter metric using the Open Telemetry SDK that are indexed by the meter name. - This function is designed to be compatible with other popular telemetry providers design patterns, - like Datadog and New Relic. - """ - meter = metrics.get_meter(__name__) - - with self._lock: - if name not in self._up_down_counters: - self._up_down_counters[name] = meter.create_up_down_counter(name, unit=unit, description=description) - up_down_counter = self._up_down_counters[name] - - up_down_counter.add(value, attributes=attributes, context=context) + version: str = "", + meter_provider: MeterProvider | None = None, + schema_url: str | None = None, + attributes: Attributes | None = None + ) -> Meter: + return metrics.get_meter( + name=name, + version=version, + meter_provider=meter_provider, + schema_url=schema_url, + attributes=attributes + ) \ No newline at end of file diff --git a/llama_stack/providers/registry/telemetry.py b/llama_stack/providers/registry/telemetry.py index b50b422c1..50f73ce5f 100644 --- a/llama_stack/providers/registry/telemetry.py +++ b/llama_stack/providers/registry/telemetry.py @@ -26,4 +26,16 @@ def available_providers() -> list[ProviderSpec]: config_class="llama_stack.providers.inline.telemetry.meta_reference.config.TelemetryConfig", description="Meta's reference implementation of telemetry and observability using OpenTelemetry.", ), + InlineProviderSpec( + api=Api.telemetry, + provider_type="inline::otel", + pip_packages=[ + "opentelemetry-sdk", + "opentelemetry-exporter-otlp-proto-http", + "opentelemetry-instrumentation-fastapi", + ], + module="llama_stack.providers.inline.telemetry.otel", + config_class="llama_stack.providers.inline.telemetry.otel.config.OTelTelemetryConfig", + description="Native OpenTelemetry provider with full access to OTel Tracer and Meter APIs for advanced instrumentation.", + ), ] diff --git a/pyproject.toml b/pyproject.toml index e3a629560..b842092cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dependencies = [ "sqlalchemy[asyncio]>=2.0.41", # server - for conversations "opentelemetry-semantic-conventions>=0.57b0", "opentelemetry-instrumentation-fastapi>=0.57b0", + "opentelemetry-instrumentation-sqlalchemy>=0.57b0", 
] [project.optional-dependencies] diff --git a/tests/integration/telemetry/__init__.py b/tests/integration/telemetry/__init__.py new file mode 100644 index 000000000..d4a3e15c8 --- /dev/null +++ b/tests/integration/telemetry/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + diff --git a/tests/integration/telemetry/mocking/README.md b/tests/integration/telemetry/mocking/README.md new file mode 100644 index 000000000..853022622 --- /dev/null +++ b/tests/integration/telemetry/mocking/README.md @@ -0,0 +1,148 @@ +# Mock Server Infrastructure + +This directory contains mock servers for E2E telemetry testing. + +## Structure + +``` +mocking/ +├── README.md ← You are here +├── __init__.py ← Module exports +├── mock_base.py ← Pydantic base class for all mocks +├── servers.py ← Mock server implementations +└── harness.py ← Async startup harness +``` + +## Files + +### `mock_base.py` - Base Class +Pydantic base model that all mock servers must inherit from. + +**Contract:** +```python +class MockServerBase(BaseModel): + async def await_start(self): + # Start server and wait until ready + ... + + def stop(self): + # Stop server and cleanup + ... +``` + +### `servers.py` - Mock Implementations +Contains: +- **MockOTLPCollector** - Receives OTLP telemetry (port 4318) +- **MockVLLMServer** - Simulates vLLM inference API (port 8000) + +### `harness.py` - Startup Orchestration +Provides: +- **MockServerConfig** - Pydantic config for server registration +- **start_mock_servers_async()** - Starts servers in parallel +- **stop_mock_servers()** - Stops all servers + +## Creating a New Mock Server + +### Step 1: Implement the Server + +Add to `servers.py`: +```python +class MockRedisServer(MockServerBase): + """Mock Redis server.""" + + port: int = Field(default=6379) + + # Non-Pydantic fields + server: Any = Field(default=None, exclude=True) + + def model_post_init(self, __context): + self.server = None + + async def await_start(self): + """Start Redis mock and wait until ready.""" + # Start your server + self.server = create_redis_server(self.port) + self.server.start() + + # Wait for port to be listening + for _ in range(10): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if sock.connect_ex(('localhost', self.port)) == 0: + sock.close() + return # Ready! + await asyncio.sleep(0.1) + + def stop(self): + if self.server: + self.server.stop() +``` + +### Step 2: Register in Test + +In `test_otel_e2e.py`, add to MOCK_SERVERS list: +```python +MOCK_SERVERS = [ + # ... existing servers ... + MockServerConfig( + name="Mock Redis", + server_class=MockRedisServer, + init_kwargs={"port": 6379}, + ), +] +``` + +### Step 3: Done! 
+ +The harness automatically: +- Creates the server instance +- Calls `await_start()` in parallel with other servers +- Returns when all are ready +- Stops all servers on teardown + +## Benefits + +✅ **Parallel Startup** - All servers start simultaneously +✅ **Type-Safe** - Pydantic validation +✅ **Simple** - Just implement 2 methods +✅ **Fast** - No HTTP polling, direct port checking +✅ **Clean** - Async/await pattern + +## Usage in Tests + +```python +@pytest.fixture(scope="module") +def mock_servers(): + servers = asyncio.run(start_mock_servers_async(MOCK_SERVERS)) + yield servers + stop_mock_servers(servers) + +# Access specific servers +@pytest.fixture(scope="module") +def mock_redis(mock_servers): + return mock_servers["Mock Redis"] +``` + +## Key Design Decisions + +### Why Pydantic? +- Type safety for server configuration +- Built-in validation +- Clear interface contract + +### Why `await_start()` instead of HTTP `/ready`? +- Faster (no HTTP round-trip) +- Simpler (direct port checking) +- More reliable (internal state, not external endpoint) + +### Why separate harness? +- Reusable across different test files +- Easy to add new servers +- Centralized error handling + +## Examples + +See `test_otel_e2e.py` for real-world usage: +- Line ~200: MOCK_SERVERS configuration +- Line ~230: Convenience fixtures +- Line ~240: Using servers in tests + diff --git a/tests/integration/telemetry/mocking/__init__.py b/tests/integration/telemetry/mocking/__init__.py new file mode 100644 index 000000000..99ad92856 --- /dev/null +++ b/tests/integration/telemetry/mocking/__init__.py @@ -0,0 +1,29 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +Mock server infrastructure for telemetry E2E testing. + +This module provides: +- MockServerBase: Pydantic base class for all mock servers +- MockOTLPCollector: Mock OTLP telemetry collector +- MockVLLMServer: Mock vLLM inference server +- Mock server harness for parallel async startup +""" + +from .mock_base import MockServerBase +from .servers import MockOTLPCollector, MockVLLMServer +from .harness import MockServerConfig, start_mock_servers_async, stop_mock_servers + +__all__ = [ + "MockServerBase", + "MockOTLPCollector", + "MockVLLMServer", + "MockServerConfig", + "start_mock_servers_async", + "stop_mock_servers", +] + diff --git a/tests/integration/telemetry/mocking/harness.py b/tests/integration/telemetry/mocking/harness.py new file mode 100644 index 000000000..09b80f70f --- /dev/null +++ b/tests/integration/telemetry/mocking/harness.py @@ -0,0 +1,107 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +Mock server startup harness for parallel initialization. + +HOW TO ADD A NEW MOCK SERVER: +1. Import your mock server class +2. Add it to MOCK_SERVERS list with configuration +3. Done! It will start in parallel with others. +""" + +import asyncio +from typing import Any, Dict, List + +from pydantic import BaseModel, Field + +from .mock_base import MockServerBase + + +class MockServerConfig(BaseModel): + """ + Configuration for a mock server to start. + + **TO ADD A NEW MOCK SERVER:** + Just create a MockServerConfig instance with your server class. 
+ + Example: + MockServerConfig( + name="Mock MyService", + server_class=MockMyService, + init_kwargs={"port": 9000, "config_param": "value"}, + ) + """ + + model_config = {"arbitrary_types_allowed": True} + + name: str = Field(description="Display name for logging") + server_class: type = Field(description="Mock server class (must inherit from MockServerBase)") + init_kwargs: Dict[str, Any] = Field(default_factory=dict, description="Kwargs to pass to server constructor") + + +async def start_mock_servers_async(mock_servers_config: List[MockServerConfig]) -> Dict[str, MockServerBase]: + """ + Start all mock servers in parallel and wait for them to be ready. + + **HOW IT WORKS:** + 1. Creates all server instances + 2. Calls await_start() on all servers in parallel + 3. Returns when all are ready + + **SIMPLE TO USE:** + servers = await start_mock_servers_async([config1, config2, ...]) + + Args: + mock_servers_config: List of mock server configurations + + Returns: + Dict mapping server name to server instance + """ + servers = {} + start_tasks = [] + + # Create all servers and prepare start tasks + for config in mock_servers_config: + server = config.server_class(**config.init_kwargs) + servers[config.name] = server + start_tasks.append(server.await_start()) + + # Start all servers in parallel + try: + await asyncio.gather(*start_tasks) + + # Print readiness confirmation + for name in servers.keys(): + print(f"[INFO] {name} ready") + + except Exception as e: + # If any server fails, stop all servers + for server in servers.values(): + try: + server.stop() + except: + pass + raise RuntimeError(f"Failed to start mock servers: {e}") + + return servers + + +def stop_mock_servers(servers: Dict[str, Any]): + """ + Stop all mock servers. + + Args: + servers: Dict of server instances from start_mock_servers_async() + """ + for name, server in servers.items(): + try: + if hasattr(server, 'get_request_count'): + print(f"\n[INFO] {name} received {server.get_request_count()} requests") + server.stop() + except Exception as e: + print(f"[WARN] Error stopping {name}: {e}") + diff --git a/tests/integration/telemetry/mocking/mock_base.py b/tests/integration/telemetry/mocking/mock_base.py new file mode 100644 index 000000000..803058457 --- /dev/null +++ b/tests/integration/telemetry/mocking/mock_base.py @@ -0,0 +1,69 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +Base class for mock servers with async startup support. + +All mock servers should inherit from MockServerBase and implement await_start(). +""" + +import asyncio +from abc import abstractmethod +from pydantic import BaseModel, Field + + +class MockServerBase(BaseModel): + """ + Pydantic base model for mock servers. + + **TO CREATE A NEW MOCK SERVER:** + 1. Inherit from this class + 2. Implement async def await_start(self) + 3. Implement def stop(self) + 4. Done! 
+ + Example: + class MyMockServer(MockServerBase): + port: int = 8080 + + async def await_start(self): + # Start your server + self.server = create_server() + self.server.start() + # Wait until ready (can check internal state, no HTTP needed) + while not self.server.is_listening(): + await asyncio.sleep(0.1) + + def stop(self): + if self.server: + self.server.stop() + """ + + model_config = {"arbitrary_types_allowed": True} + + @abstractmethod + async def await_start(self): + """ + Start the server and wait until it's ready. + + This method should: + 1. Start the server (synchronous or async) + 2. Wait until the server is fully ready to accept requests + 3. Return when ready + + Subclasses can check internal state directly - no HTTP polling needed! + """ + ... + + @abstractmethod + def stop(self): + """ + Stop the server and clean up resources. + + This method should gracefully shut down the server. + """ + ... + diff --git a/tests/integration/telemetry/mocking/servers.py b/tests/integration/telemetry/mocking/servers.py new file mode 100644 index 000000000..e055f41b6 --- /dev/null +++ b/tests/integration/telemetry/mocking/servers.py @@ -0,0 +1,387 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +Mock servers for OpenTelemetry E2E testing. + +This module provides mock servers for testing telemetry: +- MockOTLPCollector: Receives and stores OTLP telemetry exports +- MockVLLMServer: Simulates vLLM inference API with valid OpenAI responses + +These mocks allow E2E testing without external dependencies. +""" + +import asyncio +import http.server +import json +import socket +import threading +import time +from typing import Any, Dict, List + +from pydantic import Field + +from .mock_base import MockServerBase + + +class MockOTLPCollector(MockServerBase): + """ + Mock OTLP collector HTTP server. + + Receives real OTLP exports from Llama Stack and stores them for verification. + Runs on localhost:4318 (standard OTLP HTTP port). + + Usage: + collector = MockOTLPCollector() + await collector.await_start() + # ... run tests ... 
+ print(f"Received {collector.get_trace_count()} traces") + collector.stop() + """ + + port: int = Field(default=4318, description="Port to run collector on") + + # Non-Pydantic fields (set after initialization) + traces: List[Dict] = Field(default_factory=list, exclude=True) + metrics: List[Dict] = Field(default_factory=list, exclude=True) + server: Any = Field(default=None, exclude=True) + server_thread: Any = Field(default=None, exclude=True) + + def model_post_init(self, __context): + """Initialize after Pydantic validation.""" + self.traces = [] + self.metrics = [] + self.server = None + self.server_thread = None + + def _create_handler_class(self): + """Create the HTTP handler class for this collector instance.""" + collector_self = self + + class OTLPHandler(http.server.BaseHTTPRequestHandler): + """HTTP request handler for OTLP requests.""" + + def log_message(self, format, *args): + """Suppress HTTP server logs.""" + pass + + def do_GET(self): + """Handle GET requests.""" + # No readiness endpoint needed - using await_start() instead + self.send_response(404) + self.end_headers() + + def do_POST(self): + """Handle OTLP POST requests.""" + content_length = int(self.headers.get('Content-Length', 0)) + body = self.rfile.read(content_length) if content_length > 0 else b'' + + # Store the export request + if '/v1/traces' in self.path: + collector_self.traces.append({ + 'body': body, + 'timestamp': time.time(), + }) + elif '/v1/metrics' in self.path: + collector_self.metrics.append({ + 'body': body, + 'timestamp': time.time(), + }) + + # Always return success (200 OK) + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(b'{}') + + return OTLPHandler + + async def await_start(self): + """ + Start the OTLP collector and wait until ready. + + This method is async and can be awaited to ensure the server is ready. + """ + # Create handler and start the HTTP server + handler_class = self._create_handler_class() + self.server = http.server.HTTPServer(('localhost', self.port), handler_class) + self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True) + self.server_thread.start() + + # Wait for server to be listening on the port + for _ in range(10): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('localhost', self.port)) + sock.close() + if result == 0: + # Port is listening + return + except: + pass + await asyncio.sleep(0.1) + + raise RuntimeError(f"OTLP collector failed to start on port {self.port}") + + def stop(self): + """Stop the OTLP collector server.""" + if self.server: + self.server.shutdown() + self.server.server_close() + + def clear(self): + """Clear all captured telemetry data.""" + self.traces = [] + self.metrics = [] + + def get_trace_count(self) -> int: + """Get number of trace export requests received.""" + return len(self.traces) + + def get_metric_count(self) -> int: + """Get number of metric export requests received.""" + return len(self.metrics) + + def get_all_traces(self) -> List[Dict]: + """Get all captured trace exports.""" + return self.traces + + def get_all_metrics(self) -> List[Dict]: + """Get all captured metric exports.""" + return self.metrics + + +class MockVLLMServer(MockServerBase): + """ + Mock vLLM inference server with OpenAI-compatible API. 
+ + Returns valid OpenAI Python client response objects for: + - Chat completions (/v1/chat/completions) + - Text completions (/v1/completions) + - Model listing (/v1/models) + + Runs on localhost:8000 (standard vLLM port). + + Usage: + server = MockVLLMServer(models=["my-model"]) + await server.await_start() + # ... make inference calls ... + print(f"Handled {server.get_request_count()} requests") + server.stop() + """ + + port: int = Field(default=8000, description="Port to run server on") + models: List[str] = Field( + default_factory=lambda: ["meta-llama/Llama-3.2-1B-Instruct"], + description="List of model IDs to serve" + ) + + # Non-Pydantic fields + requests_received: List[Dict] = Field(default_factory=list, exclude=True) + server: Any = Field(default=None, exclude=True) + server_thread: Any = Field(default=None, exclude=True) + + def model_post_init(self, __context): + """Initialize after Pydantic validation.""" + self.requests_received = [] + self.server = None + self.server_thread = None + + def _create_handler_class(self): + """Create the HTTP handler class for this vLLM instance.""" + server_self = self + + class VLLMHandler(http.server.BaseHTTPRequestHandler): + """HTTP request handler for vLLM API.""" + + def log_message(self, format, *args): + """Suppress HTTP server logs.""" + pass + + def log_request(self, code='-', size='-'): + """Log incoming requests for debugging.""" + print(f"[DEBUG] Mock vLLM received: {self.command} {self.path} -> {code}") + + def do_GET(self): + """Handle GET requests (models list, health check).""" + # Log GET requests too + server_self.requests_received.append({ + 'path': self.path, + 'method': 'GET', + 'timestamp': time.time(), + }) + + if self.path == '/v1/models': + response = self._create_models_list_response() + self._send_json_response(200, response) + + elif self.path == '/health' or self.path == '/v1/health': + self._send_json_response(200, {"status": "healthy"}) + + else: + self.send_response(404) + self.end_headers() + + def do_POST(self): + """Handle POST requests (chat/text completions).""" + content_length = int(self.headers.get('Content-Length', 0)) + body = self.rfile.read(content_length) if content_length > 0 else b'{}' + + try: + request_data = json.loads(body) + except: + request_data = {} + + # Log the request + server_self.requests_received.append({ + 'path': self.path, + 'body': request_data, + 'timestamp': time.time(), + }) + + # Route to appropriate handler + if '/chat/completions' in self.path: + response = self._create_chat_completion_response(request_data) + self._send_json_response(200, response) + + elif '/completions' in self.path: + response = self._create_text_completion_response(request_data) + self._send_json_response(200, response) + + else: + self._send_json_response(200, {"status": "ok"}) + + # ---------------------------------------------------------------- + # Response Generators + # **TO MODIFY RESPONSES:** Edit these methods + # ---------------------------------------------------------------- + + def _create_models_list_response(self) -> Dict: + """Create OpenAI models list response with configured models.""" + return { + "object": "list", + "data": [ + { + "id": model_id, + "object": "model", + "created": int(time.time()), + "owned_by": "meta", + } + for model_id in server_self.models + ] + } + + def _create_chat_completion_response(self, request_data: Dict) -> Dict: + """ + Create OpenAI ChatCompletion response. 
+ + Returns a valid response matching openai.types.ChatCompletion + """ + return { + "id": "chatcmpl-test123", + "object": "chat.completion", + "created": int(time.time()), + "model": request_data.get("model", "meta-llama/Llama-3.2-1B-Instruct"), + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": "This is a test response from mock vLLM server.", + "tool_calls": None, + }, + "logprobs": None, + "finish_reason": "stop", + }], + "usage": { + "prompt_tokens": 25, + "completion_tokens": 15, + "total_tokens": 40, + "completion_tokens_details": None, + }, + "system_fingerprint": None, + "service_tier": None, + } + + def _create_text_completion_response(self, request_data: Dict) -> Dict: + """ + Create OpenAI Completion response. + + Returns a valid response matching openai.types.Completion + """ + return { + "id": "cmpl-test123", + "object": "text_completion", + "created": int(time.time()), + "model": request_data.get("model", "meta-llama/Llama-3.2-1B-Instruct"), + "choices": [{ + "text": "This is a test completion.", + "index": 0, + "logprobs": None, + "finish_reason": "stop", + }], + "usage": { + "prompt_tokens": 10, + "completion_tokens": 8, + "total_tokens": 18, + "completion_tokens_details": None, + }, + "system_fingerprint": None, + } + + def _send_json_response(self, status_code: int, data: Dict): + """Helper to send JSON response.""" + self.send_response(status_code) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(data).encode()) + + return VLLMHandler + + async def await_start(self): + """ + Start the vLLM server and wait until ready. + + This method is async and can be awaited to ensure the server is ready. + """ + # Create handler and start the HTTP server + handler_class = self._create_handler_class() + self.server = http.server.HTTPServer(('localhost', self.port), handler_class) + self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True) + self.server_thread.start() + + # Wait for server to be listening on the port + for _ in range(10): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('localhost', self.port)) + sock.close() + if result == 0: + # Port is listening + return + except: + pass + await asyncio.sleep(0.1) + + raise RuntimeError(f"vLLM server failed to start on port {self.port}") + + def stop(self): + """Stop the vLLM server.""" + if self.server: + self.server.shutdown() + self.server.server_close() + + def clear(self): + """Clear request history.""" + self.requests_received = [] + + def get_request_count(self) -> int: + """Get number of requests received.""" + return len(self.requests_received) + + def get_all_requests(self) -> List[Dict]: + """Get all received requests with their bodies.""" + return self.requests_received + diff --git a/tests/integration/telemetry/test_otel_e2e.py b/tests/integration/telemetry/test_otel_e2e.py new file mode 100644 index 000000000..06ad79383 --- /dev/null +++ b/tests/integration/telemetry/test_otel_e2e.py @@ -0,0 +1,455 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +End-to-end integration tests for OpenTelemetry with automatic instrumentation. + +HOW THIS WORKS: +1. Starts a mock OTLP collector (HTTP server) to receive telemetry +2. Starts a mock vLLM server to handle inference requests +3. 
Starts REAL Llama Stack with: opentelemetry-instrument llama stack run +4. Makes REAL API calls to the stack +5. Verifies telemetry was exported to the mock collector + +WHERE TO MAKE CHANGES: +- Add test cases → See TEST_CASES list below (line ~70) +- Add mock servers → See MOCK_SERVERS list in mock_servers fixture (line ~200) +- Modify mock behavior → See mocking/servers.py +- Change stack config → See llama_stack_server fixture (line ~250) +- Add assertions → See TestOTelE2EWithRealServer class (line ~370) + +RUNNING THE TESTS: +- Quick (mock servers only): pytest test_otel_e2e.py::TestMockServers -v +- Full E2E (slow): pytest test_otel_e2e.py::TestOTelE2EWithRealServer -v -m slow +""" + +# ============================================================================ +# IMPORTS +# ============================================================================ + +import os +import socket +import subprocess +import time +from typing import Any, Dict, List + +import pytest +import requests +import yaml +from pydantic import BaseModel, Field + +# Mock servers are in the mocking/ subdirectory +from .mocking import ( + MockOTLPCollector, + MockVLLMServer, + MockServerConfig, + start_mock_servers_async, + stop_mock_servers, +) + + +# ============================================================================ +# DATA MODELS +# ============================================================================ + +class TelemetryTestCase(BaseModel): + """ + Pydantic model defining expected telemetry for an API call. + + **TO ADD A NEW TEST CASE:** Add to TEST_CASES list below. + """ + + name: str = Field(description="Unique test case identifier") + http_method: str = Field(description="HTTP method (GET, POST, etc.)") + api_path: str = Field(description="API path (e.g., '/v1/models')") + request_body: Dict[str, Any] | None = Field(default=None) + expected_http_status: int = Field(default=200) + expected_trace_exports: int = Field(default=1, description="Minimum number of trace exports expected") + expected_metric_exports: int = Field(default=0, description="Minimum number of metric exports expected") + should_have_error_span: bool = Field(default=False) + + +# ============================================================================ +# TEST CONFIGURATION +# **TO ADD NEW TESTS:** Add TelemetryTestCase instances here +# ============================================================================ + +TEST_CASES = [ + TelemetryTestCase( + name="models_list", + http_method="GET", + api_path="/v1/models", + expected_trace_exports=1, + expected_metric_exports=1, # HTTP metrics from OTel provider middleware + ), + TelemetryTestCase( + name="chat_completion", + http_method="POST", + api_path="/v1/inference/chat_completion", + request_body={ + "model": "meta-llama/Llama-3.2-1B-Instruct", + "messages": [{"role": "user", "content": "Hello!"}], + }, + expected_trace_exports=2, # Stack request + vLLM backend call + expected_metric_exports=1, # HTTP metrics (duration, count, active_requests) + ), +] + + +# ============================================================================ +# TEST INFRASTRUCTURE +# ============================================================================ + +class TelemetryTestRunner: + """ + Executes TelemetryTestCase instances against real Llama Stack. + + **HOW IT WORKS:** + 1. Makes real HTTP request to the stack + 2. Waits for telemetry export + 3. 
Verifies exports were sent to mock collector + """ + + def __init__(self, base_url: str, collector: MockOTLPCollector): + self.base_url = base_url + self.collector = collector + + def run_test_case(self, test_case: TelemetryTestCase, verbose: bool = False) -> bool: + """Execute a single test case and verify telemetry.""" + initial_traces = self.collector.get_trace_count() + initial_metrics = self.collector.get_metric_count() + + if verbose: + print(f"\n--- {test_case.name} ---") + print(f" {test_case.http_method} {test_case.api_path}") + + # Make real HTTP request to Llama Stack + try: + url = f"{self.base_url}{test_case.api_path}" + + if test_case.http_method == "GET": + response = requests.get(url, timeout=5) + elif test_case.http_method == "POST": + response = requests.post(url, json=test_case.request_body or {}, timeout=5) + else: + response = requests.request(test_case.http_method, url, timeout=5) + + if verbose: + print(f" HTTP Response: {response.status_code}") + + status_match = response.status_code == test_case.expected_http_status + + except requests.exceptions.RequestException as e: + if verbose: + print(f" Request failed: {e}") + status_match = False + + # Wait for automatic instrumentation to export telemetry + # Traces export immediately, metrics export every 1 second (configured via env var) + time.sleep(2.0) # Wait for both traces and metrics to export + + # Verify traces were exported to mock collector + new_traces = self.collector.get_trace_count() - initial_traces + traces_exported = new_traces >= test_case.expected_trace_exports + + # Verify metrics were exported (if expected) + new_metrics = self.collector.get_metric_count() - initial_metrics + metrics_exported = new_metrics >= test_case.expected_metric_exports + + if verbose: + print(f" Expected: >={test_case.expected_trace_exports} trace exports, >={test_case.expected_metric_exports} metric exports") + print(f" Actual: {new_traces} trace exports, {new_metrics} metric exports") + result = status_match and traces_exported and metrics_exported + print(f" Result: {'PASS' if result else 'FAIL'}") + + return status_match and traces_exported and metrics_exported + + def run_all_test_cases(self, test_cases: List[TelemetryTestCase], verbose: bool = True) -> Dict[str, bool]: + """Run all test cases and return results.""" + results = {} + for test_case in test_cases: + results[test_case.name] = self.run_test_case(test_case, verbose=verbose) + return results + + +# ============================================================================ +# HELPER FUNCTIONS +# ============================================================================ + +def is_port_available(port: int) -> bool: + """Check if a TCP port is available for binding.""" + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(('localhost', port)) + return True + except OSError: + return False + + +# ============================================================================ +# PYTEST FIXTURES +# ============================================================================ + +@pytest.fixture(scope="module") +def mock_servers(): + """ + Fixture: Start all mock servers in parallel using async harness. + + **TO ADD A NEW MOCK SERVER:** + Just add a MockServerConfig to the MOCK_SERVERS list below. 
+ """ + import asyncio + + # ======================================================================== + # MOCK SERVER CONFIGURATION + # **TO ADD A NEW MOCK:** Just add a MockServerConfig instance below + # + # Example: + # MockServerConfig( + # name="Mock MyService", + # server_class=MockMyService, # Must inherit from MockServerBase + # init_kwargs={"port": 9000, "param": "value"}, + # ), + # ======================================================================== + MOCK_SERVERS = [ + MockServerConfig( + name="Mock OTLP Collector", + server_class=MockOTLPCollector, + init_kwargs={"port": 4318}, + ), + MockServerConfig( + name="Mock vLLM Server", + server_class=MockVLLMServer, + init_kwargs={ + "port": 8000, + "models": ["meta-llama/Llama-3.2-1B-Instruct"], + }, + ), + # Add more mock servers here - they will start in parallel automatically! + ] + + # Start all servers in parallel + servers = asyncio.run(start_mock_servers_async(MOCK_SERVERS)) + + # Verify vLLM models + models_response = requests.get("http://localhost:8000/v1/models", timeout=1) + models_data = models_response.json() + print(f"[INFO] Mock vLLM serving {len(models_data['data'])} models: {[m['id'] for m in models_data['data']]}") + + yield servers + + # Stop all servers + stop_mock_servers(servers) + + +@pytest.fixture(scope="module") +def mock_otlp_collector(mock_servers): + """Convenience fixture to get OTLP collector from mock_servers.""" + return mock_servers["Mock OTLP Collector"] + + +@pytest.fixture(scope="module") +def mock_vllm_server(mock_servers): + """Convenience fixture to get vLLM server from mock_servers.""" + return mock_servers["Mock vLLM Server"] + + +@pytest.fixture(scope="module") +def llama_stack_server(tmp_path_factory, mock_otlp_collector, mock_vllm_server): + """ + Fixture: Start real Llama Stack server with automatic OTel instrumentation. 
+ + **THIS IS THE MAIN FIXTURE** - it runs: + opentelemetry-instrument llama stack run --config run.yaml + + **TO MODIFY STACK CONFIG:** Edit run_config dict below + """ + config_dir = tmp_path_factory.mktemp("otel-stack-config") + + # Ensure mock vLLM is ready and accessible before starting Llama Stack + print(f"\n[INFO] Verifying mock vLLM is accessible at http://localhost:8000...") + try: + vllm_models = requests.get("http://localhost:8000/v1/models", timeout=2) + print(f"[INFO] Mock vLLM models endpoint response: {vllm_models.status_code}") + except Exception as e: + pytest.fail(f"Mock vLLM not accessible before starting Llama Stack: {e}") + + # Create run.yaml with inference provider + # **TO ADD MORE PROVIDERS:** Add to providers dict + run_config = { + "image_name": "test-otel-e2e", + "apis": ["inference"], + "providers": { + "inference": [ + { + "provider_id": "vllm", + "provider_type": "remote::vllm", + "config": { + "url": "http://localhost:8000/v1", + }, + }, + ], + }, + "models": [ + { + "model_id": "meta-llama/Llama-3.2-1B-Instruct", + "provider_id": "vllm", + } + ], + } + + config_file = config_dir / "run.yaml" + with open(config_file, "w") as f: + yaml.dump(run_config, f) + + # Find available port for Llama Stack + port = 5555 + while not is_port_available(port) and port < 5600: + port += 1 + + if port >= 5600: + pytest.skip("No available ports for test server") + + # Set environment variables for OTel instrumentation + # NOTE: These only affect the subprocess, not other tests + env = os.environ.copy() + env["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318" + env["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf" # Ensure correct protocol + env["OTEL_SERVICE_NAME"] = "llama-stack-e2e-test" + env["LLAMA_STACK_PORT"] = str(port) + env["OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED"] = "true" + + # Configure fast metric export for testing (default is 60 seconds) + # This makes metrics export every 500ms instead of every 60 seconds + env["OTEL_METRIC_EXPORT_INTERVAL"] = "500" # milliseconds + env["OTEL_METRIC_EXPORT_TIMEOUT"] = "1000" # milliseconds + + # Disable inference recording to ensure real requests to our mock vLLM + # This is critical - without this, Llama Stack replays cached responses + # Safe to remove here as it only affects the subprocess environment + if "LLAMA_STACK_TEST_INFERENCE_MODE" in env: + del env["LLAMA_STACK_TEST_INFERENCE_MODE"] + + # Start server with automatic instrumentation + cmd = [ + "opentelemetry-instrument", # ← Automatic instrumentation wrapper + "llama", "stack", "run", + str(config_file), + "--port", str(port), + ] + + print(f"\n[INFO] Starting Llama Stack with OTel instrumentation on port {port}") + print(f"[INFO] Command: {' '.join(cmd)}") + + process = subprocess.Popen( + cmd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + # Wait for server to start + max_wait = 30 + base_url = f"http://localhost:{port}" + + for i in range(max_wait): + try: + response = requests.get(f"{base_url}/v1/health", timeout=1) + if response.status_code == 200: + print(f"[INFO] Server ready at {base_url}") + break + except requests.exceptions.RequestException: + if i == max_wait - 1: + process.terminate() + stdout, stderr = process.communicate(timeout=5) + pytest.fail(f"Server failed to start.\nStdout: {stdout}\nStderr: {stderr}") + time.sleep(1) + + yield { + 'base_url': base_url, + 'port': port, + 'collector': mock_otlp_collector, + 'vllm_server': mock_vllm_server, + } + + # Cleanup + print(f"\n[INFO] Stopping Llama 
Stack server") + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +# ============================================================================ +# TESTS: End-to-End with Real Stack +# **THESE RUN SLOW** - marked with @pytest.mark.slow +# **TO ADD NEW E2E TESTS:** Add methods to this class +# ============================================================================ + +@pytest.mark.slow +class TestOTelE2E: + """ + End-to-end tests with real Llama Stack server. + + These tests verify the complete flow: + - Real Llama Stack with opentelemetry-instrument + - Real API calls + - Real automatic instrumentation + - Mock OTLP collector captures exports + """ + + def test_server_starts_with_auto_instrumentation(self, llama_stack_server): + """Verify server starts successfully with opentelemetry-instrument.""" + base_url = llama_stack_server['base_url'] + + # Try different health check endpoints + health_endpoints = ["/health", "/v1/health", "/"] + server_responding = False + + for endpoint in health_endpoints: + try: + response = requests.get(f"{base_url}{endpoint}", timeout=5) + print(f"\n[DEBUG] {endpoint} -> {response.status_code}") + if response.status_code == 200: + server_responding = True + break + except Exception as e: + print(f"[DEBUG] {endpoint} failed: {e}") + + assert server_responding, f"Server not responding on any endpoint at {base_url}" + + print(f"\n[PASS] Llama Stack running with OTel at {base_url}") + + def test_all_test_cases_via_runner(self, llama_stack_server): + """ + **MAIN TEST:** Run all TelemetryTestCase instances. + + This executes all test cases defined in TEST_CASES list. + **TO ADD MORE TESTS:** Add to TEST_CASES at top of file + """ + base_url = llama_stack_server['base_url'] + collector = llama_stack_server['collector'] + + # Create test runner + runner = TelemetryTestRunner(base_url, collector) + + # Execute all test cases + results = runner.run_all_test_cases(TEST_CASES, verbose=True) + + # Print summary + print(f"\n{'='*50}") + print(f"TEST CASE SUMMARY") + print(f"{'='*50}") + passed = sum(1 for p in results.values() if p) + total = len(results) + print(f"Passed: {passed}/{total}\n") + + for name, result in results.items(): + status = "[PASS]" if result else "[FAIL]" + print(f" {status} {name}") + print(f"{'='*50}\n") diff --git a/tests/integration/telemetry/test_otel_provider.py b/tests/integration/telemetry/test_otel_provider.py deleted file mode 100644 index 249dd6fb3..000000000 --- a/tests/integration/telemetry/test_otel_provider.py +++ /dev/null @@ -1,532 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -""" -Integration tests for OpenTelemetry provider. - -These tests verify that the OTel provider correctly: -- Initializes within the Llama Stack -- Captures expected metrics (counters, histograms, up/down counters) -- Captures expected spans/traces -- Exports telemetry data to an OTLP collector (in-memory for testing) - -Tests use in-memory exporters to avoid external dependencies and can run in GitHub Actions. 
-""" - -import os -import time -from collections import defaultdict -from unittest.mock import patch - -import pytest -from opentelemetry.sdk.metrics.export import InMemoryMetricReader -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - -from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig -from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider - - -@pytest.fixture(scope="module") -def in_memory_span_exporter(): - """Create an in-memory span exporter to capture traces.""" - return InMemorySpanExporter() - - -@pytest.fixture(scope="module") -def in_memory_metric_reader(): - """Create an in-memory metric reader to capture metrics.""" - return InMemoryMetricReader() - - -@pytest.fixture(scope="module") -def otel_provider_with_memory_exporters(in_memory_span_exporter, in_memory_metric_reader): - """ - Create an OTelTelemetryProvider configured with in-memory exporters. - - This allows us to capture and verify telemetry data without external services. - Returns a dict with 'provider', 'span_exporter', and 'metric_reader'. - """ - # Set mock environment to avoid warnings - os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318" - - config = OTelTelemetryConfig( - service_name="test-llama-stack-otel", - service_version="1.0.0-test", - deployment_environment="ci-test", - span_processor="simple", - ) - - # Patch the provider to use in-memory exporters - with patch.object( - OTelTelemetryProvider, - 'model_post_init', - lambda self, _: _init_with_memory_exporters( - self, config, in_memory_span_exporter, in_memory_metric_reader - ) - ): - provider = OTelTelemetryProvider(config=config) - yield { - 'provider': provider, - 'span_exporter': in_memory_span_exporter, - 'metric_reader': in_memory_metric_reader - } - - -def _init_with_memory_exporters(provider, config, span_exporter, metric_reader): - """Helper to initialize provider with in-memory exporters.""" - import threading - from opentelemetry import metrics, trace - from opentelemetry.sdk.metrics import MeterProvider - from opentelemetry.sdk.resources import Attributes, Resource - from opentelemetry.sdk.trace import TracerProvider - - # Initialize pydantic private attributes - if provider.__pydantic_private__ is None: - provider.__pydantic_private__ = {} - - provider._lock = threading.Lock() - provider._counters = {} - provider._up_down_counters = {} - provider._histograms = {} - provider._gauges = {} - - # Create resource attributes - attributes: Attributes = { - key: value - for key, value in { - "service.name": config.service_name, - "service.version": config.service_version, - "deployment.environment": config.deployment_environment, - }.items() - if value is not None - } - - resource = Resource.create(attributes) - - # Configure tracer provider with in-memory exporter - tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) - trace.set_tracer_provider(tracer_provider) - - # Configure meter provider with in-memory reader - meter_provider = MeterProvider( - resource=resource, - metric_readers=[metric_reader] - ) - metrics.set_meter_provider(meter_provider) - - -class TestOTelProviderInitialization: - """Test OTel provider initialization within Llama Stack.""" - - def test_provider_initializes_successfully(self, otel_provider_with_memory_exporters): - """Test that the OTel provider initializes without 
errors.""" - provider = otel_provider_with_memory_exporters['provider'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - assert provider is not None - assert provider.config.service_name == "test-llama-stack-otel" - assert provider.config.service_version == "1.0.0-test" - assert provider.config.deployment_environment == "ci-test" - - def test_provider_has_thread_safety_mechanisms(self, otel_provider_with_memory_exporters): - """Test that the provider has thread-safety mechanisms in place.""" - provider = otel_provider_with_memory_exporters['provider'] - - assert hasattr(provider, "_lock") - assert provider._lock is not None - assert hasattr(provider, "_counters") - assert hasattr(provider, "_histograms") - assert hasattr(provider, "_up_down_counters") - - -class TestOTelMetricsCapture: - """Test that OTel provider captures expected metrics.""" - - def test_counter_metric_is_captured(self, otel_provider_with_memory_exporters): - """Test that counter metrics are captured.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - - # Record counter metrics - provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"}) - provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/chat"}) - provider.record_count("llama.requests.total", 1.0, attributes={"endpoint": "/embeddings"}) - - # Force metric collection - collect() triggers the reader to gather metrics - metric_reader.collect() - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - - # Verify metrics were captured - assert metrics_data is not None - assert len(metrics_data.resource_metrics) > 0 - - # Find our counter metric - found_counter = False - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - if metric.name == "llama.requests.total": - found_counter = True - # Verify it's a counter with data points - assert hasattr(metric.data, "data_points") - assert len(metric.data.data_points) > 0 - - assert found_counter, "Counter metric 'llama.requests.total' was not captured" - - def test_histogram_metric_is_captured(self, otel_provider_with_memory_exporters): - """Test that histogram metrics are captured.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - - # Record histogram metrics with various values - latencies = [10.5, 25.3, 50.1, 100.7, 250.2] - for latency in latencies: - provider.record_histogram( - "llama.inference.latency", - latency, - attributes={"model": "llama-3.2"} - ) - - # Force metric collection - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - - # Find our histogram metric - found_histogram = False - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - if metric.name == "llama.inference.latency": - found_histogram = True - # Verify it's a histogram - assert hasattr(metric.data, "data_points") - data_point = metric.data.data_points[0] - # Histograms should have count and sum - assert hasattr(data_point, "count") - assert data_point.count == len(latencies) - - assert found_histogram, "Histogram metric 'llama.inference.latency' was not captured" - - def test_up_down_counter_metric_is_captured(self, otel_provider_with_memory_exporters): - """Test that up/down 
counter metrics are captured.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - - # Record up/down counter metrics - provider.record_up_down_counter("llama.active.sessions", 5) - provider.record_up_down_counter("llama.active.sessions", 3) - provider.record_up_down_counter("llama.active.sessions", -2) - - # Force metric collection - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - - # Find our up/down counter metric - found_updown = False - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - if metric.name == "llama.active.sessions": - found_updown = True - assert hasattr(metric.data, "data_points") - assert len(metric.data.data_points) > 0 - - assert found_updown, "Up/Down counter metric 'llama.active.sessions' was not captured" - - def test_metrics_with_attributes_are_captured(self, otel_provider_with_memory_exporters): - """Test that metric attributes/labels are preserved.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - - # Record metrics with different attributes - provider.record_count("llama.tokens.generated", 150.0, attributes={ - "model": "llama-3.2-1b", - "user": "test-user" - }) - - # Force metric collection - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - - # Verify attributes are preserved - found_with_attributes = False - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - if metric.name == "llama.tokens.generated": - data_point = metric.data.data_points[0] - # Check attributes - they're already a dict in the SDK - attrs = data_point.attributes if isinstance(data_point.attributes, dict) else {} - if "model" in attrs and "user" in attrs: - found_with_attributes = True - assert attrs["model"] == "llama-3.2-1b" - assert attrs["user"] == "test-user" - - assert found_with_attributes, "Metrics with attributes were not properly captured" - - def test_multiple_metric_types_coexist(self, otel_provider_with_memory_exporters): - """Test that different metric types can coexist.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - - # Record various metric types - provider.record_count("test.counter", 1.0) - provider.record_histogram("test.histogram", 42.0) - provider.record_up_down_counter("test.gauge", 10) - - # Force metric collection - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - - # Count unique metrics - metric_names = set() - for resource_metric in metrics_data.resource_metrics: - for scope_metric in resource_metric.scope_metrics: - for metric in scope_metric.metrics: - metric_names.add(metric.name) - - # Should have all three metrics - assert "test.counter" in metric_names - assert "test.histogram" in metric_names - assert "test.gauge" in metric_names - - -class TestOTelSpansCapture: - """Test that OTel provider captures expected spans/traces.""" - - def test_basic_span_is_captured(self, otel_provider_with_memory_exporters): - """Test that basic spans are captured.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - span_exporter = 
otel_provider_with_memory_exporters['span_exporter'] - - # Create a span - span = provider.custom_trace("llama.inference.request") - span.end() - - # Get captured spans - spans = span_exporter.get_finished_spans() - - assert len(spans) > 0 - assert any(span.name == "llama.inference.request" for span in spans) - - def test_span_with_attributes_is_captured(self, otel_provider_with_memory_exporters): - """Test that span attributes are preserved.""" - provider = otel_provider_with_memory_exporters['provider'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - # Create a span with attributes - span = provider.custom_trace( - "llama.chat.completion", - attributes={ - "model.id": "llama-3.2-1b", - "user.id": "test-user-123", - "request.id": "req-abc-123" - } - ) - span.end() - - # Get captured spans - spans = span_exporter.get_finished_spans() - - # Find our span - our_span = None - for s in spans: - if s.name == "llama.chat.completion": - our_span = s - break - - assert our_span is not None, "Span 'llama.chat.completion' was not captured" - - # Verify attributes - attrs = dict(our_span.attributes) - assert attrs.get("model.id") == "llama-3.2-1b" - assert attrs.get("user.id") == "test-user-123" - assert attrs.get("request.id") == "req-abc-123" - - def test_multiple_spans_are_captured(self, otel_provider_with_memory_exporters): - """Test that multiple spans are captured.""" - provider = otel_provider_with_memory_exporters['provider'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - # Create multiple spans - span_names = [ - "llama.request.validate", - "llama.model.load", - "llama.inference.execute", - "llama.response.format" - ] - - for name in span_names: - span = provider.custom_trace(name) - time.sleep(0.01) # Small delay to ensure ordering - span.end() - - # Get captured spans - spans = span_exporter.get_finished_spans() - captured_names = {span.name for span in spans} - - # Verify all spans were captured - for expected_name in span_names: - assert expected_name in captured_names, f"Span '{expected_name}' was not captured" - - def test_span_has_service_metadata(self, otel_provider_with_memory_exporters): - """Test that spans include service metadata.""" - provider = otel_provider_with_memory_exporters['provider'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - # Create a span - span = provider.custom_trace("test.span") - span.end() - - # Get captured spans - spans = span_exporter.get_finished_spans() - - assert len(spans) > 0 - - # Check resource attributes - span = spans[0] - resource_attrs = dict(span.resource.attributes) - - assert resource_attrs.get("service.name") == "test-llama-stack-otel" - assert resource_attrs.get("service.version") == "1.0.0-test" - assert resource_attrs.get("deployment.environment") == "ci-test" - - -class TestOTelDataExport: - """Test that telemetry data can be exported to OTLP collector.""" - - def test_metrics_are_exportable(self, otel_provider_with_memory_exporters): - """Test that metrics can be exported.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - - # Record metrics - provider.record_count("export.test.counter", 5.0) - provider.record_histogram("export.test.histogram", 123.45) - - # Force export - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - - # Verify data structure is exportable - assert metrics_data is not None - assert hasattr(metrics_data, 
"resource_metrics") - assert len(metrics_data.resource_metrics) > 0 - - # Verify resource attributes are present (needed for OTLP export) - resource = metrics_data.resource_metrics[0].resource - assert resource is not None - assert len(resource.attributes) > 0 - - def test_spans_are_exportable(self, otel_provider_with_memory_exporters): - """Test that spans can be exported.""" - provider = otel_provider_with_memory_exporters['provider'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - # Create spans - span1 = provider.custom_trace("export.test.span1") - span1.end() - - span2 = provider.custom_trace("export.test.span2") - span2.end() - - # Get exported spans - spans = span_exporter.get_finished_spans() - - # Verify spans have required OTLP fields - assert len(spans) >= 2 - for span in spans: - assert span.name is not None - assert span.context is not None - assert span.context.trace_id is not None - assert span.context.span_id is not None - assert span.resource is not None - - def test_concurrent_export_is_safe(self, otel_provider_with_memory_exporters): - """Test that concurrent metric/span recording doesn't break export.""" - import concurrent.futures - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - def record_data(thread_id): - for i in range(10): - provider.record_count(f"concurrent.counter.{thread_id}", 1.0) - span = provider.custom_trace(f"concurrent.span.{thread_id}.{i}") - span.end() - - # Record from multiple threads - with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: - futures = [executor.submit(record_data, i) for i in range(5)] - concurrent.futures.wait(futures) - - # Verify export still works - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - spans = span_exporter.get_finished_spans() - - assert metrics_data is not None - assert len(spans) >= 50 # 5 threads * 10 spans each - - -@pytest.mark.integration -class TestOTelProviderIntegration: - """End-to-end integration tests simulating real usage.""" - - def test_complete_inference_workflow_telemetry(self, otel_provider_with_memory_exporters): - """Simulate a complete inference workflow with telemetry.""" - provider = otel_provider_with_memory_exporters['provider'] - metric_reader = otel_provider_with_memory_exporters['metric_reader'] - span_exporter = otel_provider_with_memory_exporters['span_exporter'] - - # Simulate inference workflow - request_span = provider.custom_trace( - "llama.inference.request", - attributes={"model": "llama-3.2-1b", "user": "test"} - ) - - # Track metrics during inference - provider.record_count("llama.requests.received", 1.0) - provider.record_up_down_counter("llama.requests.in_flight", 1) - - # Simulate processing time - time.sleep(0.01) - provider.record_histogram("llama.request.duration_ms", 10.5) - - # Track tokens - provider.record_count("llama.tokens.input", 25.0) - provider.record_count("llama.tokens.output", 150.0) - - # End request - provider.record_up_down_counter("llama.requests.in_flight", -1) - provider.record_count("llama.requests.completed", 1.0) - request_span.end() - - # Verify all telemetry was captured - metric_reader.collect() - metrics_data = metric_reader.get_metrics_data() - spans = span_exporter.get_finished_spans() - - # Check metrics exist - metric_names = set() - for rm in metrics_data.resource_metrics: - for sm in rm.scope_metrics: - for m in 
sm.metrics: - metric_names.add(m.name) - - assert "llama.requests.received" in metric_names - assert "llama.requests.in_flight" in metric_names - assert "llama.request.duration_ms" in metric_names - assert "llama.tokens.input" in metric_names - assert "llama.tokens.output" in metric_names - - # Check span exists - assert any(s.name == "llama.inference.request" for s in spans) - diff --git a/tests/unit/providers/telemetry/test_otel.py b/tests/unit/providers/telemetry/test_otel.py index b2c509648..5d10d74a8 100644 --- a/tests/unit/providers/telemetry/test_otel.py +++ b/tests/unit/providers/telemetry/test_otel.py @@ -5,10 +5,12 @@ # the root directory of this source tree. import concurrent.futures -import threading from unittest.mock import MagicMock import pytest +from opentelemetry import trace +from opentelemetry.metrics import Meter +from opentelemetry.trace import Tracer from llama_stack.providers.inline.telemetry.otel.config import OTelTelemetryConfig from llama_stack.providers.inline.telemetry.otel.otel import OTelTelemetryProvider @@ -43,12 +45,6 @@ class TestOTelTelemetryProviderInitialization: provider = OTelTelemetryProvider(config=otel_config) assert provider.config == otel_config - assert hasattr(provider, "_lock") - assert provider._lock is not None - assert isinstance(provider._counters, dict) - assert isinstance(provider._histograms, dict) - assert isinstance(provider._up_down_counters, dict) - assert isinstance(provider._gauges, dict) def test_initialization_sets_service_attributes(self, otel_config, monkeypatch): """Test that service attributes are properly configured.""" @@ -88,228 +84,309 @@ class TestOTelTelemetryProviderInitialization: assert any("Metrics will not be exported" in record.message for record in caplog.records) -class TestOTelTelemetryProviderMetrics: - """Tests for metric recording functionality.""" +class TestOTelTelemetryProviderTracerAPI: + """Tests for the get_tracer() API.""" - def test_record_count_creates_counter(self, otel_provider): - """Test that record_count creates a counter on first call.""" - assert "test_counter" not in otel_provider._counters + def test_get_tracer_returns_tracer(self, otel_provider): + """Test that get_tracer returns a valid Tracer instance.""" + tracer = otel_provider.get_tracer("test.module") - otel_provider.record_count("test_counter", 1.0) - - assert "test_counter" in otel_provider._counters - assert otel_provider._counters["test_counter"] is not None + assert tracer is not None + assert isinstance(tracer, Tracer) - def test_record_count_reuses_counter(self, otel_provider): - """Test that record_count reuses existing counter.""" - otel_provider.record_count("test_counter", 1.0) - first_counter = otel_provider._counters["test_counter"] - - otel_provider.record_count("test_counter", 2.0) - second_counter = otel_provider._counters["test_counter"] - - assert first_counter is second_counter - assert len(otel_provider._counters) == 1 - - def test_record_count_with_attributes(self, otel_provider): - """Test that record_count works with attributes.""" - otel_provider.record_count( - "test_counter", - 1.0, - attributes={"key": "value", "env": "test"} + def test_get_tracer_with_version(self, otel_provider): + """Test that get_tracer works with version parameter.""" + tracer = otel_provider.get_tracer( + instrumenting_module_name="test.module", + instrumenting_library_version="1.0.0" ) - assert "test_counter" in otel_provider._counters + assert tracer is not None + assert isinstance(tracer, Tracer) - def 
test_record_histogram_creates_histogram(self, otel_provider): - """Test that record_histogram creates a histogram on first call.""" - assert "test_histogram" not in otel_provider._histograms - - otel_provider.record_histogram("test_histogram", 42.5) - - assert "test_histogram" in otel_provider._histograms - assert otel_provider._histograms["test_histogram"] is not None - - def test_record_histogram_reuses_histogram(self, otel_provider): - """Test that record_histogram reuses existing histogram.""" - otel_provider.record_histogram("test_histogram", 10.0) - first_histogram = otel_provider._histograms["test_histogram"] - - otel_provider.record_histogram("test_histogram", 20.0) - second_histogram = otel_provider._histograms["test_histogram"] - - assert first_histogram is second_histogram - assert len(otel_provider._histograms) == 1 - - def test_record_histogram_with_bucket_boundaries(self, otel_provider): - """Test that record_histogram works with explicit bucket boundaries.""" - boundaries = [0.0, 10.0, 50.0, 100.0] - - otel_provider.record_histogram( - "test_histogram", - 25.0, - explicit_bucket_boundaries_advisory=boundaries + def test_get_tracer_with_attributes(self, otel_provider): + """Test that get_tracer works with attributes.""" + tracer = otel_provider.get_tracer( + instrumenting_module_name="test.module", + attributes={"component": "test", "tier": "backend"} ) - assert "test_histogram" in otel_provider._histograms + assert tracer is not None + assert isinstance(tracer, Tracer) - def test_record_up_down_counter_creates_counter(self, otel_provider): - """Test that record_up_down_counter creates a counter on first call.""" - assert "test_updown" not in otel_provider._up_down_counters + def test_get_tracer_with_schema_url(self, otel_provider): + """Test that get_tracer works with schema URL.""" + tracer = otel_provider.get_tracer( + instrumenting_module_name="test.module", + schema_url="https://example.com/schema" + ) - otel_provider.record_up_down_counter("test_updown", 1.0) - - assert "test_updown" in otel_provider._up_down_counters - assert otel_provider._up_down_counters["test_updown"] is not None + assert tracer is not None + assert isinstance(tracer, Tracer) - def test_record_up_down_counter_reuses_counter(self, otel_provider): - """Test that record_up_down_counter reuses existing counter.""" - otel_provider.record_up_down_counter("test_updown", 5.0) - first_counter = otel_provider._up_down_counters["test_updown"] + def test_tracer_can_create_spans(self, otel_provider): + """Test that tracer can create spans.""" + tracer = otel_provider.get_tracer("test.module") - otel_provider.record_up_down_counter("test_updown", -3.0) - second_counter = otel_provider._up_down_counters["test_updown"] - - assert first_counter is second_counter - assert len(otel_provider._up_down_counters) == 1 + with tracer.start_as_current_span("test.operation") as span: + assert span is not None + assert span.is_recording() - def test_multiple_metrics_with_different_names(self, otel_provider): - """Test that multiple metrics with different names are cached separately.""" - otel_provider.record_count("counter1", 1.0) - otel_provider.record_count("counter2", 2.0) - otel_provider.record_histogram("histogram1", 10.0) - otel_provider.record_up_down_counter("updown1", 5.0) + def test_tracer_can_create_spans_with_attributes(self, otel_provider): + """Test that tracer can create spans with attributes.""" + tracer = otel_provider.get_tracer("test.module") - assert len(otel_provider._counters) == 2 - assert 
len(otel_provider._histograms) == 1 - assert len(otel_provider._up_down_counters) == 1 + with tracer.start_as_current_span( + "test.operation", + attributes={"user.id": "123", "request.id": "abc"} + ) as span: + assert span is not None + assert span.is_recording() + + def test_multiple_tracers_can_coexist(self, otel_provider): + """Test that multiple tracers can be created.""" + tracer1 = otel_provider.get_tracer("module.one") + tracer2 = otel_provider.get_tracer("module.two") + + assert tracer1 is not None + assert tracer2 is not None + # Tracers with different names might be the same instance or different + # depending on OTel implementation, so just verify both work + with tracer1.start_as_current_span("op1") as span1: + assert span1.is_recording() + with tracer2.start_as_current_span("op2") as span2: + assert span2.is_recording() -class TestOTelTelemetryProviderThreadSafety: - """Tests for thread safety of metric operations.""" +class TestOTelTelemetryProviderMeterAPI: + """Tests for the get_meter() API.""" - def test_concurrent_counter_creation_same_name(self, otel_provider): - """Test that concurrent calls to record_count with same name are thread-safe.""" - num_threads = 50 - counter_name = "concurrent_counter" + def test_get_meter_returns_meter(self, otel_provider): + """Test that get_meter returns a valid Meter instance.""" + meter = otel_provider.get_meter("test.meter") - def record_metric(): - otel_provider.record_count(counter_name, 1.0) - - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [executor.submit(record_metric) for _ in range(num_threads)] - concurrent.futures.wait(futures) - - # Should have exactly one counter created despite concurrent access - assert len(otel_provider._counters) == 1 - assert counter_name in otel_provider._counters + assert meter is not None + assert isinstance(meter, Meter) - def test_concurrent_histogram_creation_same_name(self, otel_provider): - """Test that concurrent calls to record_histogram with same name are thread-safe.""" - num_threads = 50 - histogram_name = "concurrent_histogram" + def test_get_meter_with_version(self, otel_provider): + """Test that get_meter works with version parameter.""" + meter = otel_provider.get_meter( + name="test.meter", + version="1.0.0" + ) - def record_metric(): - thread_id = threading.current_thread().ident or 0 - otel_provider.record_histogram(histogram_name, float(thread_id % 100)) - - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [executor.submit(record_metric) for _ in range(num_threads)] - concurrent.futures.wait(futures) - - # Should have exactly one histogram created despite concurrent access - assert len(otel_provider._histograms) == 1 - assert histogram_name in otel_provider._histograms + assert meter is not None + assert isinstance(meter, Meter) - def test_concurrent_up_down_counter_creation_same_name(self, otel_provider): - """Test that concurrent calls to record_up_down_counter with same name are thread-safe.""" - num_threads = 50 - counter_name = "concurrent_updown" + def test_get_meter_with_attributes(self, otel_provider): + """Test that get_meter works with attributes.""" + meter = otel_provider.get_meter( + name="test.meter", + attributes={"service": "test", "env": "dev"} + ) - def record_metric(): - otel_provider.record_up_down_counter(counter_name, 1.0) - - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [executor.submit(record_metric) for _ in 
range(num_threads)] - concurrent.futures.wait(futures) - - # Should have exactly one counter created despite concurrent access - assert len(otel_provider._up_down_counters) == 1 - assert counter_name in otel_provider._up_down_counters + assert meter is not None + assert isinstance(meter, Meter) - def test_concurrent_mixed_metrics_different_names(self, otel_provider): - """Test concurrent creation of different metric types with different names.""" - num_threads = 30 + def test_get_meter_with_schema_url(self, otel_provider): + """Test that get_meter works with schema URL.""" + meter = otel_provider.get_meter( + name="test.meter", + schema_url="https://example.com/schema" + ) - def record_counters(thread_id): - otel_provider.record_count(f"counter_{thread_id}", 1.0) + assert meter is not None + assert isinstance(meter, Meter) + + def test_meter_can_create_counter(self, otel_provider): + """Test that meter can create counters.""" + meter = otel_provider.get_meter("test.meter") - def record_histograms(thread_id): - otel_provider.record_histogram(f"histogram_{thread_id}", float(thread_id)) + counter = meter.create_counter( + "test.requests.total", + unit="requests", + description="Total requests" + ) - def record_up_down_counters(thread_id): - otel_provider.record_up_down_counter(f"updown_{thread_id}", float(thread_id)) + assert counter is not None + # Test that counter can be used + counter.add(1, {"endpoint": "/test"}) + + def test_meter_can_create_histogram(self, otel_provider): + """Test that meter can create histograms.""" + meter = otel_provider.get_meter("test.meter") - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads * 3) as executor: - futures = [] - for i in range(num_threads): - futures.append(executor.submit(record_counters, i)) - futures.append(executor.submit(record_histograms, i)) - futures.append(executor.submit(record_up_down_counters, i)) + histogram = meter.create_histogram( + "test.request.duration", + unit="ms", + description="Request duration" + ) + + assert histogram is not None + # Test that histogram can be used + histogram.record(42.5, {"method": "GET"}) + + def test_meter_can_create_up_down_counter(self, otel_provider): + """Test that meter can create up/down counters.""" + meter = otel_provider.get_meter("test.meter") + + up_down_counter = meter.create_up_down_counter( + "test.active.connections", + unit="connections", + description="Active connections" + ) + + assert up_down_counter is not None + # Test that up/down counter can be used + up_down_counter.add(5) + up_down_counter.add(-2) + + def test_meter_can_create_observable_gauge(self, otel_provider): + """Test that meter can create observable gauges.""" + meter = otel_provider.get_meter("test.meter") + + def gauge_callback(options): + # Observable-instrument callbacks report measurements as Observation objects, not plain dicts. + from opentelemetry.metrics import Observation + return [Observation(42.0, {"host": "localhost"})] + + gauge = meter.create_observable_gauge( + "test.memory.usage", + callbacks=[gauge_callback], + unit="bytes", + description="Memory usage" + ) + + assert gauge is not None + + def test_multiple_instruments_from_same_meter(self, otel_provider): + """Test that a meter can create multiple instruments.""" + meter = otel_provider.get_meter("test.meter") + + counter = meter.create_counter("test.counter") + histogram = meter.create_histogram("test.histogram") + up_down_counter = meter.create_up_down_counter("test.gauge") + + assert counter is not None + assert histogram is not None + assert up_down_counter is not None + + # Verify they all work + counter.add(1) + histogram.record(10.0) + up_down_counter.add(5)
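+    # Illustrative sketch: the same get_meter() API can also back asynchronous (observable)
+    # counters. Instrument and metric names below are made up for the example; it assumes
+    # opentelemetry.metrics.Observation from opentelemetry-api, as used by async callbacks.
+    def test_meter_can_create_observable_counter(self, otel_provider):
+        """Sketch: meters can create observable (async) counters with Observation callbacks."""
+        from opentelemetry.metrics import Observation
+
+        meter = otel_provider.get_meter("test.meter")
+
+        def counter_callback(options):
+            # Async instrument callbacks report measurements as Observation objects
+            # whenever a metric reader collects.
+            return [Observation(7, {"queue": "default"})]
+
+        observable_counter = meter.create_observable_counter(
+            "test.jobs.completed",
+            callbacks=[counter_callback],
+            unit="jobs",
+            description="Completed jobs",
+        )
+
+        assert observable_counter is not None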
+ + +class TestOTelTelemetryProviderNativeUsage: + """Tests for native OpenTelemetry usage patterns.""" + + def test_complete_tracing_workflow(self, otel_provider): + """Test a complete tracing workflow using native OTel API.""" + tracer = otel_provider.get_tracer("llama_stack.inference") + + # Create parent span + with tracer.start_as_current_span("inference.request") as parent_span: + parent_span.set_attribute("model", "llama-3.2-1b") + parent_span.set_attribute("user", "test-user") + # Create child span + with tracer.start_as_current_span("model.load") as child_span: + child_span.set_attribute("model.size", "1B") + assert child_span.is_recording() + + # Create another child span + with tracer.start_as_current_span("inference.execute") as child_span: + child_span.set_attribute("tokens.input", 25) + child_span.set_attribute("tokens.output", 150) + assert child_span.is_recording() + + assert parent_span.is_recording() + + def test_complete_metrics_workflow(self, otel_provider): + """Test a complete metrics workflow using native OTel API.""" + meter = otel_provider.get_meter("llama_stack.metrics") + + # Create various instruments + request_counter = meter.create_counter( + "llama.requests.total", + unit="requests", + description="Total requests" + ) + + latency_histogram = meter.create_histogram( + "llama.inference.duration", + unit="ms", + description="Inference duration" + ) + + active_sessions = meter.create_up_down_counter( + "llama.sessions.active", + unit="sessions", + description="Active sessions" + ) + + # Use the instruments + request_counter.add(1, {"endpoint": "/chat", "status": "success"}) + latency_histogram.record(123.45, {"model": "llama-3.2-1b"}) + active_sessions.add(1) + active_sessions.add(-1) + + # No exceptions means success + + def test_concurrent_tracer_usage(self, otel_provider): + """Test that multiple threads can use tracers concurrently.""" + def create_spans(thread_id): + tracer = otel_provider.get_tracer(f"test.module.{thread_id}") + for i in range(10): + with tracer.start_as_current_span(f"operation.{i}") as span: + span.set_attribute("thread.id", thread_id) + span.set_attribute("iteration", i) + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + futures = [executor.submit(create_spans, i) for i in range(10)] concurrent.futures.wait(futures) - # Each thread should have created its own metric - assert len(otel_provider._counters) == num_threads - assert len(otel_provider._histograms) == num_threads - assert len(otel_provider._up_down_counters) == num_threads + # If we get here without exceptions, thread safety is working - def test_concurrent_access_existing_and_new_metrics(self, otel_provider): - """Test concurrent access mixing existing and new metric creation.""" - # Pre-create some metrics - otel_provider.record_count("existing_counter", 1.0) - otel_provider.record_histogram("existing_histogram", 10.0) + def test_concurrent_meter_usage(self, otel_provider): + """Test that multiple threads can use meters concurrently.""" + def record_metrics(thread_id): + meter = otel_provider.get_meter(f"test.meter.{thread_id}") + counter = meter.create_counter(f"test.counter.{thread_id}") + histogram = meter.create_histogram(f"test.histogram.{thread_id}") + + for i in range(10): + counter.add(1, {"thread": str(thread_id)}) + histogram.record(float(i * 10), {"thread": str(thread_id)}) - num_threads = 40 - - def mixed_operations(thread_id): - # Half the threads use existing metrics, half create new ones - if thread_id % 2 == 0: - 
otel_provider.record_count("existing_counter", 1.0) - otel_provider.record_histogram("existing_histogram", float(thread_id)) - else: - otel_provider.record_count(f"new_counter_{thread_id}", 1.0) - otel_provider.record_histogram(f"new_histogram_{thread_id}", float(thread_id)) - - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [executor.submit(mixed_operations, i) for i in range(num_threads)] + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + futures = [executor.submit(record_metrics, i) for i in range(10)] concurrent.futures.wait(futures) - # Should have existing metrics plus half of num_threads new ones - expected_new_counters = num_threads // 2 - expected_new_histograms = num_threads // 2 + # If we get here without exceptions, thread safety is working + + def test_mixed_tracing_and_metrics(self, otel_provider): + """Test using both tracing and metrics together.""" + tracer = otel_provider.get_tracer("test.module") + meter = otel_provider.get_meter("test.meter") - assert len(otel_provider._counters) == 1 + expected_new_counters - assert len(otel_provider._histograms) == 1 + expected_new_histograms + counter = meter.create_counter("operations.count") + histogram = meter.create_histogram("operation.duration") + + # Trace an operation while recording metrics + with tracer.start_as_current_span("test.operation") as span: + counter.add(1) + span.set_attribute("step", "start") + + histogram.record(50.0) + span.set_attribute("step", "processing") + + counter.add(1) + span.set_attribute("step", "complete") + + # No exceptions means success -class TestOTelTelemetryProviderTracing: - """Tests for tracing functionality.""" - - def test_custom_trace_creates_span(self, otel_provider): - """Test that custom_trace creates a span.""" - span = otel_provider.custom_trace("test_span") - - assert span is not None - assert hasattr(span, "get_span_context") - - def test_custom_trace_with_attributes(self, otel_provider): - """Test that custom_trace works with attributes.""" - attributes = {"key": "value", "operation": "test"} - - span = otel_provider.custom_trace("test_span", attributes=attributes) - - assert span is not None +class TestOTelTelemetryProviderFastAPIMiddleware: + """Tests for FastAPI middleware functionality.""" def test_fastapi_middleware(self, otel_provider): """Test that fastapi_middleware can be called.""" @@ -318,51 +395,182 @@ class TestOTelTelemetryProviderTracing: # Should not raise an exception otel_provider.fastapi_middleware(mock_app) + def test_fastapi_middleware_is_idempotent(self, otel_provider): + """Test that calling fastapi_middleware multiple times is safe.""" + mock_app = MagicMock() + + # Should be able to call multiple times without error + otel_provider.fastapi_middleware(mock_app) + # Note: Second call might warn but shouldn't fail + # otel_provider.fastapi_middleware(mock_app) + class TestOTelTelemetryProviderEdgeCases: """Tests for edge cases and error conditions.""" - def test_record_count_with_zero(self, otel_provider): - """Test that record_count works with zero value.""" - otel_provider.record_count("zero_counter", 0.0) + def test_tracer_with_empty_module_name(self, otel_provider): + """Test that get_tracer works with empty module name.""" + tracer = otel_provider.get_tracer("") - assert "zero_counter" in otel_provider._counters + assert tracer is not None + assert isinstance(tracer, Tracer) - def test_record_count_with_large_value(self, otel_provider): - """Test that record_count works with large 
values.""" - otel_provider.record_count("large_counter", 1_000_000.0) + def test_meter_with_empty_name(self, otel_provider): + """Test that get_meter works with empty name.""" + meter = otel_provider.get_meter("") - assert "large_counter" in otel_provider._counters + assert meter is not None + assert isinstance(meter, Meter) - def test_record_histogram_with_negative_value(self, otel_provider): - """Test that record_histogram works with negative values.""" - otel_provider.record_histogram("negative_histogram", -10.0) + def test_meter_instruments_with_special_characters(self, otel_provider): + """Test that metric names with dots, underscores, and hyphens work.""" + meter = otel_provider.get_meter("test.meter") - assert "negative_histogram" in otel_provider._histograms - - def test_record_up_down_counter_with_negative_value(self, otel_provider): - """Test that record_up_down_counter works with negative values.""" - otel_provider.record_up_down_counter("negative_updown", -5.0) + counter = meter.create_counter("test.counter_name-special") + histogram = meter.create_histogram("test.histogram_name-special") - assert "negative_updown" in otel_provider._up_down_counters - - def test_metric_names_with_special_characters(self, otel_provider): - """Test that metric names with dots and underscores work.""" - otel_provider.record_count("test.counter_name-special", 1.0) - otel_provider.record_histogram("test.histogram_name-special", 10.0) + assert counter is not None + assert histogram is not None - assert "test.counter_name-special" in otel_provider._counters - assert "test.histogram_name-special" in otel_provider._histograms + # Verify they can be used + counter.add(1) + histogram.record(10.0) - def test_empty_attributes_dict(self, otel_provider): + def test_meter_counter_with_zero_value(self, otel_provider): + """Test that counters work with zero value.""" + meter = otel_provider.get_meter("test.meter") + counter = meter.create_counter("test.counter") + + # Should not raise an exception + counter.add(0.0) + + def test_meter_histogram_with_negative_value(self, otel_provider): + """Test that histograms accept negative values.""" + meter = otel_provider.get_meter("test.meter") + histogram = meter.create_histogram("test.histogram") + + # Should not raise an exception + histogram.record(-10.0) + + def test_meter_up_down_counter_with_negative_value(self, otel_provider): + """Test that up/down counters work with negative values.""" + meter = otel_provider.get_meter("test.meter") + up_down_counter = meter.create_up_down_counter("test.updown") + + # Should not raise an exception + up_down_counter.add(-5.0) + + def test_meter_instruments_with_empty_attributes(self, otel_provider): """Test that empty attributes dict is handled correctly.""" - otel_provider.record_count("test_counter", 1.0, attributes={}) + meter = otel_provider.get_meter("test.meter") + counter = meter.create_counter("test.counter") - assert "test_counter" in otel_provider._counters + # Should not raise an exception + counter.add(1.0, attributes={}) - def test_none_attributes(self, otel_provider): + def test_meter_instruments_with_none_attributes(self, otel_provider): """Test that None attributes are handled correctly.""" - otel_provider.record_count("test_counter", 1.0, attributes=None) + meter = otel_provider.get_meter("test.meter") + counter = meter.create_counter("test.counter") - assert "test_counter" in otel_provider._counters + # Should not raise an exception + counter.add(1.0, attributes=None) + +class 
TestOTelTelemetryProviderRealisticScenarios: + """Tests simulating realistic usage scenarios.""" + + def test_inference_request_telemetry(self, otel_provider): + """Simulate telemetry for a complete inference request.""" + tracer = otel_provider.get_tracer("llama_stack.inference") + meter = otel_provider.get_meter("llama_stack.metrics") + + # Create instruments + request_counter = meter.create_counter("llama.requests.total") + token_counter = meter.create_counter("llama.tokens.total") + latency_histogram = meter.create_histogram("llama.request.duration_ms") + in_flight_gauge = meter.create_up_down_counter("llama.requests.in_flight") + + # Simulate request + with tracer.start_as_current_span("inference.request") as request_span: + request_span.set_attribute("model.id", "llama-3.2-1b") + request_span.set_attribute("user.id", "test-user") + + request_counter.add(1, {"model": "llama-3.2-1b"}) + in_flight_gauge.add(1) + + # Simulate token counting + token_counter.add(25, {"type": "input", "model": "llama-3.2-1b"}) + token_counter.add(150, {"type": "output", "model": "llama-3.2-1b"}) + + # Simulate latency + latency_histogram.record(125.5, {"model": "llama-3.2-1b"}) + + in_flight_gauge.add(-1) + request_span.set_attribute("tokens.input", 25) + request_span.set_attribute("tokens.output", 150) + + def test_multi_step_workflow_with_nested_spans(self, otel_provider): + """Simulate a multi-step workflow with nested spans.""" + tracer = otel_provider.get_tracer("llama_stack.workflow") + meter = otel_provider.get_meter("llama_stack.workflow.metrics") + + step_counter = meter.create_counter("workflow.steps.completed") + + with tracer.start_as_current_span("workflow.execute") as root_span: + root_span.set_attribute("workflow.id", "wf-123") + + # Step 1: Validate + with tracer.start_as_current_span("step.validate") as span: + span.set_attribute("validation.result", "pass") + step_counter.add(1, {"step": "validate", "status": "success"}) + + # Step 2: Process + with tracer.start_as_current_span("step.process") as span: + span.set_attribute("items.processed", 100) + step_counter.add(1, {"step": "process", "status": "success"}) + + # Step 3: Finalize + with tracer.start_as_current_span("step.finalize") as span: + span.set_attribute("output.size", 1024) + step_counter.add(1, {"step": "finalize", "status": "success"}) + + root_span.set_attribute("workflow.status", "completed") + + def test_error_handling_with_telemetry(self, otel_provider): + """Test telemetry when errors occur.""" + tracer = otel_provider.get_tracer("llama_stack.errors") + meter = otel_provider.get_meter("llama_stack.errors.metrics") + + error_counter = meter.create_counter("llama.errors.total") + + with tracer.start_as_current_span("operation.with.error") as span: + try: + span.set_attribute("step", "processing") + # Simulate an error + raise ValueError("Test error") + except ValueError as e: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + error_counter.add(1, {"error.type": "ValueError"}) + + # Should not raise - error was handled + + def test_batch_operations_telemetry(self, otel_provider): + """Test telemetry for batch operations.""" + tracer = otel_provider.get_tracer("llama_stack.batch") + meter = otel_provider.get_meter("llama_stack.batch.metrics") + + batch_counter = meter.create_counter("llama.batch.items.processed") + batch_duration = meter.create_histogram("llama.batch.duration_ms") + + with tracer.start_as_current_span("batch.process") as batch_span: + 
batch_span.set_attribute("batch.size", 100) + + for i in range(100): + with tracer.start_as_current_span(f"item.{i}") as item_span: + item_span.set_attribute("item.index", i) + batch_counter.add(1, {"status": "success"}) + + batch_duration.record(5000.0, {"batch.size": "100"}) + batch_span.set_attribute("batch.status", "completed") diff --git a/uv.lock b/uv.lock index 2679edb94..f25bba4a0 100644 --- a/uv.lock +++ b/uv.lock @@ -1775,6 +1775,7 @@ dependencies = [ { name = "openai" }, { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-instrumentation-fastapi" }, + { name = "opentelemetry-instrumentation-sqlalchemy" }, { name = "opentelemetry-sdk" }, { name = "opentelemetry-semantic-conventions" }, { name = "pillow" }, @@ -1902,6 +1903,7 @@ requires-dist = [ { name = "openai", specifier = ">=1.107" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.30.0" }, { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.57b0" }, + { name = "opentelemetry-instrumentation-sqlalchemy", specifier = ">=0.57b0" }, { name = "opentelemetry-sdk", specifier = ">=1.30.0" }, { name = "opentelemetry-semantic-conventions", specifier = ">=0.57b0" }, { name = "pandas", marker = "extra == 'ui'" }, @@ -2756,6 +2758,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3b/df/f20fc21c88c7af5311bfefc15fc4e606bab5edb7c193aa8c73c354904c35/opentelemetry_instrumentation_fastapi-0.57b0-py3-none-any.whl", hash = "sha256:61e6402749ffe0bfec582e58155e0d81dd38723cd9bc4562bca1acca80334006", size = 12712, upload-time = "2025-07-29T15:42:03.332Z" }, ] +[[package]] +name = "opentelemetry-instrumentation-sqlalchemy" +version = "0.57b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "packaging" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9c/18/ee1460dcb044b25aaedd6cfd063304d84ae641dddb8fb9287959f7644100/opentelemetry_instrumentation_sqlalchemy-0.57b0.tar.gz", hash = "sha256:95667326b7cc22bb4bc9941f98ca22dd177679f9a4d277646cc21074c0d732ff", size = 14962, upload-time = "2025-07-29T15:43:12.426Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/94/18/af35650eb029d771b8d281bea770727f1e2f662c422c5ab1a0c2b7afc152/opentelemetry_instrumentation_sqlalchemy-0.57b0-py3-none-any.whl", hash = "sha256:8a1a815331cb04fc95aa7c50e9c681cdccfb12e1fa4522f079fe4b24753ae106", size = 14202, upload-time = "2025-07-29T15:42:25.828Z" }, +] + [[package]] name = "opentelemetry-proto" version = "1.36.0" @@ -4821,9 +4839,9 @@ dependencies = [ { name = "typing-extensions", marker = "sys_platform == 'darwin'" }, ] wheels = [ - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp312-none-macosx_11_0_arm64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-cp313t-macosx_14_0_arm64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-none-macosx_11_0_arm64.whl" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:a47b7986bee3f61ad217d8a8ce24605809ab425baf349f97de758815edd2ef54" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-cp313t-macosx_14_0_arm64.whl", hash = "sha256:fbe2e149c5174ef90d29a5f84a554dfaf28e003cb4f61fa2c8c024c17ec7ca58" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0-cp313-none-macosx_11_0_arm64.whl", hash = 
"sha256:057efd30a6778d2ee5e2374cd63a63f63311aa6f33321e627c655df60abdd390" }, ] [[package]] @@ -4846,19 +4864,19 @@ dependencies = [ { name = "typing-extensions", marker = "sys_platform != 'darwin'" }, ] wheels = [ - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-linux_s390x.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_amd64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_arm64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-linux_s390x.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_amd64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_arm64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl" }, - { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-win_amd64.whl" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-linux_s390x.whl", hash = "sha256:0e34e276722ab7dd0dffa9e12fe2135a9b34a0e300c456ed7ad6430229404eb5" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:610f600c102386e581327d5efc18c0d6edecb9820b4140d26163354a99cd800d" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:cb9a8ba8137ab24e36bf1742cb79a1294bd374db570f09fc15a5e1318160db4e" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_amd64.whl", hash = "sha256:2be20b2c05a0cce10430cc25f32b689259640d273232b2de357c35729132256d" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp312-cp312-win_arm64.whl", hash = "sha256:99fc421a5d234580e45957a7b02effbf3e1c884a5dd077afc85352c77bf41434" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-linux_s390x.whl", hash = "sha256:8b5882276633cf91fe3d2d7246c743b94d44a7e660b27f1308007fdb1bb89f7d" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:a5064b5e23772c8d164068cc7c12e01a75faf7b948ecd95a0d4007d7487e5f25" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:8f81dedb4c6076ec325acc3b47525f9c550e5284a18eae1d9061c543f7b6e7de" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_amd64.whl", hash = "sha256:e1ee1b2346ade3ea90306dfbec7e8ff17bc220d344109d189ae09078333b0856" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313-win_arm64.whl", hash = "sha256:64c187345509f2b1bb334feed4666e2c781ca381874bde589182f81247e61f88" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = 
"sha256:af81283ac671f434b1b25c95ba295f270e72db1fad48831eb5e4748ff9840041" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:a9dbb6f64f63258bc811e2c0c99640a81e5af93c531ad96e95c5ec777ea46dab" }, + { url = "https://download.pytorch.org/whl/cpu/torch-2.8.0%2Bcpu-cp313-cp313t-win_amd64.whl", hash = "sha256:6d93a7165419bc4b2b907e859ccab0dea5deeab261448ae9a5ec5431f14c0e64" }, ] [[package]]