From 8e29d0eb796cc3a9b1c7a14b2d24c72e9edb046d Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Tue, 7 Oct 2025 17:25:54 -0400 Subject: [PATCH] rework(telemetry): remove legacy telemetry api --- llama_stack/core/routers/inference.py | 199 +++------ .../utils/telemetry/dataset_mixin.py | 80 ---- .../utils/telemetry/sqlite_trace_store.py | 383 ------------------ .../providers/utils/telemetry/tracing.py | 383 ------------------ .../utils => }/telemetry/__init__.py | 0 llama_stack/telemetry/middleware.py | 120 ++++++ llama_stack/telemetry/otel.py | 99 +++++ pyproject.toml | 19 +- ...e8083caf34f49147ad1c25efae1de3f0b25e5.json | 56 --- .../telemetry/test_openai_telemetry.py | 194 --------- tests/integration/telemetry/test_telemetry.py | 187 --------- .../telemetry/test_telemetry_metrics.py | 206 ---------- uv.lock | 116 ++++++ 13 files changed, 397 insertions(+), 1645 deletions(-) delete mode 100644 llama_stack/providers/utils/telemetry/dataset_mixin.py delete mode 100644 llama_stack/providers/utils/telemetry/sqlite_trace_store.py delete mode 100644 llama_stack/providers/utils/telemetry/tracing.py rename llama_stack/{providers/utils => }/telemetry/__init__.py (100%) create mode 100644 llama_stack/telemetry/middleware.py create mode 100644 llama_stack/telemetry/otel.py delete mode 100644 tests/integration/telemetry/recordings/0de60cd6a6ec3dbfc4a7601e77be8083caf34f49147ad1c25efae1de3f0b25e5.json delete mode 100644 tests/integration/telemetry/test_openai_telemetry.py delete mode 100644 tests/integration/telemetry/test_telemetry.py delete mode 100644 tests/integration/telemetry/test_telemetry_metrics.py diff --git a/llama_stack/core/routers/inference.py b/llama_stack/core/routers/inference.py index c4338e614..fa52b9467 100644 --- a/llama_stack/core/routers/inference.py +++ b/llama_stack/core/routers/inference.py @@ -7,7 +7,6 @@ import asyncio import time from collections.abc import AsyncGenerator, AsyncIterator -from datetime import UTC, datetime from typing import Annotated, Any from openai.types.chat import ChatCompletionToolChoiceOptionParam as OpenAIChatCompletionToolChoiceOptionParam @@ -45,33 +44,53 @@ from llama_stack.apis.inference import ( ToolPromptFormat, ) from llama_stack.apis.models import Model, ModelType -from llama_stack.apis.telemetry import MetricEvent, MetricInResponse, Telemetry +from llama_stack.apis.telemetry import MetricInResponse from llama_stack.log import get_logger from llama_stack.models.llama.llama3.chat_format import ChatFormat from llama_stack.models.llama.llama3.tokenizer import Tokenizer from llama_stack.providers.datatypes import HealthResponse, HealthStatus, RoutingTable from llama_stack.providers.utils.inference.inference_store import InferenceStore -from llama_stack.providers.utils.telemetry.tracing import enqueue_event, get_current_span +from opentelemetry import trace, metrics logger = get_logger(name=__name__, category="core::routers") +class InferenceRouterTelemetry: + """Telemetry for InferenceRouter""" + + def __init__(self): + meter = metrics.get_meter(__name__) + self.prompt_tokens = meter.create_counter( + "prompt_tokens", + unit="tokens", + description="Number of tokens in the prompt", + ) + self.completion_tokens = meter.create_counter( + "completion_tokens", + unit="tokens", + description="Number of tokens in the completion", + ) + self.total_tokens = meter.create_counter( + "total_tokens", + unit="tokens", + description="Total number of tokens used", + ) + + class InferenceRouter(Inference): """Routes to an provider based on the model""" def 
__init__( self, routing_table: RoutingTable, - telemetry: Telemetry | None = None, store: InferenceStore | None = None, ) -> None: logger.debug("Initializing InferenceRouter") self.routing_table = routing_table - self.telemetry = telemetry self.store = store - if self.telemetry: - self.tokenizer = Tokenizer.get_instance() - self.formatter = ChatFormat(self.tokenizer) + self.tokenizer = Tokenizer.get_instance() + self.formatter = ChatFormat(self.tokenizer) + self.telemetry = InferenceRouterTelemetry() async def initialize(self) -> None: logger.debug("InferenceRouter.initialize") @@ -97,64 +116,25 @@ class InferenceRouter(Inference): ) await self.routing_table.register_model(model_id, provider_model_id, provider_id, metadata, model_type) - def _construct_metrics( - self, - prompt_tokens: int, - completion_tokens: int, - total_tokens: int, - model: Model, - ) -> list[MetricEvent]: - """Constructs a list of MetricEvent objects containing token usage metrics. - - Args: - prompt_tokens: Number of tokens in the prompt - completion_tokens: Number of tokens in the completion - total_tokens: Total number of tokens used - model: Model object containing model_id and provider_id - - Returns: - List of MetricEvent objects with token usage metrics - """ - span = get_current_span() - if span is None: - logger.warning("No span found for token usage metrics") - return [] - - metrics = [ - ("prompt_tokens", prompt_tokens), - ("completion_tokens", completion_tokens), - ("total_tokens", total_tokens), - ] - metric_events = [] - for metric_name, value in metrics: - metric_events.append( - MetricEvent( - trace_id=span.trace_id, - span_id=span.span_id, - metric=metric_name, - value=value, - timestamp=datetime.now(UTC), - unit="tokens", - attributes={ - "model_id": model.model_id, - "provider_id": model.provider_id, - }, - ) - ) - return metric_events - - async def _compute_and_log_token_usage( - self, - prompt_tokens: int, - completion_tokens: int, - total_tokens: int, - model: Model, - ) -> list[MetricInResponse]: - metrics = self._construct_metrics(prompt_tokens, completion_tokens, total_tokens, model) + def _record_metrics( + self, prompt_tokens: int | None, completion_tokens: int | None, total_tokens: int | None, model: Model + ): if self.telemetry: - for metric in metrics: - enqueue_event(metric) - return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in metrics] + span = trace.get_current_span() + span_context = span.get_span_context() + attributes = { + "model_id": model.model_id, + "provider_id": model.provider_id, + "trace_id": span_context.trace_id, + "span_id": span_context.span_id, + } + + if prompt_tokens: + self.telemetry.prompt_tokens.add(prompt_tokens, attributes) + if completion_tokens: + self.telemetry.completion_tokens.add(completion_tokens, attributes) + if total_tokens: + self.telemetry.total_tokens.add(total_tokens, attributes) async def _count_tokens( self, @@ -237,19 +217,13 @@ class InferenceRouter(Inference): response = await provider.openai_completion(**params) if self.telemetry: - metrics = self._construct_metrics( + self._record_metrics( prompt_tokens=response.usage.prompt_tokens, completion_tokens=response.usage.completion_tokens, total_tokens=response.usage.total_tokens, model=model_obj, ) - for metric in metrics: - enqueue_event(metric) - # these metrics will show up in the client response. 
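+            # Token usage is recorded via the OpenTelemetry counters above; it is
+            # no longer attached to the response body.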
- response.metrics = ( - metrics if not hasattr(response, "metrics") or response.metrics is None else response.metrics + metrics - ) return response async def openai_chat_completion( @@ -343,18 +317,13 @@ class InferenceRouter(Inference): asyncio.create_task(self.store.store_chat_completion(response, messages)) if self.telemetry: - metrics = self._construct_metrics( + self._record_metrics( prompt_tokens=response.usage.prompt_tokens, completion_tokens=response.usage.completion_tokens, total_tokens=response.usage.total_tokens, model=model_obj, ) - for metric in metrics: - enqueue_event(metric) - # these metrics will show up in the client response. - response.metrics = ( - metrics if not hasattr(response, "metrics") or response.metrics is None else response.metrics + metrics - ) + return response async def openai_embeddings( @@ -466,77 +435,15 @@ class InferenceRouter(Inference): # Create a separate span for streaming completion metrics if self.telemetry: # Log metrics in the new span context - completion_metrics = self._construct_metrics( - prompt_tokens=prompt_tokens, + # Only log completion and total tokens + completion_metrics = self._record_metrics( + prompt_tokens=None, completion_tokens=completion_tokens, total_tokens=total_tokens, model=model, ) - for metric in completion_metrics: - if metric.metric in [ - "completion_tokens", - "total_tokens", - ]: # Only log completion and total tokens - enqueue_event(metric) - - # Return metrics in response - async_metrics = [ - MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics - ] - chunk.metrics = async_metrics if chunk.metrics is None else chunk.metrics + async_metrics - else: - # Fallback if no telemetry - completion_metrics = self._construct_metrics( - prompt_tokens or 0, - completion_tokens or 0, - total_tokens, - model, - ) - async_metrics = [ - MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics - ] - chunk.metrics = async_metrics if chunk.metrics is None else chunk.metrics + async_metrics yield chunk - async def count_tokens_and_compute_metrics( - self, - response: ChatCompletionResponse | CompletionResponse, - prompt_tokens, - model, - tool_prompt_format: ToolPromptFormat | None = None, - ): - if isinstance(response, ChatCompletionResponse): - content = [response.completion_message] - else: - content = response.content - completion_tokens = await self._count_tokens(messages=content, tool_prompt_format=tool_prompt_format) - total_tokens = (prompt_tokens or 0) + (completion_tokens or 0) - - # Create a separate span for completion metrics - if self.telemetry: - # Log metrics in the new span context - completion_metrics = self._construct_metrics( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - model=model, - ) - for metric in completion_metrics: - if metric.metric in ["completion_tokens", "total_tokens"]: # Only log completion and total tokens - enqueue_event(metric) - - # Return metrics in response - return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics] - - # Fallback if no telemetry - metrics = self._construct_metrics( - prompt_tokens or 0, - completion_tokens or 0, - total_tokens, - model, - ) - return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in metrics] - async def stream_tokens_and_compute_metrics_openai_chat( self, response: AsyncIterator[OpenAIChatCompletionChunk], @@ -612,14 +519,12 @@ class InferenceRouter(Inference): # Add 
metrics to the chunk if self.telemetry and chunk.usage: - metrics = self._construct_metrics( + metrics = self._record_metrics( prompt_tokens=chunk.usage.prompt_tokens, completion_tokens=chunk.usage.completion_tokens, total_tokens=chunk.usage.total_tokens, model=model, ) - for metric in metrics: - enqueue_event(metric) yield chunk finally: diff --git a/llama_stack/providers/utils/telemetry/dataset_mixin.py b/llama_stack/providers/utils/telemetry/dataset_mixin.py deleted file mode 100644 index fe729a244..000000000 --- a/llama_stack/providers/utils/telemetry/dataset_mixin.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - - -from llama_stack.apis.datasetio import DatasetIO -from llama_stack.apis.telemetry import QueryCondition, QuerySpansResponse, Span - - -class TelemetryDatasetMixin: - """Mixin class that provides dataset-related functionality for telemetry providers.""" - - datasetio_api: DatasetIO | None - - async def save_spans_to_dataset( - self, - attribute_filters: list[QueryCondition], - attributes_to_save: list[str], - dataset_id: str, - max_depth: int | None = None, - ) -> None: - if self.datasetio_api is None: - raise RuntimeError("DatasetIO API not available") - - spans = await self.query_spans( - attribute_filters=attribute_filters, - attributes_to_return=attributes_to_save, - max_depth=max_depth, - ) - - rows = [ - { - "trace_id": span.trace_id, - "span_id": span.span_id, - "parent_span_id": span.parent_span_id, - "name": span.name, - "start_time": span.start_time, - "end_time": span.end_time, - **{attr: span.attributes.get(attr) for attr in attributes_to_save}, - } - for span in spans - ] - - await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows) - - async def query_spans( - self, - attribute_filters: list[QueryCondition], - attributes_to_return: list[str], - max_depth: int | None = None, - ) -> QuerySpansResponse: - traces = await self.query_traces(attribute_filters=attribute_filters) - spans = [] - - for trace in traces.data: - spans_by_id_resp = await self.get_span_tree( - span_id=trace.root_span_id, - attributes_to_return=attributes_to_return, - max_depth=max_depth, - ) - - for span in spans_by_id_resp.data.values(): - if span.attributes and all( - attr in span.attributes and span.attributes[attr] is not None for attr in attributes_to_return - ): - spans.append( - Span( - trace_id=trace.root_span_id, - span_id=span.span_id, - parent_span_id=span.parent_span_id, - name=span.name, - start_time=span.start_time, - end_time=span.end_time, - attributes=span.attributes, - ) - ) - - return QuerySpansResponse(data=spans) diff --git a/llama_stack/providers/utils/telemetry/sqlite_trace_store.py b/llama_stack/providers/utils/telemetry/sqlite_trace_store.py deleted file mode 100644 index 71480364c..000000000 --- a/llama_stack/providers/utils/telemetry/sqlite_trace_store.py +++ /dev/null @@ -1,383 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. 
- -import json -from datetime import UTC, datetime -from typing import Protocol - -import aiosqlite - -from llama_stack.apis.telemetry import ( - MetricDataPoint, - MetricLabel, - MetricLabelMatcher, - MetricQueryType, - MetricSeries, - QueryCondition, - QueryMetricsResponse, - Span, - SpanWithStatus, - Trace, -) - - -class TraceStore(Protocol): - async def query_traces( - self, - attribute_filters: list[QueryCondition] | None = None, - limit: int | None = 100, - offset: int | None = 0, - order_by: list[str] | None = None, - ) -> list[Trace]: ... - - async def get_span_tree( - self, - span_id: str, - attributes_to_return: list[str] | None = None, - max_depth: int | None = None, - ) -> dict[str, SpanWithStatus]: ... - - async def query_metrics( - self, - metric_name: str, - start_time: datetime, - end_time: datetime | None = None, - granularity: str | None = "1d", - query_type: MetricQueryType = MetricQueryType.RANGE, - label_matchers: list[MetricLabelMatcher] | None = None, - ) -> QueryMetricsResponse: ... - - -class SQLiteTraceStore(TraceStore): - def __init__(self, conn_string: str): - self.conn_string = conn_string - - async def query_metrics( - self, - metric_name: str, - start_time: datetime, - end_time: datetime | None = None, - granularity: str | None = None, - query_type: MetricQueryType = MetricQueryType.RANGE, - label_matchers: list[MetricLabelMatcher] | None = None, - ) -> QueryMetricsResponse: - if end_time is None: - end_time = datetime.now(UTC) - - # Build base query - if query_type == MetricQueryType.INSTANT: - query = """ - SELECT - se.name, - SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value, - json_extract(se.attributes, '$.unit') as unit, - se.attributes - FROM span_events se - WHERE se.name = ? - AND se.timestamp BETWEEN ? AND ? - """ - else: - if granularity: - time_format = self._get_time_format_for_granularity(granularity) - query = f""" - SELECT - se.name, - SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value, - json_extract(se.attributes, '$.unit') as unit, - se.attributes, - strftime('{time_format}', se.timestamp) as bucket_start - FROM span_events se - WHERE se.name = ? - AND se.timestamp BETWEEN ? AND ? - """ - else: - query = """ - SELECT - se.name, - json_extract(se.attributes, '$.value') as value, - json_extract(se.attributes, '$.unit') as unit, - se.attributes, - se.timestamp - FROM span_events se - WHERE se.name = ? - AND se.timestamp BETWEEN ? AND ? - """ - - params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()] - - # Labels that will be attached to the MetricSeries (preserve matcher labels) - all_labels: list[MetricLabel] = [] - matcher_label_names = set() - if label_matchers: - for matcher in label_matchers: - json_path = f"$.{matcher.name}" - if matcher.operator == "=": - query += f" AND json_extract(se.attributes, '{json_path}') = ?" - params.append(matcher.value) - elif matcher.operator == "!=": - query += f" AND json_extract(se.attributes, '{json_path}') != ?" - params.append(matcher.value) - elif matcher.operator == "=~": - query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?" - params.append(f"%{matcher.value}%") - elif matcher.operator == "!~": - query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?" 
- params.append(f"%{matcher.value}%") - # Preserve filter context in output - all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value))) - matcher_label_names.add(matcher.name) - - # GROUP BY / ORDER BY logic - if query_type == MetricQueryType.RANGE and granularity: - group_time_format = self._get_time_format_for_granularity(granularity) - query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')" - query += " ORDER BY bucket_start" - elif query_type == MetricQueryType.INSTANT: - query += " GROUP BY json_extract(se.attributes, '$.unit')" - else: - query += " ORDER BY se.timestamp" - - # Execute query - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, params) as cursor: - rows = await cursor.fetchall() - - if not rows: - return QueryMetricsResponse(data=[]) - - data_points = [] - # We want to add attribute labels, but only those not already present as matcher labels. - attr_label_names = set() - for row in rows: - # Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result. - try: - attributes = json.loads(row["attributes"] or "{}") - except (TypeError, json.JSONDecodeError): - attributes = {} - - value = row["value"] - unit = row["unit"] or "" - - # Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result. - for k, v in attributes.items(): - if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names: - all_labels.append(MetricLabel(name=k, value=str(v))) - attr_label_names.add(k) - - # Determine timestamp - if query_type == MetricQueryType.RANGE and granularity: - try: - bucket_start_raw = row["bucket_start"] - except KeyError as e: - raise ValueError( - "DB did not have a bucket_start time in row when using granularity, this indicates improper formatting" - ) from e - # this value could also be there, but be NULL, I think. - if bucket_start_raw is None: - raise ValueError("bucket_start is None check time format and data") - bucket_start = datetime.fromisoformat(bucket_start_raw) - timestamp = int(bucket_start.timestamp()) - elif query_type == MetricQueryType.INSTANT: - timestamp = int(datetime.now(UTC).timestamp()) - else: - try: - timestamp_raw = row["timestamp"] - except KeyError as e: - raise ValueError( - "DB did not have a timestamp in row, this indicates improper formatting" - ) from e - # this value could also be there, but be NULL, I think. - if timestamp_raw is None: - raise ValueError("timestamp is None check time format and data") - timestamp_iso = datetime.fromisoformat(timestamp_raw) - timestamp = int(timestamp_iso.timestamp()) - - data_points.append( - MetricDataPoint( - timestamp=timestamp, - value=value, - unit=unit, - ) - ) - - metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)] - return QueryMetricsResponse(data=metric_series) - - def _get_time_format_for_granularity(self, granularity: str | None) -> str: - """Get the SQLite strftime format string for a given granularity. 
- Args: - granularity: Granularity string (e.g., "1m", "5m", "1h", "1d") - Returns: - SQLite strftime format string for the granularity - """ - if granularity is None: - raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation") - - if granularity.endswith("d"): - return "%Y-%m-%d 00:00:00" - elif granularity.endswith("h"): - return "%Y-%m-%d %H:00:00" - elif granularity.endswith("m"): - return "%Y-%m-%d %H:%M:00" - else: - return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps. - - async def query_traces( - self, - attribute_filters: list[QueryCondition] | None = None, - limit: int | None = 100, - offset: int | None = 0, - order_by: list[str] | None = None, - ) -> list[Trace]: - def build_where_clause() -> tuple[str, list]: - if not attribute_filters: - return "", [] - - ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"} - - conditions = [ - f"json_extract(s.attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?" - for condition in attribute_filters - ] - params = [condition.value for condition in attribute_filters] - where_clause = " WHERE " + " AND ".join(conditions) - return where_clause, params - - def build_order_clause() -> str: - if not order_by: - return "" - - order_clauses = [] - for field in order_by: - desc = field.startswith("-") - clean_field = field[1:] if desc else field - order_clauses.append(f"t.{clean_field} {'DESC' if desc else 'ASC'}") - return " ORDER BY " + ", ".join(order_clauses) - - # Build the main query - base_query = """ - WITH matching_traces AS ( - SELECT DISTINCT t.trace_id - FROM traces t - JOIN spans s ON t.trace_id = s.trace_id - {where_clause} - ), - filtered_traces AS ( - SELECT t.trace_id, t.root_span_id, t.start_time, t.end_time - FROM matching_traces mt - JOIN traces t ON mt.trace_id = t.trace_id - LEFT JOIN spans s ON t.trace_id = s.trace_id - {order_clause} - ) - SELECT DISTINCT trace_id, root_span_id, start_time, end_time - FROM filtered_traces - WHERE root_span_id IS NOT NULL - LIMIT {limit} OFFSET {offset} - """ - - where_clause, params = build_where_clause() - query = base_query.format( - where_clause=where_clause, - order_clause=build_order_clause(), - limit=limit, - offset=offset, - ) - - # Execute query and return results - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, params) as cursor: - rows = await cursor.fetchall() - return [ - Trace( - trace_id=row["trace_id"], - root_span_id=row["root_span_id"], - start_time=datetime.fromisoformat(row["start_time"]), - end_time=datetime.fromisoformat(row["end_time"]), - ) - for row in rows - ] - - async def get_span_tree( - self, - span_id: str, - attributes_to_return: list[str] | None = None, - max_depth: int | None = None, - ) -> dict[str, SpanWithStatus]: - # Build the attributes selection - attributes_select = "s.attributes" - if attributes_to_return: - json_object = ", ".join(f"'{key}', json_extract(s.attributes, '$.{key}')" for key in attributes_to_return) - attributes_select = f"json_object({json_object})" - - # SQLite CTE query with filtered attributes - query = f""" - WITH RECURSIVE span_tree AS ( - SELECT s.*, 1 as depth, {attributes_select} as filtered_attributes - FROM spans s - WHERE s.span_id = ? - - UNION ALL - - SELECT s.*, st.depth + 1, {attributes_select} as filtered_attributes - FROM spans s - JOIN span_tree st ON s.parent_span_id = st.span_id - WHERE (? IS NULL OR st.depth < ?) 
- ) - SELECT * - FROM span_tree - ORDER BY depth, start_time - """ - - spans_by_id = {} - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (span_id, max_depth, max_depth)) as cursor: - rows = await cursor.fetchall() - - if not rows: - raise ValueError(f"Span {span_id} not found") - - for row in rows: - span = SpanWithStatus( - span_id=row["span_id"], - trace_id=row["trace_id"], - parent_span_id=row["parent_span_id"], - name=row["name"], - start_time=datetime.fromisoformat(row["start_time"]), - end_time=datetime.fromisoformat(row["end_time"]), - attributes=json.loads(row["filtered_attributes"]), - status=row["status"].lower(), - ) - - spans_by_id[span.span_id] = span - - return spans_by_id - - async def get_trace(self, trace_id: str) -> Trace: - query = """ - SELECT * - FROM traces t - WHERE t.trace_id = ? - """ - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (trace_id,)) as cursor: - row = await cursor.fetchone() - if row is None: - raise ValueError(f"Trace {trace_id} not found") - return Trace(**row) - - async def get_span(self, trace_id: str, span_id: str) -> Span: - query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?" - async with aiosqlite.connect(self.conn_string) as conn: - conn.row_factory = aiosqlite.Row - async with conn.execute(query, (trace_id, span_id)) as cursor: - row = await cursor.fetchone() - if row is None: - raise ValueError(f"Span {span_id} not found") - return Span(**row) diff --git a/llama_stack/providers/utils/telemetry/tracing.py b/llama_stack/providers/utils/telemetry/tracing.py deleted file mode 100644 index 738d2bf03..000000000 --- a/llama_stack/providers/utils/telemetry/tracing.py +++ /dev/null @@ -1,383 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import asyncio -import contextvars -import logging # allow-direct-logging -import queue -import secrets -import sys -import threading -import time -from collections.abc import Callable -from datetime import UTC, datetime -from functools import wraps -from typing import Any - -from llama_stack.apis.telemetry import ( - Event, - LogSeverity, - Span, - SpanEndPayload, - SpanStartPayload, - SpanStatus, - StructuredLogEvent, - Telemetry, - UnstructuredLogEvent, -) -from llama_stack.log import get_logger - -logger = get_logger(__name__, category="core") - -# Fallback logger that does NOT propagate to TelemetryHandler to avoid recursion -_fallback_logger = logging.getLogger("llama_stack.telemetry.background") -if not _fallback_logger.handlers: - _fallback_logger.propagate = False - _fallback_logger.setLevel(logging.ERROR) - _fallback_handler = logging.StreamHandler(sys.stderr) - _fallback_handler.setLevel(logging.ERROR) - _fallback_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")) - _fallback_logger.addHandler(_fallback_handler) - - -INVALID_SPAN_ID = 0x0000000000000000 -INVALID_TRACE_ID = 0x00000000000000000000000000000000 - -ROOT_SPAN_MARKERS = ["__root__", "__root_span__"] -# The logical root span may not be visible to this process if a parent context -# is passed in. The local root span is the first local span in a trace. 
-LOCAL_ROOT_SPAN_MARKER = "__local_root_span__" - - -def trace_id_to_str(trace_id: int) -> str: - """Convenience trace ID formatting method - Args: - trace_id: Trace ID int - - Returns: - The trace ID as 32-byte hexadecimal string - """ - return format(trace_id, "032x") - - -def span_id_to_str(span_id: int) -> str: - """Convenience span ID formatting method - Args: - span_id: Span ID int - - Returns: - The span ID as 16-byte hexadecimal string - """ - return format(span_id, "016x") - - -def generate_span_id() -> str: - span_id = secrets.randbits(64) - while span_id == INVALID_SPAN_ID: - span_id = secrets.randbits(64) - return span_id_to_str(span_id) - - -def generate_trace_id() -> str: - trace_id = secrets.randbits(128) - while trace_id == INVALID_TRACE_ID: - trace_id = secrets.randbits(128) - return trace_id_to_str(trace_id) - - -CURRENT_TRACE_CONTEXT = contextvars.ContextVar("trace_context", default=None) -BACKGROUND_LOGGER = None - -LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS = 60.0 - - -class BackgroundLogger: - def __init__(self, api: Telemetry, capacity: int = 100000): - self.api = api - self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity) - self.worker_thread = threading.Thread(target=self._worker, daemon=True) - self.worker_thread.start() - self._last_queue_full_log_time: float = 0.0 - self._dropped_since_last_notice: int = 0 - - def log_event(self, event): - try: - self.log_queue.put_nowait(event) - except queue.Full: - # Aggregate drops and emit at most once per interval via fallback logger - self._dropped_since_last_notice += 1 - current_time = time.time() - if current_time - self._last_queue_full_log_time >= LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS: - _fallback_logger.error( - "Log queue is full; dropped %d events since last notice", - self._dropped_since_last_notice, - ) - self._last_queue_full_log_time = current_time - self._dropped_since_last_notice = 0 - - def _worker(self): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(self._process_logs()) - - async def _process_logs(self): - while True: - try: - event = self.log_queue.get() - await self.api.log_event(event) - except Exception: - import traceback - - traceback.print_exc() - print("Error processing log event") - finally: - self.log_queue.task_done() - - def __del__(self): - self.log_queue.join() - - -def enqueue_event(event: Event) -> None: - """Enqueue a telemetry event to the background logger if available. - - This provides a non-blocking path for routers and other hot paths to - submit telemetry without awaiting the Telemetry API, reducing contention - with the main event loop. 
- """ - global BACKGROUND_LOGGER - if BACKGROUND_LOGGER is None: - raise RuntimeError("Telemetry API not initialized") - BACKGROUND_LOGGER.log_event(event) - - -class TraceContext: - spans: list[Span] = [] - - def __init__(self, logger: BackgroundLogger, trace_id: str): - self.logger = logger - self.trace_id = trace_id - - def push_span(self, name: str, attributes: dict[str, Any] = None) -> Span: - current_span = self.get_current_span() - span = Span( - span_id=generate_span_id(), - trace_id=self.trace_id, - name=name, - start_time=datetime.now(UTC), - parent_span_id=current_span.span_id if current_span else None, - attributes=attributes, - ) - - self.logger.log_event( - StructuredLogEvent( - trace_id=span.trace_id, - span_id=span.span_id, - timestamp=span.start_time, - attributes=span.attributes, - payload=SpanStartPayload( - name=span.name, - parent_span_id=span.parent_span_id, - ), - ) - ) - - self.spans.append(span) - return span - - def pop_span(self, status: SpanStatus = SpanStatus.OK): - span = self.spans.pop() - if span is not None: - self.logger.log_event( - StructuredLogEvent( - trace_id=span.trace_id, - span_id=span.span_id, - timestamp=span.start_time, - attributes=span.attributes, - payload=SpanEndPayload( - status=status, - ), - ) - ) - - def get_current_span(self): - return self.spans[-1] if self.spans else None - - -def setup_logger(api: Telemetry, level: int = logging.INFO): - global BACKGROUND_LOGGER - - if BACKGROUND_LOGGER is None: - BACKGROUND_LOGGER = BackgroundLogger(api) - root_logger = logging.getLogger() - root_logger.setLevel(level) - root_logger.addHandler(TelemetryHandler()) - - -async def start_trace(name: str, attributes: dict[str, Any] = None) -> TraceContext: - global CURRENT_TRACE_CONTEXT, BACKGROUND_LOGGER - - if BACKGROUND_LOGGER is None: - logger.debug("No Telemetry implementation set. Skipping trace initialization...") - return - - trace_id = generate_trace_id() - context = TraceContext(BACKGROUND_LOGGER, trace_id) - # Mark this span as the root for the trace for now. The processing of - # traceparent context if supplied comes later and will result in the - # ROOT_SPAN_MARKERS being removed. Also mark this is the 'local' root, - # i.e. the root of the spans originating in this process as this is - # needed to ensure that we insert this 'local' root span's id into - # the trace record in sqlite store. 
- attributes = dict.fromkeys(ROOT_SPAN_MARKERS, True) | {LOCAL_ROOT_SPAN_MARKER: True} | (attributes or {}) - context.push_span(name, attributes) - - CURRENT_TRACE_CONTEXT.set(context) - return context - - -async def end_trace(status: SpanStatus = SpanStatus.OK): - global CURRENT_TRACE_CONTEXT - - context = CURRENT_TRACE_CONTEXT.get() - if context is None: - logger.debug("No trace context to end") - return - - context.pop_span(status) - CURRENT_TRACE_CONTEXT.set(None) - - -def severity(levelname: str) -> LogSeverity: - if levelname == "DEBUG": - return LogSeverity.DEBUG - elif levelname == "INFO": - return LogSeverity.INFO - elif levelname == "WARNING": - return LogSeverity.WARN - elif levelname == "ERROR": - return LogSeverity.ERROR - elif levelname == "CRITICAL": - return LogSeverity.CRITICAL - else: - raise ValueError(f"Unknown log level: {levelname}") - - -# TODO: ideally, the actual emitting should be done inside a separate daemon -# process completely isolated from the server -class TelemetryHandler(logging.Handler): - def emit(self, record: logging.LogRecord): - # horrendous hack to avoid logging from asyncio and getting into an infinite loop - if record.module in ("asyncio", "selector_events"): - return - - global CURRENT_TRACE_CONTEXT - context = CURRENT_TRACE_CONTEXT.get() - if context is None: - return - - span = context.get_current_span() - if span is None: - return - - enqueue_event( - UnstructuredLogEvent( - trace_id=span.trace_id, - span_id=span.span_id, - timestamp=datetime.now(UTC), - message=self.format(record), - severity=severity(record.levelname), - ) - ) - - def close(self): - pass - - -class SpanContextManager: - def __init__(self, name: str, attributes: dict[str, Any] = None): - self.name = name - self.attributes = attributes - self.span = None - - def __enter__(self): - global CURRENT_TRACE_CONTEXT - context = CURRENT_TRACE_CONTEXT.get() - if not context: - logger.debug("No trace context to push span") - return self - - self.span = context.push_span(self.name, self.attributes) - return self - - def __exit__(self, exc_type, exc_value, traceback): - global CURRENT_TRACE_CONTEXT - context = CURRENT_TRACE_CONTEXT.get() - if not context: - logger.debug("No trace context to pop span") - return - - context.pop_span() - - def set_attribute(self, key: str, value: Any): - if self.span: - if self.span.attributes is None: - self.span.attributes = {} - self.span.attributes[key] = serialize_value(value) - - async def __aenter__(self): - global CURRENT_TRACE_CONTEXT - context = CURRENT_TRACE_CONTEXT.get() - if not context: - logger.debug("No trace context to push span") - return self - - self.span = context.push_span(self.name, self.attributes) - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - global CURRENT_TRACE_CONTEXT - context = CURRENT_TRACE_CONTEXT.get() - if not context: - logger.debug("No trace context to pop span") - return - - context.pop_span() - - def __call__(self, func: Callable): - @wraps(func) - def sync_wrapper(*args, **kwargs): - with self: - return func(*args, **kwargs) - - @wraps(func) - async def async_wrapper(*args, **kwargs): - async with self: - return await func(*args, **kwargs) - - @wraps(func) - def wrapper(*args, **kwargs): - if asyncio.iscoroutinefunction(func): - return async_wrapper(*args, **kwargs) - else: - return sync_wrapper(*args, **kwargs) - - return wrapper - - -def span(name: str, attributes: dict[str, Any] = None): - return SpanContextManager(name, attributes) - - -def get_current_span() -> Span | None: - 
global CURRENT_TRACE_CONTEXT - if CURRENT_TRACE_CONTEXT is None: - logger.debug("No trace context to get current span") - return None - - context = CURRENT_TRACE_CONTEXT.get() - if context: - return context.get_current_span() - return None diff --git a/llama_stack/providers/utils/telemetry/__init__.py b/llama_stack/telemetry/__init__.py similarity index 100% rename from llama_stack/providers/utils/telemetry/__init__.py rename to llama_stack/telemetry/__init__.py diff --git a/llama_stack/telemetry/middleware.py b/llama_stack/telemetry/middleware.py new file mode 100644 index 000000000..e2b9b2cd3 --- /dev/null +++ b/llama_stack/telemetry/middleware.py @@ -0,0 +1,120 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +import time + +from opentelemetry import trace +from opentelemetry.metrics import Counter, Histogram +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from starlette.types import ASGIApp, Message, Receive, Scope, Send + +from llama_stack.log import get_logger + +logger = get_logger(name=__name__, category="instrumentation::otel") + + +class StreamingMetricsMiddleware: + """ + ASGI middleware to track streaming response metrics. + + :param app: The ASGI app to wrap + """ + + def __init__(self, app: ASGIApp): + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send): + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + logger.debug(f"StreamingMetricsMiddleware called for {scope.get('method')} {scope.get('path')}") + start_time = time.time() + is_streaming = False + + async def send_wrapper(message: Message): + nonlocal is_streaming + + # Detect streaming responses by headers + if message["type"] == "http.response.start": + headers = message.get("headers", []) + for name, value in headers: + if name == b"content-type" and b"text/event-stream" in value: + is_streaming = True + # Add streaming attribute to current span + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + current_span.set_attribute("http.response.is_streaming", True) + break + + # Record total duration when response body completes + elif message["type"] == "http.response.body" and not message.get("more_body", False): + if is_streaming: + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + total_duration_ms = (time.time() - start_time) * 1000 + current_span.set_attribute("http.streaming.total_duration_ms", total_duration_ms) + + await send(message) + + await self.app(scope, receive, send_wrapper) + + +class MetricsSpanExporter(SpanExporter): + """ + Records additional custom HTTP metrics during otel span export. 
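+    Spans without an ``http.method`` attribute are skipped, so only HTTP
+    server spans contribute to these metrics.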
+ + :param request_duration: Histogram to record request duration + :param streaming_duration: Histogram to record streaming duration + :param streaming_requests: Counter to record streaming requests + :param request_count: Counter to record request count + """ + + def __init__( + self, + request_duration: Histogram, + streaming_duration: Histogram, + streaming_requests: Counter, + request_count: Counter, + ): + self.request_duration = request_duration + self.streaming_duration = streaming_duration + self.streaming_requests = streaming_requests + self.request_count = request_count + + def export(self, spans): + for span in spans: + if not span.attributes or not span.attributes.get("http.method"): + continue + logger.debug(f"Processing span: {span.name}") + + if span.end_time is None or span.start_time is None: + continue + + duration_ms = (span.end_time - span.start_time) / 1_000_000 + is_streaming = span.attributes.get("http.response.is_streaming", False) + + attributes = { + "http.method": str(span.attributes.get("http.method", "UNKNOWN")), + "http.route": str(span.attributes.get("http.route", span.attributes.get("http.target", "/"))), + "http.status_code": str(span.attributes.get("http.status_code", 0)), + "trace_id": str(span.attributes.get("trace_id", "")), + "span_id": str(span.attributes.get("span_id", "")), + } + + # Record request count and duration + logger.debug(f"Recording metrics: duration={duration_ms}ms, attributes={attributes}") + self.request_count.add(1, attributes) + self.request_duration.record(duration_ms, attributes) + + if is_streaming: + logger.debug(f"MetricsSpanExporter: Recording streaming metrics for {span.name}") + self.streaming_requests.add(1, attributes) + stream_duration = span.attributes.get("http.streaming.total_duration_ms") + if stream_duration and isinstance(stream_duration, (int | float)): + self.streaming_duration.record(float(stream_duration), attributes) + + return SpanExportResult.SUCCESS diff --git a/llama_stack/telemetry/otel.py b/llama_stack/telemetry/otel.py new file mode 100644 index 000000000..0df42b543 --- /dev/null +++ b/llama_stack/telemetry/otel.py @@ -0,0 +1,99 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
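+"""OpenTelemetry instrumentation for the Llama Stack FastAPI server.
+
+Wires ``FastAPIInstrumentor`` into the app, registers HTTP server metrics,
+and installs a span processor that derives request and streaming metrics
+from finished spans. A minimal usage sketch, assuming tracer and meter
+providers are already configured for the process:
+
+    app = FastAPI()
+    OTelInstrumentation().fastapi_middleware(app)
+"""
+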
+from fastapi import FastAPI
+from opentelemetry import metrics, trace
+from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import (
+    BatchSpanProcessor,
+)
+
+from llama_stack.log import get_logger
+from llama_stack.telemetry.middleware import MetricsSpanExporter, StreamingMetricsMiddleware
+
+
+logger = get_logger(name=__name__, category="telemetry::otel")
+
+
+class OTelInstrumentation:
+    """OpenTelemetry instrumentation."""
+
+    def fastapi_middleware(self, app: FastAPI):
+        """Inject OpenTelemetry middleware into FastAPI."""
+        meter = metrics.get_meter("llama_stack.http.server")
+
+        # HTTP Metrics following OTel semantic conventions
+        # https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
+        request_duration = meter.create_histogram(
+            "http.server.request.duration",
+            unit="ms",
+            description="Duration of HTTP requests (time-to-first-byte for streaming)",
+        )
+
+        streaming_duration = meter.create_histogram(
+            "http.server.streaming.duration",
+            unit="ms",
+            description="Total duration of streaming responses (from start to stream completion)",
+        )
+
+        request_count = meter.create_counter(
+            "http.server.request.count", unit="requests", description="Total number of HTTP requests"
+        )
+
+        streaming_requests = meter.create_counter(
+            "http.server.streaming.count", unit="requests", description="Number of streaming requests"
+        )
+
+        # Hook to enrich spans with request attributes. Request counting is
+        # handled once per finished span by MetricsSpanExporter below;
+        # counting here as well would record every request twice.
+        def server_request_hook(span, scope):
+            """
+            Called by FastAPIInstrumentor for each request.
+
+            This only reads from scope (ASGI dict), never touches request body.
+            Safe to use without interfering with body parsing.
+            """
+            method = scope.get("method", "UNKNOWN")
+            path = scope.get("path", "/")
+
+            # Add custom attributes
+            span.set_attribute("service.component", "llama-stack-api")
+            span.set_attribute("http.request", path)
+            span.set_attribute("http.method", method)
+            logger.debug(f"server_request_hook: enriched span for {method} {path}")
+
+        # NOTE: This is called BEFORE routes are added to the app
+        # FastAPIInstrumentor.instrument_app() patches build_middleware_stack(),
+        # which will be called on first request (after routes are added)
+        logger.debug("Instrumenting FastAPI (routes will be added later)")
+        FastAPIInstrumentor.instrument_app(
+            app,
+            server_request_hook=server_request_hook,
+        )
+        logger.debug(f"FastAPI instrumented: {getattr(app, '_is_instrumented_by_opentelemetry', False)}")
+
+        # Add pure ASGI middleware for streaming metrics (always add, regardless of instrumentation)
+        app.add_middleware(StreamingMetricsMiddleware)
+
+        # Add metrics span processor
+        provider = trace.get_tracer_provider()
+        if isinstance(provider, TracerProvider):
+            metrics_exporter = MetricsSpanExporter(
+                request_duration=request_duration,
+                streaming_duration=streaming_duration,
+                streaming_requests=streaming_requests,
+                request_count=request_count,
+            )
+            provider.add_span_processor(BatchSpanProcessor(metrics_exporter))
diff --git a/pyproject.toml b/pyproject.toml
index 5f086bd9d..c66a29eb9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,14 +25,14 @@ classifiers = [
 ]
 dependencies = [
     "aiohttp",
-    "fastapi>=0.115.0,<1.0", # server
-    "fire", # 
for MCP in LLS client + "fastapi>=0.115.0,<1.0", # server + "fire", # for MCP in LLS client "httpx", "huggingface-hub>=0.34.0,<1.0", "jinja2>=3.1.6", "jsonschema", "llama-stack-client>=0.2.23", - "openai>=1.107", # for expires_after support + "openai>=1.107", # for expires_after support "prompt-toolkit", "python-dotenv", "python-jose[cryptography]", @@ -43,13 +43,14 @@ dependencies = [ "tiktoken", "pillow", "h11>=0.16.0", - "python-multipart>=0.0.20", # For fastapi Form - "uvicorn>=0.34.0", # server - "opentelemetry-sdk>=1.30.0", # server + "python-multipart>=0.0.20", # For fastapi Form + "uvicorn>=0.34.0", # server + "opentelemetry-sdk>=1.30.0", # server "opentelemetry-exporter-otlp-proto-http>=1.30.0", # server - "aiosqlite>=0.21.0", # server - for metadata store - "asyncpg", # for metadata store - "sqlalchemy[asyncio]>=2.0.41", # server - for conversations + "aiosqlite>=0.21.0", # server - for metadata store + "asyncpg", # for metadata store + "sqlalchemy[asyncio]>=2.0.41", # server - for conversations + "opentelemetry-instrumentation-fastapi>=0.57b0", ] [project.optional-dependencies] diff --git a/tests/integration/telemetry/recordings/0de60cd6a6ec3dbfc4a7601e77be8083caf34f49147ad1c25efae1de3f0b25e5.json b/tests/integration/telemetry/recordings/0de60cd6a6ec3dbfc4a7601e77be8083caf34f49147ad1c25efae1de3f0b25e5.json deleted file mode 100644 index 918eac432..000000000 --- a/tests/integration/telemetry/recordings/0de60cd6a6ec3dbfc4a7601e77be8083caf34f49147ad1c25efae1de3f0b25e5.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "request": { - "method": "POST", - "url": "http://localhost:11434/v1/v1/completions", - "headers": {}, - "body": { - "model": "llama3.2:3b-instruct-fp16", - "messages": [ - { - "role": "user", - "content": "Test OpenAI telemetry creation" - } - ], - "stream": false - }, - "endpoint": "/v1/completions", - "model": "llama3.2:3b-instruct-fp16" - }, - "response": { - "body": { - "__type__": "openai.types.chat.chat_completion.ChatCompletion", - "__data__": { - "id": "rec-67198cbad48f", - "choices": [ - { - "finish_reason": "stop", - "index": 0, - "logprobs": null, - "message": { - "content": "import openai\n\n# You can replace this with your own API key\nAPI_KEY = \"your_openai_api_key\"\n\n# Create an OpenAI instance\nopenai_client = openai.Client(api_key=API_KEY)\n\n# Test the telemetry endpoint by creating a new telemetry instance\ntelemetry = openai_client.create_telemetry()\n\nprint(telemetry)", - "refusal": null, - "role": "assistant", - "annotations": null, - "audio": null, - "function_call": null, - "tool_calls": null - } - } - ], - "created": 0, - "model": "llama3.2:3b-instruct-fp16", - "object": "chat.completion", - "service_tier": null, - "system_fingerprint": "fp_ollama", - "usage": { - "completion_tokens": 72, - "prompt_tokens": 30, - "total_tokens": 102, - "completion_tokens_details": null, - "prompt_tokens_details": null - } - } - }, - "is_streaming": false - } -} diff --git a/tests/integration/telemetry/test_openai_telemetry.py b/tests/integration/telemetry/test_openai_telemetry.py deleted file mode 100644 index b3ffb6b09..000000000 --- a/tests/integration/telemetry/test_openai_telemetry.py +++ /dev/null @@ -1,194 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. 
- -import time -from datetime import UTC, datetime - -import pytest - - -@pytest.fixture(scope="module", autouse=True) -def setup_openai_telemetry_data(llama_stack_client, text_model_id): - """Setup fixture that creates telemetry data specifically for OpenAI completions testing.""" - - # Create OpenAI completion traces - for i in range(3): - llama_stack_client.chat.completions.create( - model=text_model_id, - messages=[ - { - "role": "user", - "content": f"Test trace openai {i}", - } - ], - # stream=False to always capture Metrics. - stream=False, - ) - - # Create additional OpenAI completion traces with different parameters - for i in range(2): - llama_stack_client.chat.completions.create( - model=text_model_id, - messages=[ - { - "role": "user", - "content": f"Test trace openai with temperature {i}", - } - ], - temperature=0.7, - max_tokens=100, - stream=False, - ) - - start_time = time.time() - - while time.time() - start_time < 30: - traces = llama_stack_client.telemetry.query_traces(limit=10) - if len(traces) >= 5: # 5 OpenAI completion traces - break - time.sleep(0.1) - - if len(traces) < 5: - pytest.fail( - f"Failed to create sufficient OpenAI completion telemetry data after 30s. Got {len(traces)} traces." - ) - - yield - - -def test_openai_traces_basic(llama_stack_client): - """Test basic trace querying functionality for OpenAI completions.""" - all_traces = llama_stack_client.telemetry.query_traces(limit=10) - - assert isinstance(all_traces, list), "Should return a list of traces" - assert len(all_traces) >= 5, "Should have at least 5 traces from OpenAI setup" - - # Verify trace structure and data quality - first_trace = all_traces[0] - assert hasattr(first_trace, "trace_id"), "Trace should have trace_id" - assert hasattr(first_trace, "start_time"), "Trace should have start_time" - assert hasattr(first_trace, "root_span_id"), "Trace should have root_span_id" - - # Validate trace_id is a valid UUID format - assert isinstance(first_trace.trace_id, str) and len(first_trace.trace_id) > 0, ( - "trace_id should be non-empty string" - ) - - # Validate start_time format and not in the future - now = datetime.now(UTC) - if isinstance(first_trace.start_time, str): - trace_time = datetime.fromisoformat(first_trace.start_time.replace("Z", "+00:00")) - else: - # start_time is already a datetime object - trace_time = first_trace.start_time - if trace_time.tzinfo is None: - trace_time = trace_time.replace(tzinfo=UTC) - - # Ensure trace time is not in the future - time_diff = (now - trace_time).total_seconds() - assert time_diff >= 0, f"Trace start_time should not be in the future, got {time_diff}s" - - # Validate root_span_id exists and is non-empty - assert isinstance(first_trace.root_span_id, str) and len(first_trace.root_span_id) > 0, ( - "root_span_id should be non-empty string" - ) - - # Test querying specific trace by ID - specific_trace = llama_stack_client.telemetry.get_trace(trace_id=first_trace.trace_id) - assert specific_trace.trace_id == first_trace.trace_id, "Retrieved trace should match requested ID" - assert specific_trace.start_time == first_trace.start_time, "Retrieved trace should have same start_time" - assert specific_trace.root_span_id == first_trace.root_span_id, "Retrieved trace should have same root_span_id" - - # Test pagination with proper validation - recent_traces = llama_stack_client.telemetry.query_traces(limit=3, offset=0) - assert len(recent_traces) <= 3, "Should return at most 3 traces when limit=3" - assert len(recent_traces) >= 1, "Should return at least 1 
trace" - - # Verify all traces have required fields - for trace in recent_traces: - assert hasattr(trace, "trace_id") and trace.trace_id, "Each trace should have non-empty trace_id" - assert hasattr(trace, "start_time") and trace.start_time, "Each trace should have non-empty start_time" - assert hasattr(trace, "root_span_id") and trace.root_span_id, "Each trace should have non-empty root_span_id" - - -def test_openai_spans_basic(llama_stack_client): - """Test basic span querying functionality for OpenAI completions.""" - spans = llama_stack_client.telemetry.query_spans(attribute_filters=[], attributes_to_return=[]) - - assert isinstance(spans, list), "Should return a list of spans" - assert len(spans) >= 1, "Should have at least one span from OpenAI setup" - - # Verify span structure and data quality - first_span = spans[0] - required_attrs = ["span_id", "name", "trace_id"] - for attr in required_attrs: - assert hasattr(first_span, attr), f"Span should have {attr} attribute" - assert getattr(first_span, attr), f"Span {attr} should not be empty" - - # Validate span data types and content - assert isinstance(first_span.span_id, str) and len(first_span.span_id) > 0, "span_id should be non-empty string" - assert isinstance(first_span.name, str) and len(first_span.name) > 0, "span name should be non-empty string" - assert isinstance(first_span.trace_id, str) and len(first_span.trace_id) > 0, "trace_id should be non-empty string" - - # Verify span belongs to a valid trace - all_traces = llama_stack_client.telemetry.query_traces(limit=10) - trace_ids = {t.trace_id for t in all_traces} - if first_span.trace_id in trace_ids: - trace = llama_stack_client.telemetry.get_trace(trace_id=first_span.trace_id) - assert trace is not None, "Should be able to retrieve trace for valid trace_id" - assert trace.trace_id == first_span.trace_id, "Trace ID should match span's trace_id" - - # Test with span filtering and validate results - filtered_spans = llama_stack_client.telemetry.query_spans( - attribute_filters=[{"key": "name", "op": "eq", "value": first_span.name}], - attributes_to_return=["name", "span_id"], - ) - assert isinstance(filtered_spans, list), "Should return a list with span name filter" - - # Validate filtered spans if filtering works - if len(filtered_spans) > 0: - for span in filtered_spans: - assert hasattr(span, "name"), "Filtered spans should have name attribute" - assert hasattr(span, "span_id"), "Filtered spans should have span_id attribute" - assert span.name == first_span.name, "Filtered spans should match the filter criteria" - assert isinstance(span.span_id, str) and len(span.span_id) > 0, "Filtered span_id should be valid" - - # Test that all spans have consistent structure - for span in spans: - for attr in required_attrs: - assert hasattr(span, attr) and getattr(span, attr), f"All spans should have non-empty {attr}" - - -def test_openai_completion_creates_telemetry(llama_stack_client, text_model_id): - """Test that making OpenAI completion calls actually creates telemetry data.""" - - # Get initial trace count - initial_traces = llama_stack_client.telemetry.query_traces(limit=20) - initial_count = len(initial_traces) - - # Make a new OpenAI completion call - response = llama_stack_client.chat.completions.create( - model=text_model_id, - messages=[{"role": "user", "content": "Test OpenAI telemetry creation"}], - stream=False, - ) - - # Verify we got a response - assert response is not None, "Should get a response from OpenAI completion" - assert hasattr(response, "choices"), 
"Response should have choices" - assert len(response.choices) > 0, "Response should have at least one choice" - - # Wait for telemetry to be recorded - start_time = time.time() - while time.time() - start_time < 30: - final_traces = llama_stack_client.telemetry.query_traces(limit=20) - final_count = len(final_traces) - if final_count > initial_count: - break - time.sleep(0.1) - - # Should have at least as many traces as before (might have more due to other activity) - assert final_count >= initial_count, "Should have at least as many traces after OpenAI call" diff --git a/tests/integration/telemetry/test_telemetry.py b/tests/integration/telemetry/test_telemetry.py deleted file mode 100644 index aff337e35..000000000 --- a/tests/integration/telemetry/test_telemetry.py +++ /dev/null @@ -1,187 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import time -from datetime import UTC, datetime -from uuid import uuid4 - -import pytest -from llama_stack_client import Agent - - -@pytest.fixture(scope="module", autouse=True) -def setup_telemetry_data(llama_stack_client, text_model_id): - """Setup fixture that creates telemetry data before tests run.""" - agent = Agent(llama_stack_client, model=text_model_id, instructions="You are a helpful assistant") - - session_id = agent.create_session(f"test-setup-session-{uuid4()}") - - messages = [ - "What is 2 + 2?", - "Tell me a short joke", - ] - - for msg in messages: - agent.create_turn( - messages=[{"role": "user", "content": msg}], - session_id=session_id, - stream=False, - ) - - for i in range(2): - llama_stack_client.chat.completions.create( - model=text_model_id, messages=[{"role": "user", "content": f"Test trace {i}"}] - ) - - start_time = time.time() - - while time.time() - start_time < 30: - traces = llama_stack_client.telemetry.query_traces(limit=10) - if len(traces) >= 4: - break - time.sleep(0.1) - - if len(traces) < 4: - pytest.fail(f"Failed to create sufficient telemetry data after 30s. 
diff --git a/tests/integration/telemetry/test_telemetry.py b/tests/integration/telemetry/test_telemetry.py
deleted file mode 100644
index aff337e35..000000000
--- a/tests/integration/telemetry/test_telemetry.py
+++ /dev/null
@@ -1,187 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-import time
-from datetime import UTC, datetime
-from uuid import uuid4
-
-import pytest
-from llama_stack_client import Agent
-
-
-@pytest.fixture(scope="module", autouse=True)
-def setup_telemetry_data(llama_stack_client, text_model_id):
-    """Setup fixture that creates telemetry data before tests run."""
-    agent = Agent(llama_stack_client, model=text_model_id, instructions="You are a helpful assistant")
-
-    session_id = agent.create_session(f"test-setup-session-{uuid4()}")
-
-    messages = [
-        "What is 2 + 2?",
-        "Tell me a short joke",
-    ]
-
-    for msg in messages:
-        agent.create_turn(
-            messages=[{"role": "user", "content": msg}],
-            session_id=session_id,
-            stream=False,
-        )
-
-    for i in range(2):
-        llama_stack_client.chat.completions.create(
-            model=text_model_id, messages=[{"role": "user", "content": f"Test trace {i}"}]
-        )
-
-    start_time = time.time()
-
-    while time.time() - start_time < 30:
-        traces = llama_stack_client.telemetry.query_traces(limit=10)
-        if len(traces) >= 4:
-            break
-        time.sleep(0.1)
-
-    if len(traces) < 4:
-        pytest.fail(f"Failed to create sufficient telemetry data after 30s. Got {len(traces)} traces.")
-
-    yield
-
-
-def test_query_traces_basic(llama_stack_client):
-    """Test basic trace querying functionality with proper data validation."""
-    all_traces = llama_stack_client.telemetry.query_traces(limit=5)
-
-    assert isinstance(all_traces, list), "Should return a list of traces"
-    assert len(all_traces) >= 4, "Should have at least 4 traces from setup"
-
-    # Verify trace structure and data quality
-    first_trace = all_traces[0]
-    assert hasattr(first_trace, "trace_id"), "Trace should have trace_id"
-    assert hasattr(first_trace, "start_time"), "Trace should have start_time"
-    assert hasattr(first_trace, "root_span_id"), "Trace should have root_span_id"
-
-    # Validate trace_id is a non-empty string
-    assert isinstance(first_trace.trace_id, str) and len(first_trace.trace_id) > 0, (
-        "trace_id should be non-empty string"
-    )
-
-    # Validate start_time format and not in the future
-    now = datetime.now(UTC)
-    if isinstance(first_trace.start_time, str):
-        trace_time = datetime.fromisoformat(first_trace.start_time.replace("Z", "+00:00"))
-    else:
-        # start_time is already a datetime object
-        trace_time = first_trace.start_time
-        if trace_time.tzinfo is None:
-            trace_time = trace_time.replace(tzinfo=UTC)
-
-    # Ensure trace time is not in the future (but allow any age in the past for persistent test data)
-    time_diff = (now - trace_time).total_seconds()
-    assert time_diff >= 0, f"Trace start_time should not be in the future, got {time_diff}s"
-
-    # Validate root_span_id exists and is non-empty
-    assert isinstance(first_trace.root_span_id, str) and len(first_trace.root_span_id) > 0, (
-        "root_span_id should be non-empty string"
-    )
-
-    # Test querying specific trace by ID
-    specific_trace = llama_stack_client.telemetry.get_trace(trace_id=first_trace.trace_id)
-    assert specific_trace.trace_id == first_trace.trace_id, "Retrieved trace should match requested ID"
-    assert specific_trace.start_time == first_trace.start_time, "Retrieved trace should have same start_time"
-    assert specific_trace.root_span_id == first_trace.root_span_id, "Retrieved trace should have same root_span_id"
-
-    # Test pagination with proper validation
-    recent_traces = llama_stack_client.telemetry.query_traces(limit=3, offset=0)
-    assert len(recent_traces) <= 3, "Should return at most 3 traces when limit=3"
-    assert len(recent_traces) >= 1, "Should return at least 1 trace"
-
-    # Verify all traces have required fields
-    for trace in recent_traces:
-        assert hasattr(trace, "trace_id") and trace.trace_id, "Each trace should have non-empty trace_id"
-        assert hasattr(trace, "start_time") and trace.start_time, "Each trace should have non-empty start_time"
-        assert hasattr(trace, "root_span_id") and trace.root_span_id, "Each trace should have non-empty root_span_id"
-
-
-def test_query_spans_basic(llama_stack_client):
-    """Test basic span querying functionality with proper validation."""
-    spans = llama_stack_client.telemetry.query_spans(attribute_filters=[], attributes_to_return=[])
-
-    assert isinstance(spans, list), "Should return a list of spans"
-    assert len(spans) >= 1, "Should have at least one span from setup"
-
-    # Verify span structure and data quality
-    first_span = spans[0]
-    required_attrs = ["span_id", "name", "trace_id"]
-    for attr in required_attrs:
-        assert hasattr(first_span, attr), f"Span should have {attr} attribute"
-        assert getattr(first_span, attr), f"Span {attr} should not be empty"
-
-    # Validate span data types and content
-    assert isinstance(first_span.span_id, str) and len(first_span.span_id) > 0, "span_id should be non-empty string"
-    assert isinstance(first_span.name, str) and len(first_span.name) > 0, "span name should be non-empty string"
-    assert isinstance(first_span.trace_id, str) and len(first_span.trace_id) > 0, "trace_id should be non-empty string"
-
-    # Verify span belongs to a valid trace (test with traces we know exist)
-    all_traces = llama_stack_client.telemetry.query_traces(limit=10)
-    trace_ids = {t.trace_id for t in all_traces}
-    if first_span.trace_id in trace_ids:
-        trace = llama_stack_client.telemetry.get_trace(trace_id=first_span.trace_id)
-        assert trace is not None, "Should be able to retrieve trace for valid trace_id"
-        assert trace.trace_id == first_span.trace_id, "Trace ID should match span's trace_id"
-
-    # Test with span filtering and validate results
-    filtered_spans = llama_stack_client.telemetry.query_spans(
-        attribute_filters=[{"key": "name", "op": "eq", "value": first_span.name}],
-        attributes_to_return=["name", "span_id"],
-    )
-    assert isinstance(filtered_spans, list), "Should return a list with span name filter"
-
-    # Validate filtered spans if filtering works
-    if len(filtered_spans) > 0:
-        for span in filtered_spans:
-            assert hasattr(span, "name"), "Filtered spans should have name attribute"
-            assert hasattr(span, "span_id"), "Filtered spans should have span_id attribute"
-            assert span.name == first_span.name, "Filtered spans should match the filter criteria"
-            assert isinstance(span.span_id, str) and len(span.span_id) > 0, "Filtered span_id should be valid"
-
-    # Test that all spans have consistent structure
-    for span in spans:
-        for attr in required_attrs:
-            assert hasattr(span, attr) and getattr(span, attr), f"All spans should have non-empty {attr}"
-
-
-def test_telemetry_pagination(llama_stack_client):
-    """Test pagination in telemetry queries."""
-    # Get total count of traces
-    all_traces = llama_stack_client.telemetry.query_traces(limit=20)
-    total_count = len(all_traces)
-    assert total_count >= 4, "Should have at least 4 traces from setup"
-
-    # Test trace pagination
-    page1 = llama_stack_client.telemetry.query_traces(limit=2, offset=0)
-    page2 = llama_stack_client.telemetry.query_traces(limit=2, offset=2)
-
-    assert len(page1) == 2, "First page should have exactly 2 traces"
-    assert len(page2) >= 1, "Second page should have at least 1 trace"
-
-    # Verify no overlap between pages
-    page1_ids = {t.trace_id for t in page1}
-    page2_ids = {t.trace_id for t in page2}
-    assert len(page1_ids.intersection(page2_ids)) == 0, "Pages should contain different traces"
-
-    # Test ordering
-    ordered_traces = llama_stack_client.telemetry.query_traces(limit=5, order_by=["start_time"])
-    assert len(ordered_traces) >= 4, "Should have at least 4 traces for ordering test"
-
-    # Verify ordering by start_time
-    for i in range(len(ordered_traces) - 1):
-        current_time = ordered_traces[i].start_time
-        next_time = ordered_traces[i + 1].start_time
-        assert current_time <= next_time, f"Traces should be ordered by start_time: {current_time} > {next_time}"
-
-    # Test limit behavior
-    limited = llama_stack_client.telemetry.query_traces(limit=3)
-    assert len(limited) == 3, "Should return exactly 3 traces when limit=3"
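
The ordering and grouping checks above can be expressed the same way. A sketch in the spirit of the deleted test_telemetry_pagination, operating on spans from the in-memory exporter shown earlier; in the OTel SDK, ReadableSpan.start_time is an integer nanosecond timestamp:

    # Illustrative only -- ordering and per-trace grouping over in-memory spans
    # instead of the removed query_traces(order_by=["start_time"]) API.
    spans = exporter.get_finished_spans()

    ordered = sorted(spans, key=lambda s: s.start_time)
    for earlier, later in zip(ordered, ordered[1:]):
        assert earlier.start_time <= later.start_time, "Spans should sort by start_time"

    # Group spans by trace_id, roughly the shape query_traces used to return.
    traces: dict[int, list] = {}
    for span in spans:
        traces.setdefault(span.get_span_context().trace_id, []).append(span)
    assert all(len(group) >= 1 for group in traces.values())
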
diff --git a/tests/integration/telemetry/test_telemetry_metrics.py b/tests/integration/telemetry/test_telemetry_metrics.py
deleted file mode 100644
index 1d8312ae2..000000000
--- a/tests/integration/telemetry/test_telemetry_metrics.py
+++ /dev/null
@@ -1,206 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-# All rights reserved.
-#
-# This source code is licensed under the terms described in the LICENSE file in
-# the root directory of this source tree.
-
-import time
-from datetime import UTC, datetime, timedelta
-
-import pytest
-
-
-@pytest.fixture(scope="module", autouse=True)
-def setup_telemetry_metrics_data(openai_client, client_with_models, text_model_id):
-    """Setup fixture that creates telemetry metrics data before tests run."""
-
-    # Skip OpenAI tests if running in library mode
-    if not hasattr(client_with_models, "base_url"):
-        pytest.skip("OpenAI client tests not supported with library client")
-
-    prompt_tokens = []
-    completion_tokens = []
-    total_tokens = []
-
-    # Create OpenAI completions to generate metrics using the proper OpenAI client
-    for i in range(5):
-        response = openai_client.chat.completions.create(
-            model=text_model_id,
-            messages=[{"role": "user", "content": f"OpenAI test {i}"}],
-            stream=False,
-        )
-        prompt_tokens.append(response.usage.prompt_tokens)
-        completion_tokens.append(response.usage.completion_tokens)
-        total_tokens.append(response.usage.total_tokens)
-
-    # Wait for metrics to be logged
-    start_time = time.time()
-    while time.time() - start_time < 30:
-        try:
-            # Try to query metrics to see if they're available
-            metrics_response = client_with_models.telemetry.query_metrics(
-                metric_name="completion_tokens",
-                start_time=int((datetime.now(UTC) - timedelta(minutes=5)).timestamp()),
-            )
-            if len(metrics_response[0].values) > 0:
-                break
-        except Exception:
-            pass
-        time.sleep(0.1)
-
-    # Return the token lists for use in tests
-    return {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_prompt_tokens(client_with_models, text_model_id, setup_telemetry_metrics_data):
-    """Test that prompt_tokens metrics are queryable."""
-    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())
-
-    response = client_with_models.telemetry.query_metrics(
-        metric_name="prompt_tokens",
-        start_time=start_time,
-    )
-
-    assert isinstance(response, list)
-
-    assert isinstance(response[0].values, list), "Should return a list of metric series"
-
-    assert response[0].metric == "prompt_tokens"
-
-    # Use the actual values from setup instead of hardcoded values
-    expected_values = setup_telemetry_metrics_data["prompt_tokens"]
-    assert response[0].values[-1].value in expected_values, (
-        f"Expected one of {expected_values}, got {response[0].values[-1].value}"
-    )
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_completion_tokens(client_with_models, text_model_id, setup_telemetry_metrics_data):
-    """Test that completion_tokens metrics are queryable."""
-    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())
-
-    response = client_with_models.telemetry.query_metrics(
-        metric_name="completion_tokens",
-        start_time=start_time,
-    )
-
-    assert isinstance(response, list)
-
-    assert isinstance(response[0].values, list), "Should return a list of metric series"
-
-    assert response[0].metric == "completion_tokens"
-
-    # Use the actual values from setup instead of hardcoded values
-    expected_values = setup_telemetry_metrics_data["completion_tokens"]
-    assert response[0].values[-1].value in expected_values, (
-        f"Expected one of {expected_values}, got {response[0].values[-1].value}"
-    )
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_total_tokens(client_with_models, text_model_id, setup_telemetry_metrics_data):
-    """Test that total_tokens metrics are queryable."""
-    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())
-
-    response = client_with_models.telemetry.query_metrics(
-        metric_name="total_tokens",
-        start_time=start_time,
-    )
-
-    assert isinstance(response, list)
-
-    assert isinstance(response[0].values, list), "Should return a list of metric series"
-
-    assert response[0].metric == "total_tokens"
-
-    # Use the actual values from setup instead of hardcoded values
-    expected_values = setup_telemetry_metrics_data["total_tokens"]
-    assert response[0].values[-1].value in expected_values, (
-        f"Expected one of {expected_values}, got {response[0].values[-1].value}"
-    )
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_with_time_range(llama_stack_client, text_model_id):
-    """Test that metrics are queryable with time range."""
-    end_time = int(datetime.now(UTC).timestamp())
-    start_time = end_time - 600  # 10 minutes ago
-
-    response = llama_stack_client.telemetry.query_metrics(
-        metric_name="prompt_tokens",
-        start_time=start_time,
-        end_time=end_time,
-    )
-
-    assert isinstance(response, list)
-
-    assert isinstance(response[0].values, list), "Should return a list of metric series"
-
-    assert response[0].metric == "prompt_tokens"
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_with_label_matchers(llama_stack_client, text_model_id):
-    """Test that metrics are queryable with label matchers."""
-    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())
-
-    response = llama_stack_client.telemetry.query_metrics(
-        metric_name="prompt_tokens",
-        start_time=start_time,
-        label_matchers=[{"name": "model_id", "value": text_model_id, "operator": "="}],
-    )
-
-    assert isinstance(response[0].values, list), "Should return a list of metric series"
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_nonexistent_metric(llama_stack_client):
-    """Test that querying a nonexistent metric returns empty data."""
-    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())
-
-    response = llama_stack_client.telemetry.query_metrics(
-        metric_name="nonexistent_metric",
-        start_time=start_time,
-    )
-
-    assert isinstance(response, list), "Should return an empty list for nonexistent metric"
-    assert len(response) == 0
-
-
-@pytest.mark.skip(reason="Skipping this test until client is regenerated")
-def test_query_metrics_with_granularity(llama_stack_client, text_model_id):
-    """Test that metrics are queryable with different granularity levels."""
-    start_time = int((datetime.now(UTC) - timedelta(minutes=10)).timestamp())
-
-    # Test hourly granularity
-    hourly_response = llama_stack_client.telemetry.query_metrics(
-        metric_name="total_tokens",
-        start_time=start_time,
-        granularity="1h",
-    )
-
-    # Test daily granularity
-    daily_response = llama_stack_client.telemetry.query_metrics(
-        metric_name="total_tokens",
-        start_time=start_time,
-        granularity="1d",
-    )
-
-    # Test no granularity (raw data points)
-    raw_response = llama_stack_client.telemetry.query_metrics(
-        metric_name="total_tokens",
-        start_time=start_time,
-        granularity=None,
-    )
-
-    # All should return valid data
-    assert isinstance(hourly_response[0].values, list), "Hourly granularity should return data"
-    assert isinstance(daily_response[0].values, list), "Daily granularity should return data"
-    assert isinstance(raw_response[0].values, list), "No granularity should return data"
-
-    # Verify that different granularities produce different aggregation levels
-    # (The exact number depends on data distribution, but they should be queryable)
-    assert len(hourly_response[0].values) >= 0, "Hourly granularity should be queryable"
-    assert len(daily_response[0].values) >= 0, "Daily granularity should be queryable"
-    assert len(raw_response[0].values) >= 0, "No granularity should be queryable"
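
The token-count series these tests asserted on (prompt_tokens, completion_tokens, total_tokens) survive the removal as OpenTelemetry counters, so equivalent checks can run against the SDK instead of the deleted query_metrics endpoint. A minimal sketch, assuming the test owns its meter provider; the counter name and model_id attribute mirror what the tests above queried, and the numeric value is a made-up stand-in:

    # Illustrative only -- verifying a token counter through the OTel SDK's
    # in-memory metric reader rather than telemetry.query_metrics.
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import InMemoryMetricReader

    reader = InMemoryMetricReader()
    provider = MeterProvider(metric_readers=[reader])
    meter = provider.get_meter("test")

    # Stand-in for what the router records after a completion call.
    prompt_tokens = meter.create_counter("prompt_tokens", unit="tokens")
    prompt_tokens.add(12, {"model_id": "example-model"})

    data = reader.get_metrics_data()
    names = {
        metric.name
        for rm in data.resource_metrics
        for scope in rm.scope_metrics
        for metric in scope.metrics
    }
    assert "prompt_tokens" in names, "Counter should be visible to the reader"
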
"https://files.pythonhosted.org/packages/d0/6f/f20cd1542959f43fb26a5bf9bb18cd81a1ea0700e8870c8f369bd07f5c65/opentelemetry_instrumentation-0.57b0-py3-none-any.whl", hash = "sha256:9109280f44882e07cec2850db28210b90600ae9110b42824d196de357cbddf7e", size = 32460, upload-time = "2025-07-29T15:41:40.883Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.57b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/97/10/7ba59b586eb099fa0155521b387d857de476687c670096597f618d889323/opentelemetry_instrumentation_asgi-0.57b0.tar.gz", hash = "sha256:a6f880b5d1838f65688fc992c65fbb1d3571f319d370990c32e759d3160e510b", size = 24654, upload-time = "2025-07-29T15:42:48.199Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9e/07/ab97dd7e8bc680b479203f7d3b2771b7a097468135a669a38da3208f96cb/opentelemetry_instrumentation_asgi-0.57b0-py3-none-any.whl", hash = "sha256:47debbde6af066a7e8e911f7193730d5e40d62effc1ac2e1119908347790a3ea", size = 16599, upload-time = "2025-07-29T15:41:48.332Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.57b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-instrumentation-asgi" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/47/a8/7c22a33ff5986523a7f9afcb5f4d749533842c3cc77ef55b46727580edd0/opentelemetry_instrumentation_fastapi-0.57b0.tar.gz", hash = "sha256:73ac22f3c472a8f9cb21d1fbe5a4bf2797690c295fff4a1c040e9b1b1688a105", size = 20277, upload-time = "2025-07-29T15:42:58.68Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3b/df/f20fc21c88c7af5311bfefc15fc4e606bab5edb7c193aa8c73c354904c35/opentelemetry_instrumentation_fastapi-0.57b0-py3-none-any.whl", hash = "sha256:61e6402749ffe0bfec582e58155e0d81dd38723cd9bc4562bca1acca80334006", size = 12712, upload-time = "2025-07-29T15:42:03.332Z" }, +] + [[package]] name = "opentelemetry-proto" version = "1.36.0" @@ -2737,6 +2795,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/05/75/7d591371c6c39c73de5ce5da5a2cc7b72d1d1cd3f8f4638f553c01c37b11/opentelemetry_semantic_conventions-0.57b0-py3-none-any.whl", hash = "sha256:757f7e76293294f124c827e514c2a3144f191ef175b069ce8d1211e1e38e9e78", size = 201627, upload-time = "2025-07-29T15:12:04.174Z" }, ] +[[package]] +name = "opentelemetry-util-http" +version = "0.57b0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/9b/1b/6229c45445e08e798fa825f5376f6d6a4211d29052a4088eed6d577fa653/opentelemetry_util_http-0.57b0.tar.gz", hash = "sha256:f7417595ead0eb42ed1863ec9b2f839fc740368cd7bbbfc1d0a47bc1ab0aba11", size = 9405, upload-time = "2025-07-29T15:43:19.916Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0b/a6/b98d508d189b9c208f5978d0906141747d7e6df7c7cafec03657ed1ed559/opentelemetry_util_http-0.57b0-py3-none-any.whl", hash = "sha256:e54c0df5543951e471c3d694f85474977cd5765a3b7654398c83bab3d2ffb8e9", size = 7643, upload-time = "2025-07-29T15:42:41.744Z" }, +] + [[package]] name = "orjson" version = "3.11.1" @@ -5249,6 
@@ -5249,6 +5316,55 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/52/24/ab44c871b0f07f491e5d2ad12c9bd7358e527510618cb1b803a88e986db1/werkzeug-3.1.3-py3-none-any.whl", hash = "sha256:54b78bf3716d19a65be4fceccc0d1d7b89e608834989dfae50ea87564639213e", size = 224498, upload-time = "2024-11-08T15:52:16.132Z" },
 ]
 
+[[package]]
+name = "wrapt"
+version = "1.17.3"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/95/8f/aeb76c5b46e273670962298c23e7ddde79916cb74db802131d49a85e4b7d/wrapt-1.17.3.tar.gz", hash = "sha256:f66eb08feaa410fe4eebd17f2a2c8e2e46d3476e9f8c783daa8e09e0faa666d0", size = 55547, upload-time = "2025-08-12T05:53:21.714Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/9f/41/cad1aba93e752f1f9268c77270da3c469883d56e2798e7df6240dcb2287b/wrapt-1.17.3-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:ab232e7fdb44cdfbf55fc3afa31bcdb0d8980b9b95c38b6405df2acb672af0e0", size = 53998, upload-time = "2025-08-12T05:51:47.138Z" },
+    { url = "https://files.pythonhosted.org/packages/60/f8/096a7cc13097a1869fe44efe68dace40d2a16ecb853141394047f0780b96/wrapt-1.17.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:9baa544e6acc91130e926e8c802a17f3b16fbea0fd441b5a60f5cf2cc5c3deba", size = 39020, upload-time = "2025-08-12T05:51:35.906Z" },
+    { url = "https://files.pythonhosted.org/packages/33/df/bdf864b8997aab4febb96a9ae5c124f700a5abd9b5e13d2a3214ec4be705/wrapt-1.17.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6b538e31eca1a7ea4605e44f81a48aa24c4632a277431a6ed3f328835901f4fd", size = 39098, upload-time = "2025-08-12T05:51:57.474Z" },
+    { url = "https://files.pythonhosted.org/packages/9f/81/5d931d78d0eb732b95dc3ddaeeb71c8bb572fb01356e9133916cd729ecdd/wrapt-1.17.3-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:042ec3bb8f319c147b1301f2393bc19dba6e176b7da446853406d041c36c7828", size = 88036, upload-time = "2025-08-12T05:52:34.784Z" },
+    { url = "https://files.pythonhosted.org/packages/ca/38/2e1785df03b3d72d34fc6252d91d9d12dc27a5c89caef3335a1bbb8908ca/wrapt-1.17.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3af60380ba0b7b5aeb329bc4e402acd25bd877e98b3727b0135cb5c2efdaefe9", size = 88156, upload-time = "2025-08-12T05:52:13.599Z" },
+    { url = "https://files.pythonhosted.org/packages/b3/8b/48cdb60fe0603e34e05cffda0b2a4adab81fd43718e11111a4b0100fd7c1/wrapt-1.17.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0b02e424deef65c9f7326d8c19220a2c9040c51dc165cddb732f16198c168396", size = 87102, upload-time = "2025-08-12T05:52:14.56Z" },
+    { url = "https://files.pythonhosted.org/packages/3c/51/d81abca783b58f40a154f1b2c56db1d2d9e0d04fa2d4224e357529f57a57/wrapt-1.17.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:74afa28374a3c3a11b3b5e5fca0ae03bef8450d6aa3ab3a1e2c30e3a75d023dc", size = 87732, upload-time = "2025-08-12T05:52:36.165Z" },
+    { url = "https://files.pythonhosted.org/packages/9e/b1/43b286ca1392a006d5336412d41663eeef1ad57485f3e52c767376ba7e5a/wrapt-1.17.3-cp312-cp312-win32.whl", hash = "sha256:4da9f45279fff3543c371d5ababc57a0384f70be244de7759c85a7f989cb4ebe", size = 36705, upload-time = "2025-08-12T05:53:07.123Z" },
+    { url = "https://files.pythonhosted.org/packages/28/de/49493f962bd3c586ab4b88066e967aa2e0703d6ef2c43aa28cb83bf7b507/wrapt-1.17.3-cp312-cp312-win_amd64.whl", hash = "sha256:e71d5c6ebac14875668a1e90baf2ea0ef5b7ac7918355850c0908ae82bcb297c", size = 38877, upload-time = "2025-08-12T05:53:05.436Z" },
+    { url = "https://files.pythonhosted.org/packages/f1/48/0f7102fe9cb1e8a5a77f80d4f0956d62d97034bbe88d33e94699f99d181d/wrapt-1.17.3-cp312-cp312-win_arm64.whl", hash = "sha256:604d076c55e2fdd4c1c03d06dc1a31b95130010517b5019db15365ec4a405fc6", size = 36885, upload-time = "2025-08-12T05:52:54.367Z" },
+    { url = "https://files.pythonhosted.org/packages/fc/f6/759ece88472157acb55fc195e5b116e06730f1b651b5b314c66291729193/wrapt-1.17.3-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:a47681378a0439215912ef542c45a783484d4dd82bac412b71e59cf9c0e1cea0", size = 54003, upload-time = "2025-08-12T05:51:48.627Z" },
+    { url = "https://files.pythonhosted.org/packages/4f/a9/49940b9dc6d47027dc850c116d79b4155f15c08547d04db0f07121499347/wrapt-1.17.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:54a30837587c6ee3cd1a4d1c2ec5d24e77984d44e2f34547e2323ddb4e22eb77", size = 39025, upload-time = "2025-08-12T05:51:37.156Z" },
+    { url = "https://files.pythonhosted.org/packages/45/35/6a08de0f2c96dcdd7fe464d7420ddb9a7655a6561150e5fc4da9356aeaab/wrapt-1.17.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:16ecf15d6af39246fe33e507105d67e4b81d8f8d2c6598ff7e3ca1b8a37213f7", size = 39108, upload-time = "2025-08-12T05:51:58.425Z" },
+    { url = "https://files.pythonhosted.org/packages/0c/37/6faf15cfa41bf1f3dba80cd3f5ccc6622dfccb660ab26ed79f0178c7497f/wrapt-1.17.3-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:6fd1ad24dc235e4ab88cda009e19bf347aabb975e44fd5c2fb22a3f6e4141277", size = 88072, upload-time = "2025-08-12T05:52:37.53Z" },
+    { url = "https://files.pythonhosted.org/packages/78/f2/efe19ada4a38e4e15b6dff39c3e3f3f73f5decf901f66e6f72fe79623a06/wrapt-1.17.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ed61b7c2d49cee3c027372df5809a59d60cf1b6c2f81ee980a091f3afed6a2d", size = 88214, upload-time = "2025-08-12T05:52:15.886Z" },
+    { url = "https://files.pythonhosted.org/packages/40/90/ca86701e9de1622b16e09689fc24b76f69b06bb0150990f6f4e8b0eeb576/wrapt-1.17.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:423ed5420ad5f5529db9ce89eac09c8a2f97da18eb1c870237e84c5a5c2d60aa", size = 87105, upload-time = "2025-08-12T05:52:17.914Z" },
+    { url = "https://files.pythonhosted.org/packages/fd/e0/d10bd257c9a3e15cbf5523025252cc14d77468e8ed644aafb2d6f54cb95d/wrapt-1.17.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:e01375f275f010fcbf7f643b4279896d04e571889b8a5b3f848423d91bf07050", size = 87766, upload-time = "2025-08-12T05:52:39.243Z" },
+    { url = "https://files.pythonhosted.org/packages/e8/cf/7d848740203c7b4b27eb55dbfede11aca974a51c3d894f6cc4b865f42f58/wrapt-1.17.3-cp313-cp313-win32.whl", hash = "sha256:53e5e39ff71b3fc484df8a522c933ea2b7cdd0d5d15ae82e5b23fde87d44cbd8", size = 36711, upload-time = "2025-08-12T05:53:10.074Z" },
+    { url = "https://files.pythonhosted.org/packages/57/54/35a84d0a4d23ea675994104e667ceff49227ce473ba6a59ba2c84f250b74/wrapt-1.17.3-cp313-cp313-win_amd64.whl", hash = "sha256:1f0b2f40cf341ee8cc1a97d51ff50dddb9fcc73241b9143ec74b30fc4f44f6cb", size = 38885, upload-time = "2025-08-12T05:53:08.695Z" },
+    { url = "https://files.pythonhosted.org/packages/01/77/66e54407c59d7b02a3c4e0af3783168fff8e5d61def52cda8728439d86bc/wrapt-1.17.3-cp313-cp313-win_arm64.whl", hash = "sha256:7425ac3c54430f5fc5e7b6f41d41e704db073309acfc09305816bc6a0b26bb16", size = 36896, upload-time = "2025-08-12T05:52:55.34Z" },
+    { url = "https://files.pythonhosted.org/packages/02/a2/cd864b2a14f20d14f4c496fab97802001560f9f41554eef6df201cd7f76c/wrapt-1.17.3-cp314-cp314-macosx_10_13_universal2.whl", hash = "sha256:cf30f6e3c077c8e6a9a7809c94551203c8843e74ba0c960f4a98cd80d4665d39", size = 54132, upload-time = "2025-08-12T05:51:49.864Z" },
+    { url = "https://files.pythonhosted.org/packages/d5/46/d011725b0c89e853dc44cceb738a307cde5d240d023d6d40a82d1b4e1182/wrapt-1.17.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:e228514a06843cae89621384cfe3a80418f3c04aadf8a3b14e46a7be704e4235", size = 39091, upload-time = "2025-08-12T05:51:38.935Z" },
+    { url = "https://files.pythonhosted.org/packages/2e/9e/3ad852d77c35aae7ddebdbc3b6d35ec8013af7d7dddad0ad911f3d891dae/wrapt-1.17.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:5ea5eb3c0c071862997d6f3e02af1d055f381b1d25b286b9d6644b79db77657c", size = 39172, upload-time = "2025-08-12T05:51:59.365Z" },
+    { url = "https://files.pythonhosted.org/packages/c3/f7/c983d2762bcce2326c317c26a6a1e7016f7eb039c27cdf5c4e30f4160f31/wrapt-1.17.3-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:281262213373b6d5e4bb4353bc36d1ba4084e6d6b5d242863721ef2bf2c2930b", size = 87163, upload-time = "2025-08-12T05:52:40.965Z" },
+    { url = "https://files.pythonhosted.org/packages/e4/0f/f673f75d489c7f22d17fe0193e84b41540d962f75fce579cf6873167c29b/wrapt-1.17.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:dc4a8d2b25efb6681ecacad42fca8859f88092d8732b170de6a5dddd80a1c8fa", size = 87963, upload-time = "2025-08-12T05:52:20.326Z" },
+    { url = "https://files.pythonhosted.org/packages/df/61/515ad6caca68995da2fac7a6af97faab8f78ebe3bf4f761e1b77efbc47b5/wrapt-1.17.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:373342dd05b1d07d752cecbec0c41817231f29f3a89aa8b8843f7b95992ed0c7", size = 86945, upload-time = "2025-08-12T05:52:21.581Z" },
+    { url = "https://files.pythonhosted.org/packages/d3/bd/4e70162ce398462a467bc09e768bee112f1412e563620adc353de9055d33/wrapt-1.17.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d40770d7c0fd5cbed9d84b2c3f2e156431a12c9a37dc6284060fb4bec0b7ffd4", size = 86857, upload-time = "2025-08-12T05:52:43.043Z" },
+    { url = "https://files.pythonhosted.org/packages/2b/b8/da8560695e9284810b8d3df8a19396a6e40e7518059584a1a394a2b35e0a/wrapt-1.17.3-cp314-cp314-win32.whl", hash = "sha256:fbd3c8319de8e1dc79d346929cd71d523622da527cca14e0c1d257e31c2b8b10", size = 37178, upload-time = "2025-08-12T05:53:12.605Z" },
+    { url = "https://files.pythonhosted.org/packages/db/c8/b71eeb192c440d67a5a0449aaee2310a1a1e8eca41676046f99ed2487e9f/wrapt-1.17.3-cp314-cp314-win_amd64.whl", hash = "sha256:e1a4120ae5705f673727d3253de3ed0e016f7cd78dc463db1b31e2463e1f3cf6", size = 39310, upload-time = "2025-08-12T05:53:11.106Z" },
+    { url = "https://files.pythonhosted.org/packages/45/20/2cda20fd4865fa40f86f6c46ed37a2a8356a7a2fde0773269311f2af56c7/wrapt-1.17.3-cp314-cp314-win_arm64.whl", hash = "sha256:507553480670cab08a800b9463bdb881b2edeed77dc677b0a5915e6106e91a58", size = 37266, upload-time = "2025-08-12T05:52:56.531Z" },
+    { url = "https://files.pythonhosted.org/packages/77/ed/dd5cf21aec36c80443c6f900449260b80e2a65cf963668eaef3b9accce36/wrapt-1.17.3-cp314-cp314t-macosx_10_13_universal2.whl", hash = "sha256:ed7c635ae45cfbc1a7371f708727bf74690daedc49b4dba310590ca0bd28aa8a", size = 56544, upload-time = "2025-08-12T05:51:51.109Z" },
+    { url = "https://files.pythonhosted.org/packages/8d/96/450c651cc753877ad100c7949ab4d2e2ecc4d97157e00fa8f45df682456a/wrapt-1.17.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:249f88ed15503f6492a71f01442abddd73856a0032ae860de6d75ca62eed8067", size = 40283, upload-time = "2025-08-12T05:51:39.912Z" },
+    { url = "https://files.pythonhosted.org/packages/d1/86/2fcad95994d9b572db57632acb6f900695a648c3e063f2cd344b3f5c5a37/wrapt-1.17.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:5a03a38adec8066d5a37bea22f2ba6bbf39fcdefbe2d91419ab864c3fb515454", size = 40366, upload-time = "2025-08-12T05:52:00.693Z" },
+    { url = "https://files.pythonhosted.org/packages/64/0e/f4472f2fdde2d4617975144311f8800ef73677a159be7fe61fa50997d6c0/wrapt-1.17.3-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:5d4478d72eb61c36e5b446e375bbc49ed002430d17cdec3cecb36993398e1a9e", size = 108571, upload-time = "2025-08-12T05:52:44.521Z" },
+    { url = "https://files.pythonhosted.org/packages/cc/01/9b85a99996b0a97c8a17484684f206cbb6ba73c1ce6890ac668bcf3838fb/wrapt-1.17.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:223db574bb38637e8230eb14b185565023ab624474df94d2af18f1cdb625216f", size = 113094, upload-time = "2025-08-12T05:52:22.618Z" },
+    { url = "https://files.pythonhosted.org/packages/25/02/78926c1efddcc7b3aa0bc3d6b33a822f7d898059f7cd9ace8c8318e559ef/wrapt-1.17.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e405adefb53a435f01efa7ccdec012c016b5a1d3f35459990afc39b6be4d5056", size = 110659, upload-time = "2025-08-12T05:52:24.057Z" },
+    { url = "https://files.pythonhosted.org/packages/dc/ee/c414501ad518ac3e6fe184753632fe5e5ecacdcf0effc23f31c1e4f7bfcf/wrapt-1.17.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:88547535b787a6c9ce4086917b6e1d291aa8ed914fdd3a838b3539dc95c12804", size = 106946, upload-time = "2025-08-12T05:52:45.976Z" },
+    { url = "https://files.pythonhosted.org/packages/be/44/a1bd64b723d13bb151d6cc91b986146a1952385e0392a78567e12149c7b4/wrapt-1.17.3-cp314-cp314t-win32.whl", hash = "sha256:41b1d2bc74c2cac6f9074df52b2efbef2b30bdfe5f40cb78f8ca22963bc62977", size = 38717, upload-time = "2025-08-12T05:53:15.214Z" },
+    { url = "https://files.pythonhosted.org/packages/79/d9/7cfd5a312760ac4dd8bf0184a6ee9e43c33e47f3dadc303032ce012b8fa3/wrapt-1.17.3-cp314-cp314t-win_amd64.whl", hash = "sha256:73d496de46cd2cdbdbcce4ae4bcdb4afb6a11234a1df9c085249d55166b95116", size = 41334, upload-time = "2025-08-12T05:53:14.178Z" },
+    { url = "https://files.pythonhosted.org/packages/46/78/10ad9781128ed2f99dbc474f43283b13fea8ba58723e98844367531c18e9/wrapt-1.17.3-cp314-cp314t-win_arm64.whl", hash = "sha256:f38e60678850c42461d4202739f9bf1e3a737c7ad283638251e79cc49effb6b6", size = 38471, upload-time = "2025-08-12T05:52:57.784Z" },
+    { url = "https://files.pythonhosted.org/packages/1f/f6/a933bd70f98e9cf3e08167fc5cd7aaaca49147e48411c0bd5ae701bb2194/wrapt-1.17.3-py3-none-any.whl", hash = "sha256:7171ae35d2c33d326ac19dd8facb1e82e5fd04ef8c6c0e394d7af55a55051c22", size = 23591, upload-time = "2025-08-12T05:53:20.674Z" },
+]
+
 [[package]]
 name = "wsproto"
 version = "1.2.0"