feat: implement query_metrics

query_metrics currently has no implementation, meaning once a metric is emitted there is no way in llama stack to query it from the store.

implement query_metrics for the meta_reference provider which follows a similar style to `query_traces`, using the trace_store to format an SQL query and execute it

in this case the parameters for the query are `metric.METRIC_NAME, start_time, and end_time`.

this required client side changes since the client had no `query_metrics` or any associated resources, so any tests here will fail but I will provider manual execution logs for the new tests I am adding

order the metrics by timestamp.

Additionally add `unit` to the `MetricDataPoint` class since this adds much more context to the metric being queried.

these metrics can also be aggregated via a `granularity` parameter. This was pre-defined as a string like: `1m, 1h, 1d` where metrics occuring in same timespan specified are aggregated together.

Signed-off-by: Charlie Doern <cdoern@redhat.com>
This commit is contained in:
Charlie Doern 2025-08-07 20:03:58 -04:00
parent 2ee898cc4c
commit fb553f3430
5 changed files with 237 additions and 6 deletions

View file

@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import datetime
import threading
from typing import Any
@ -145,11 +146,41 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
metric_name: str,
start_time: int,
end_time: int | None = None,
granularity: str | None = "1d",
granularity: str | None = None,
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: list[MetricLabelMatcher] | None = None,
) -> QueryMetricsResponse:
raise NotImplementedError("Querying metrics is not implemented")
"""Query metrics from the telemetry store.
Args:
metric_name: The name of the metric to query (e.g., "prompt_tokens")
start_time: Start time as Unix timestamp
end_time: End time as Unix timestamp (defaults to now if None)
granularity: Time granularity for aggregation
query_type: Type of query (RANGE or INSTANT)
label_matchers: Label filters to apply
Returns:
QueryMetricsResponse with metric time series data
"""
# Convert timestamps to datetime objects
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
# Use SQLite trace store if available
if hasattr(self, "trace_store") and self.trace_store:
return await self.trace_store.query_metrics(
metric_name=metric_name,
start_time=start_dt,
end_time=end_dt,
granularity=granularity,
query_type=query_type,
label_matchers=label_matchers,
)
else:
raise ValueError(
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
)
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
with self._lock: