feat: implement query_metrics

query_metrics currently has no implementation, meaning that once a metric is emitted there is no way in Llama Stack to query it from the store.

implement query_metrics for the meta_reference provider in a style similar to `query_traces`, using the trace_store to build an SQL query and execute it.

in this case the parameters for the query are `metric.METRIC_NAME`, `start_time`, and `end_time`.

this required client-side changes since the client had no `query_metrics` or any associated resources, so any tests here will fail, but I will provide manual execution logs for the new tests I am adding.

order the metrics by timestamp.

Additionally, add `unit` to the `MetricDataPoint` class since it adds much more context to the metric being queried.

these metrics can also be aggregated via a `granularity` parameter. This is a pre-defined string such as `1m`, `1h`, or `1d`; metrics occurring within the same specified timespan are aggregated together.
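for illustration only (not part of the change itself), a rough sketch of how a granularity such as `1h` collapses raw timestamps into buckets, mirroring the strftime formats the SQLite trace store uses below; the sample timestamps are made up:

    from datetime import datetime

    # "1h" maps to this strftime format in the trace store's granularity helper;
    # "1d" and "1m" use day- and minute-level formats respectively.
    fmt = "%Y-%m-%d %H:00:00"
    samples = ["2025-08-07T14:05:00", "2025-08-07T14:48:00", "2025-08-07T15:02:00"]
    buckets = sorted({datetime.fromisoformat(ts).strftime(fmt) for ts in samples})
    print(buckets)  # ['2025-08-07 14:00:00', '2025-08-07 15:00:00']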

Signed-off-by: Charlie Doern <cdoern@redhat.com>
Charlie Doern 2025-08-07 20:03:58 -04:00
parent 0cbd93c5cc
commit c1a9a21380
5 changed files with 237 additions and 6 deletions


@@ -15885,12 +15885,16 @@
         "value": {
           "type": "number",
           "description": "The numeric value of the metric at this timestamp"
+        },
+        "unit": {
+          "type": "string"
         }
       },
       "additionalProperties": false,
       "required": [
         "timestamp",
-        "value"
+        "value",
+        "unit"
       ],
       "title": "MetricDataPoint",
       "description": "A single data point in a metric time series."


@@ -11810,10 +11810,13 @@ components:
           type: number
           description: >-
             The numeric value of the metric at this timestamp
+        unit:
+          type: string
       additionalProperties: false
       required:
         - timestamp
         - value
+        - unit
       title: MetricDataPoint
       description: >-
         A single data point in a metric time series.


@@ -386,6 +386,7 @@ class MetricDataPoint(BaseModel):
     timestamp: int
     value: float
+    unit: str


 @json_schema_type
@@ -518,7 +519,7 @@ class Telemetry(Protocol):
         metric_name: str,
         start_time: int,
         end_time: int | None = None,
-        granularity: str | None = "1d",
+        granularity: str | None = None,
         query_type: MetricQueryType = MetricQueryType.RANGE,
         label_matchers: list[MetricLabelMatcher] | None = None,
     ) -> QueryMetricsResponse:
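a minimal usage sketch against the protocol signature above, assuming `telemetry` is a configured TelemetryAdapter instance running inside an async context and `MetricQueryType` is imported from the telemetry API:

    import time

    response = await telemetry.query_metrics(
        metric_name="prompt_tokens",
        start_time=int(time.time()) - 3600,  # Unix timestamp: one hour ago
        end_time=None,                       # None means "up to now"
        granularity="1m",                    # aggregate into 1-minute buckets
        query_type=MetricQueryType.RANGE,
    )
    for series in response.data:
        for point in series.values:
            print(point.timestamp, point.value, point.unit)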


@@ -4,6 +4,7 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.

+import datetime
 import logging
 import threading
 from typing import Any
@@ -145,11 +146,41 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
         metric_name: str,
         start_time: int,
         end_time: int | None = None,
-        granularity: str | None = "1d",
+        granularity: str | None = None,
         query_type: MetricQueryType = MetricQueryType.RANGE,
         label_matchers: list[MetricLabelMatcher] | None = None,
     ) -> QueryMetricsResponse:
-        raise NotImplementedError("Querying metrics is not implemented")
+        """Query metrics from the telemetry store.
+
+        Args:
+            metric_name: The name of the metric to query (e.g., "prompt_tokens")
+            start_time: Start time as Unix timestamp
+            end_time: End time as Unix timestamp (defaults to now if None)
+            granularity: Time granularity for aggregation
+            query_type: Type of query (RANGE or INSTANT)
+            label_matchers: Label filters to apply
+
+        Returns:
+            QueryMetricsResponse with metric time series data
+        """
+        # Convert timestamps to datetime objects
+        start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
+        end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
+
+        # Use SQLite trace store if available
+        if hasattr(self, "trace_store") and self.trace_store:
+            return await self.trace_store.query_metrics(
+                metric_name=metric_name,
+                start_time=start_dt,
+                end_time=end_dt,
+                granularity=granularity,
+                query_type=query_type,
+                label_matchers=label_matchers,
+            )
+        else:
+            raise ValueError(
+                f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
+            )

     def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
         with self._lock:
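since the adapter only converts the Unix timestamps and delegates to the trace store, the new method can also be exercised directly against a local SQLite telemetry database; a rough sketch (the database path is illustrative and the call must run in an async context):

    from datetime import UTC, datetime, timedelta

    store = SQLiteTraceStore(conn_string="/path/to/trace_store.db")  # hypothetical path
    response = await store.query_metrics(
        metric_name="prompt_tokens",
        start_time=datetime.now(UTC) - timedelta(days=1),
        end_time=None,             # defaults to datetime.now(UTC) inside the store
        granularity="1h",          # hourly buckets via strftime
        query_type=MetricQueryType.RANGE,
    )
    for series in response.data:
        print(series.metric, len(series.values), "data points")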


@@ -5,12 +5,23 @@
 # the root directory of this source tree.

 import json
-from datetime import datetime
+from datetime import UTC, datetime
 from typing import Protocol

 import aiosqlite

-from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace
+from llama_stack.apis.telemetry import (
+    MetricDataPoint,
+    MetricLabel,
+    MetricLabelMatcher,
+    MetricQueryType,
+    MetricSeries,
+    QueryCondition,
+    QueryMetricsResponse,
+    Span,
+    SpanWithStatus,
+    Trace,
+)


 class TraceStore(Protocol):
@@ -29,11 +40,192 @@ class TraceStore(Protocol):
         max_depth: int | None = None,
     ) -> dict[str, SpanWithStatus]: ...

+    async def query_metrics(
+        self,
+        metric_name: str,
+        start_time: datetime,
+        end_time: datetime | None = None,
+        granularity: str | None = "1d",
+        query_type: MetricQueryType = MetricQueryType.RANGE,
+        label_matchers: list[MetricLabelMatcher] | None = None,
+    ) -> QueryMetricsResponse: ...
+

 class SQLiteTraceStore(TraceStore):
     def __init__(self, conn_string: str):
         self.conn_string = conn_string
+
+    async def query_metrics(
+        self,
+        metric_name: str,
+        start_time: datetime,
+        end_time: datetime | None = None,
+        granularity: str | None = None,
+        query_type: MetricQueryType = MetricQueryType.RANGE,
+        label_matchers: list[MetricLabelMatcher] | None = None,
+    ) -> QueryMetricsResponse:
+        if end_time is None:
+            end_time = datetime.now(UTC)
+
+        # Build base query
+        if query_type == MetricQueryType.INSTANT:
+            query = """
+                SELECT
+                    se.name,
+                    SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
+                    json_extract(se.attributes, '$.unit') as unit,
+                    se.attributes
+                FROM span_events se
+                WHERE se.name = ?
+                AND se.timestamp BETWEEN ? AND ?
+            """
+        else:
+            if granularity:
+                time_format = self._get_time_format_for_granularity(granularity)
+                query = f"""
+                    SELECT
+                        se.name,
+                        SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
+                        json_extract(se.attributes, '$.unit') as unit,
+                        se.attributes,
+                        strftime('{time_format}', se.timestamp) as bucket_start
+                    FROM span_events se
+                    WHERE se.name = ?
+                    AND se.timestamp BETWEEN ? AND ?
+                """
+            else:
+                query = """
+                    SELECT
+                        se.name,
+                        json_extract(se.attributes, '$.value') as value,
+                        json_extract(se.attributes, '$.unit') as unit,
+                        se.attributes,
+                        se.timestamp
+                    FROM span_events se
+                    WHERE se.name = ?
+                    AND se.timestamp BETWEEN ? AND ?
+                """
+
+        params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
+
+        # Labels that will be attached to the MetricSeries (preserve matcher labels)
+        all_labels: list[MetricLabel] = []
+        matcher_label_names = set()
+        if label_matchers:
+            for matcher in label_matchers:
+                json_path = f"$.{matcher.name}"
+                if matcher.operator == "=":
+                    query += f" AND json_extract(se.attributes, '{json_path}') = ?"
+                    params.append(matcher.value)
+                elif matcher.operator == "!=":
+                    query += f" AND json_extract(se.attributes, '{json_path}') != ?"
+                    params.append(matcher.value)
+                elif matcher.operator == "=~":
+                    query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
+                    params.append(f"%{matcher.value}%")
+                elif matcher.operator == "!~":
+                    query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
+                    params.append(f"%{matcher.value}%")
+                # Preserve filter context in output
+                all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
+                matcher_label_names.add(matcher.name)
+
+        # GROUP BY / ORDER BY logic
+        if query_type == MetricQueryType.RANGE and granularity:
+            group_time_format = self._get_time_format_for_granularity(granularity)
+            query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
+            query += " ORDER BY bucket_start"
+        elif query_type == MetricQueryType.INSTANT:
+            query += " GROUP BY json_extract(se.attributes, '$.unit')"
+        else:
+            query += " ORDER BY se.timestamp"
+
+        # Execute query
+        async with aiosqlite.connect(self.conn_string) as conn:
+            conn.row_factory = aiosqlite.Row
+            async with conn.execute(query, params) as cursor:
+                rows = await cursor.fetchall()
+
+        if not rows:
+            return QueryMetricsResponse(data=[])
+
+        data_points = []
+        # We want to add attribute labels, but only those not already present as matcher labels.
+        attr_label_names = set()
+        for row in rows:
+            # Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
+            try:
+                attributes = json.loads(row["attributes"] or "{}")
+            except (TypeError, json.JSONDecodeError):
+                attributes = {}
+
+            value = row["value"]
+            unit = row["unit"] or ""
+
+            # Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
+            for k, v in attributes.items():
+                if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
+                    all_labels.append(MetricLabel(name=k, value=str(v)))
+                    attr_label_names.add(k)
+
+            # Determine timestamp
+            if query_type == MetricQueryType.RANGE and granularity:
+                try:
+                    bucket_start_raw = row["bucket_start"]
+                except KeyError as e:
+                    raise ValueError(
+                        "DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
+                    ) from e
+                # this value could also be there, but be NULL, I think.
+                if bucket_start_raw is None:
+                    raise ValueError("bucket_start is None check time format and data")
+                bucket_start = datetime.fromisoformat(bucket_start_raw)
+                timestamp = int(bucket_start.timestamp())
+            elif query_type == MetricQueryType.INSTANT:
+                timestamp = int(datetime.now(UTC).timestamp())
+            else:
+                try:
+                    timestamp_raw = row["timestamp"]
+                except KeyError as e:
+                    raise ValueError(
+                        "DB did not have a timestamp in row, this indicates improper formatting"
+                    ) from e
+                # this value could also be there, but be NULL, I think.
+                if timestamp_raw is None:
+                    raise ValueError("timestamp is None check time format and data")
+                timestamp_iso = datetime.fromisoformat(timestamp_raw)
+                timestamp = int(timestamp_iso.timestamp())
+
+            data_points.append(
+                MetricDataPoint(
+                    timestamp=timestamp,
+                    value=value,
+                    unit=unit,
+                )
+            )
+
+        metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
+        return QueryMetricsResponse(data=metric_series)
+
+    def _get_time_format_for_granularity(self, granularity: str | None) -> str:
+        """Get the SQLite strftime format string for a given granularity.
+
+        Args:
+            granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
+
+        Returns:
+            SQLite strftime format string for the granularity
+        """
+        if granularity is None:
+            raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
+
+        if granularity.endswith("d"):
+            return "%Y-%m-%d 00:00:00"
+        elif granularity.endswith("h"):
+            return "%Y-%m-%d %H:00:00"
+        elif granularity.endswith("m"):
+            return "%Y-%m-%d %H:%M:00"
+        else:
+            return "%Y-%m-%d %H:%M:00"  # Default to most granular which will give us the most timestamps.
+
     async def query_traces(
         self,
         attribute_filters: list[QueryCondition] | None = None,
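for clarity, this is approximately the SQL the RANGE-with-granularity path above assembles for a `1d` granularity; the three placeholders are bound to `metric.<name>` and the ISO-formatted start and end times:

    # Approximate final query string for query_type=RANGE, granularity="1d".
    example_range_query = """
    SELECT
        se.name,
        SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
        json_extract(se.attributes, '$.unit') as unit,
        se.attributes,
        strftime('%Y-%m-%d 00:00:00', se.timestamp) as bucket_start
    FROM span_events se
    WHERE se.name = ?
      AND se.timestamp BETWEEN ? AND ?
    GROUP BY strftime('%Y-%m-%d 00:00:00', se.timestamp), json_extract(se.attributes, '$.unit')
    ORDER BY bucket_start
    """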