diff --git a/docs/_static/llama-stack-spec.html b/docs/_static/llama-stack-spec.html index e2c53d4b0..722c30fd6 100644 --- a/docs/_static/llama-stack-spec.html +++ b/docs/_static/llama-stack-spec.html @@ -15885,12 +15885,16 @@ "value": { "type": "number", "description": "The numeric value of the metric at this timestamp" + }, + "unit": { + "type": "string" } }, "additionalProperties": false, "required": [ "timestamp", - "value" + "value", + "unit" ], "title": "MetricDataPoint", "description": "A single data point in a metric time series." diff --git a/docs/_static/llama-stack-spec.yaml b/docs/_static/llama-stack-spec.yaml index 85cec3a78..e54d7c971 100644 --- a/docs/_static/llama-stack-spec.yaml +++ b/docs/_static/llama-stack-spec.yaml @@ -11810,10 +11810,13 @@ components: type: number description: >- The numeric value of the metric at this timestamp + unit: + type: string additionalProperties: false required: - timestamp - value + - unit title: MetricDataPoint description: >- A single data point in a metric time series. 
async def query_metrics(
    self,
    metric_name: str,
    start_time: int,
    end_time: int | None = None,
    granularity: str | None = None,
    query_type: MetricQueryType = MetricQueryType.RANGE,
    label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
    """Query metrics from the telemetry store.

    Args:
        metric_name: The name of the metric to query (e.g., "prompt_tokens")
        start_time: Start time as Unix timestamp
        end_time: End time as Unix timestamp (defaults to now if None)
        granularity: Time granularity for aggregation (e.g. "1m", "1h", "1d");
            None means no time-bucket aggregation
        query_type: Type of query (RANGE or INSTANT)
        label_matchers: Label filters to apply

    Returns:
        QueryMetricsResponse with metric time series data

    Raises:
        ValueError: if no SQLite-backed trace store is configured.
    """
    # Convert Unix timestamps to timezone-aware datetimes. Use an explicit
    # `is not None` check so that 0 (the epoch) remains a valid end_time
    # instead of silently falling back to "now".
    start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
    end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time is not None else None

    # Metrics are only persisted by the SQLite sink, so delegate to the
    # trace store when one is configured.
    if hasattr(self, "trace_store") and self.trace_store:
        return await self.trace_store.query_metrics(
            metric_name=metric_name,
            start_time=start_dt,
            end_time=end_dt,
            granularity=granularity,
            query_type=query_type,
            label_matchers=label_matchers,
        )
    else:
        raise ValueError(
            f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
        )
class SQLiteTraceStore(TraceStore):
    """SQLite-backed TraceStore that queries exported span events for metrics."""

    def __init__(self, conn_string: str):
        # Connection string (database file path) handed to aiosqlite.connect.
        self.conn_string = conn_string

    async def query_metrics(
        self,
        metric_name: str,
        start_time: datetime,
        end_time: datetime | None = None,
        granularity: str | None = None,
        query_type: MetricQueryType = MetricQueryType.RANGE,
        label_matchers: list[MetricLabelMatcher] | None = None,
    ) -> QueryMetricsResponse:
        """Aggregate metric span events stored in SQLite into a time series.

        Args:
            metric_name: Metric name without the "metric." prefix.
            start_time: Inclusive start of the query window (aware datetime).
            end_time: Inclusive end of the window; defaults to now (UTC).
            granularity: Optional bucket size ("1m", "1h", "1d", ...). When
                None, RANGE queries return raw, unaggregated data points.
            query_type: RANGE for a series, INSTANT for a single aggregate.
            label_matchers: Attribute filters applied to span events.

        Returns:
            QueryMetricsResponse containing one MetricSeries for the metric
            (or an empty data list when nothing matched).

        Raises:
            ValueError: if a row is missing its expected time column.
        """
        if end_time is None:
            end_time = datetime.now(UTC)

        # Build base query
        if query_type == MetricQueryType.INSTANT:
            # Single aggregate value over the whole window.
            query = """
                SELECT
                    se.name,
                    SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
                    json_extract(se.attributes, '$.unit') as unit,
                    se.attributes
                FROM span_events se
                WHERE se.name = ?
                AND se.timestamp BETWEEN ? AND ?
            """
        else:
            if granularity:
                # Aggregate into fixed-size time buckets.
                time_format = self._get_time_format_for_granularity(granularity)
                query = f"""
                    SELECT
                        se.name,
                        SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
                        json_extract(se.attributes, '$.unit') as unit,
                        se.attributes,
                        strftime('{time_format}', se.timestamp) as bucket_start
                    FROM span_events se
                    WHERE se.name = ?
                    AND se.timestamp BETWEEN ? AND ?
                """
            else:
                # Raw, unaggregated data points.
                query = """
                    SELECT
                        se.name,
                        json_extract(se.attributes, '$.value') as value,
                        json_extract(se.attributes, '$.unit') as unit,
                        se.attributes,
                        se.timestamp
                    FROM span_events se
                    WHERE se.name = ?
                    AND se.timestamp BETWEEN ? AND ?
                """

        # Metric events are stored with a "metric." name prefix.
        params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]

        # Labels that will be attached to the MetricSeries (preserve matcher labels)
        all_labels: list[MetricLabel] = []
        matcher_label_names = set()
        if label_matchers:
            for matcher in label_matchers:
                json_path = f"$.{matcher.name}"
                if matcher.operator == "=":
                    query += f" AND json_extract(se.attributes, '{json_path}') = ?"
                    params.append(matcher.value)
                elif matcher.operator == "!=":
                    query += f" AND json_extract(se.attributes, '{json_path}') != ?"
                    params.append(matcher.value)
                elif matcher.operator == "=~":
                    query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
                    params.append(f"%{matcher.value}%")
                elif matcher.operator == "!~":
                    query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
                    params.append(f"%{matcher.value}%")
                # Preserve filter context in output
                all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
                matcher_label_names.add(matcher.name)

        # GROUP BY / ORDER BY logic
        if query_type == MetricQueryType.RANGE and granularity:
            group_time_format = self._get_time_format_for_granularity(granularity)
            query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
            query += " ORDER BY bucket_start"
        elif query_type == MetricQueryType.INSTANT:
            query += " GROUP BY json_extract(se.attributes, '$.unit')"
        else:
            query += " ORDER BY se.timestamp"

        # Execute query
        async with aiosqlite.connect(self.conn_string) as conn:
            conn.row_factory = aiosqlite.Row
            async with conn.execute(query, params) as cursor:
                rows = await cursor.fetchall()

        if not rows:
            return QueryMetricsResponse(data=[])

        data_points = []
        # We want to add attribute labels, but only those not already present as matcher labels.
        attr_label_names = set()
        for row in rows:
            # Parse JSON attributes safely; if a row has no attributes, just skip its labels.
            try:
                attributes = json.loads(row["attributes"] or "{}")
            except (TypeError, json.JSONDecodeError):
                attributes = {}

            value = row["value"]
            unit = row["unit"] or ""

            # Add labels from attributes without duplicating matcher labels; otherwise
            # the result would contain many duplicate labels.
            for k, v in attributes.items():
                if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
                    all_labels.append(MetricLabel(name=k, value=str(v)))
                    attr_label_names.add(k)

            # Determine timestamp
            if query_type == MetricQueryType.RANGE and granularity:
                # NOTE: sqlite3.Row (which aiosqlite.Row is) raises IndexError,
                # not KeyError, for an unknown column name — catch both so this
                # guard actually fires.
                try:
                    bucket_start_raw = row["bucket_start"]
                except (IndexError, KeyError) as e:
                    raise ValueError(
                        "DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
                    ) from e
                # The column can be present but hold NULL.
                if bucket_start_raw is None:
                    raise ValueError("bucket_start is None check time format and data")
                bucket_start = datetime.fromisoformat(bucket_start_raw)
                timestamp = int(bucket_start.timestamp())
            elif query_type == MetricQueryType.INSTANT:
                timestamp = int(datetime.now(UTC).timestamp())
            else:
                try:
                    timestamp_raw = row["timestamp"]
                except (IndexError, KeyError) as e:
                    raise ValueError(
                        "DB did not have a timestamp in row, this indicates improper formatting"
                    ) from e
                # The column can be present but hold NULL.
                if timestamp_raw is None:
                    raise ValueError("timestamp is None check time format and data")
                timestamp_iso = datetime.fromisoformat(timestamp_raw)
                timestamp = int(timestamp_iso.timestamp())

            data_points.append(
                MetricDataPoint(
                    timestamp=timestamp,
                    value=value,
                    unit=unit,
                )
            )

        metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
        return QueryMetricsResponse(data=metric_series)

    def _get_time_format_for_granularity(self, granularity: str | None) -> str:
        """Get the SQLite strftime format string for a given granularity.

        Args:
            granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")

        Returns:
            SQLite strftime format string for the granularity
        """
        if granularity is None:
            raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")

        if granularity.endswith("d"):
            return "%Y-%m-%d 00:00:00"
        elif granularity.endswith("h"):
            return "%Y-%m-%d %H:00:00"
        elif granularity.endswith("m"):
            return "%Y-%m-%d %H:%M:00"
        else:
            return "%Y-%m-%d %H:%M:00"  # Default to most granular which will give us the most timestamps.