mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-04 04:04:14 +00:00
feat: implement query_metrics (#3074)
# What does this PR do? query_metrics currently has no implementation, meaning once a metric is emitted there is no way in llama stack to query it from the store. implement query_metrics for the meta_reference provider which follows a similar style to `query_traces`, using the trace_store to format an SQL query and execute it in this case the parameters for the query are `metric.METRIC_NAME, start_time, and end_time` and any other matchers if they are provided. this required client side changes since the client had no `query_metrics` or any associated resources, so any tests here will fail but I will provide manual execution logs for the new tests I am adding order the metrics by timestamp. Additionally add `unit` to the `MetricDataPoint` class since this adds much more context to the metric being queried. depends on https://github.com/llamastack/llama-stack-client-python/pull/260 ## Test Plan ``` import time import uuid def create_http_client(): from llama_stack_client import LlamaStackClient return LlamaStackClient(base_url="http://localhost:8321") client = create_http_client() response = client.telemetry.query_metrics(metric_name="total_tokens", start_time=0) print(response) ``` ``` ╰─ python3.12 ~/telemetry.py INFO:httpx:HTTP Request: POST http://localhost:8322/v1/telemetry/metrics/total_tokens "HTTP/1.1 200 OK" [TelemetryQueryMetricsResponse(data=None, metric='total_tokens', labels=[], values=[{'timestamp': 1753999514, 'value': 34.0, 'unit': 'tokens'}, {'timestamp': 1753999816, 'value': 34.0, 'unit': 'tokens'}, {'timestamp': 1753999881, 'value': 34.0, 'unit': 'tokens'}, {'timestamp': 1753999956, 'value': 34.0, 'unit': 'tokens'}, {'timestamp': 1754000200, 'value': 34.0, 'unit': 'tokens'}, {'timestamp': 1754000419, 'value': 36.0, 'unit': 'tokens'}, {'timestamp': 1754000714, 'value': 36.0, 'unit': 'tokens'}, {'timestamp': 1754000876, 'value': 36.0, 'unit': 'tokens'}, {'timestamp': 1754000908, 'value': 34.0, 'unit': 'tokens'}, {'timestamp': 1754001309, 'value': 584.0, 'unit': 'tokens'}, {'timestamp': 1754001311, 'value': 138.0, 'unit': 'tokens'}, {'timestamp': 1754001316, 'value': 349.0, 'unit': 'tokens'}, {'timestamp': 1754001318, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001320, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001341, 'value': 923.0, 'unit': 'tokens'}, {'timestamp': 1754001350, 'value': 354.0, 'unit': 'tokens'}, {'timestamp': 1754001462, 'value': 417.0, 'unit': 'tokens'}, {'timestamp': 1754001464, 'value': 158.0, 'unit': 'tokens'}, {'timestamp': 1754001475, 'value': 697.0, 'unit': 'tokens'}, {'timestamp': 1754001477, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001479, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001489, 'value': 298.0, 'unit': 'tokens'}, {'timestamp': 1754001541, 'value': 615.0, 'unit': 'tokens'}, {'timestamp': 1754001543, 'value': 119.0, 'unit': 'tokens'}, {'timestamp': 1754001548, 'value': 310.0, 'unit': 'tokens'}, {'timestamp': 1754001549, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001551, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001568, 'value': 714.0, 'unit': 'tokens'}, {'timestamp': 1754001800, 'value': 437.0, 'unit': 'tokens'}, {'timestamp': 1754001802, 'value': 200.0, 'unit': 'tokens'}, {'timestamp': 1754001806, 'value': 262.0, 'unit': 'tokens'}, {'timestamp': 1754001808, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001810, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001816, 'value': 82.0, 'unit': 'tokens'}, {'timestamp': 1754001923, 'value': 61.0, 'unit': 'tokens'}, {'timestamp': 1754001929, 'value': 391.0, 'unit': 'tokens'}, {'timestamp': 1754001939, 'value': 598.0, 'unit': 'tokens'}, {'timestamp': 1754001941, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001942, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754001952, 'value': 252.0, 'unit': 'tokens'}, {'timestamp': 1754002053, 'value': 251.0, 'unit': 'tokens'}, {'timestamp': 1754002059, 'value': 375.0, 'unit': 'tokens'}, {'timestamp': 1754002062, 'value': 244.0, 'unit': 'tokens'}, {'timestamp': 1754002064, 'value': 111.0, 'unit': 'tokens'}, {'timestamp': 1754002065, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754002083, 'value': 719.0, 'unit': 'tokens'}, {'timestamp': 1754002302, 'value': 279.0, 'unit': 'tokens'}, {'timestamp': 1754002306, 'value': 218.0, 'unit': 'tokens'}, {'timestamp': 1754002308, 'value': 198.0, 'unit': 'tokens'}, {'timestamp': 1754002309, 'value': 69.0, 'unit': 'tokens'}, {'timestamp': 1754002311, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754002324, 'value': 481.0, 'unit': 'tokens'}, {'timestamp': 1754003161, 'value': 579.0, 'unit': 'tokens'}, {'timestamp': 1754003161, 'value': 69.0, 'unit': 'tokens'}, {'timestamp': 1754003169, 'value': 499.0, 'unit': 'tokens'}, {'timestamp': 1754003171, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754003173, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754003185, 'value': 422.0, 'unit': 'tokens'}, {'timestamp': 1754003448, 'value': 579.0, 'unit': 'tokens'}, {'timestamp': 1754003453, 'value': 422.0, 'unit': 'tokens'}, {'timestamp': 1754003589, 'value': 579.0, 'unit': 'tokens'}, {'timestamp': 1754003609, 'value': 279.0, 'unit': 'tokens'}, {'timestamp': 1754003614, 'value': 481.0, 'unit': 'tokens'}, {'timestamp': 1754003706, 'value': 303.0, 'unit': 'tokens'}, {'timestamp': 1754003706, 'value': 51.0, 'unit': 'tokens'}, {'timestamp': 1754003713, 'value': 426.0, 'unit': 'tokens'}, {'timestamp': 1754003714, 'value': 70.0, 'unit': 'tokens'}, {'timestamp': 1754003715, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754003724, 'value': 225.0, 'unit': 'tokens'}, {'timestamp': 1754004226, 'value': 516.0, 'unit': 'tokens'}, {'timestamp': 1754004228, 'value': 127.0, 'unit': 'tokens'}, {'timestamp': 1754004232, 'value': 281.0, 'unit': 'tokens'}, {'timestamp': 1754004234, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754004236, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754004244, 'value': 206.0, 'unit': 'tokens'}, {'timestamp': 1754004683, 'value': 338.0, 'unit': 'tokens'}, {'timestamp': 1754004690, 'value': 481.0, 'unit': 'tokens'}, {'timestamp': 1754004692, 'value': 124.0, 'unit': 'tokens'}, {'timestamp': 1754004692, 'value': 65.0, 'unit': 'tokens'}, {'timestamp': 1754004694, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754004703, 'value': 211.0, 'unit': 'tokens'}, {'timestamp': 1754004743, 'value': 338.0, 'unit': 'tokens'}, {'timestamp': 1754004749, 'value': 211.0, 'unit': 'tokens'}, {'timestamp': 1754005566, 'value': 481.0, 'unit': 'tokens'}, {'timestamp': 1754006101, 'value': 159.0, 'unit': 'tokens'}, {'timestamp': 1754006105, 'value': 272.0, 'unit': 'tokens'}, {'timestamp': 1754006109, 'value': 308.0, 'unit': 'tokens'}, {'timestamp': 1754006110, 'value': 61.0, 'unit': 'tokens'}, {'timestamp': 1754006112, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754006130, 'value': 705.0, 'unit': 'tokens'}, {'timestamp': 1754051825, 'value': 454.0, 'unit': 'tokens'}, {'timestamp': 1754051827, 'value': 152.0, 'unit': 'tokens'}, {'timestamp': 1754051834, 'value': 481.0, 'unit': 'tokens'}, {'timestamp': 1754051835, 'value': 55.0, 'unit': 'tokens'}, {'timestamp': 1754051837, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754051845, 'value': 102.0, 'unit': 'tokens'}, {'timestamp': 1754099929, 'value': 36.0, 'unit': 'tokens'}, {'timestamp': 1754510050, 'value': 598.0, 'unit': 'tokens'}, {'timestamp': 1754510052, 'value': 160.0, 'unit': 'tokens'}, {'timestamp': 1754510064, 'value': 725.0, 'unit': 'tokens'}, {'timestamp': 1754510065, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754510067, 'value': 133.0, 'unit': 'tokens'}, {'timestamp': 1754510083, 'value': 535.0, 'unit': 'tokens'}, {'timestamp': 1754596582, 'value': 36.0, 'unit': 'tokens'}])] ``` adding tests for each currently documented metric in llama stack using this new function. attached is also some manual testing integrations tests passing locally with replay mode and the linked client changes: <img width="1907" height="529" alt="Screenshot 2025-08-08 at 2 49 14 PM" src="https://github.com/user-attachments/assets/d482ab06-dcff-4f0c-a1f1-f870670ee9bc" /> --------- Signed-off-by: Charlie Doern <cdoern@redhat.com>
This commit is contained in:
parent
3d119a86d4
commit
3b9278f254
17 changed files with 921 additions and 6 deletions
|
@ -386,6 +386,7 @@ class MetricDataPoint(BaseModel):
|
|||
|
||||
timestamp: int
|
||||
value: float
|
||||
unit: str
|
||||
|
||||
|
||||
@json_schema_type
|
||||
|
@ -518,7 +519,7 @@ class Telemetry(Protocol):
|
|||
metric_name: str,
|
||||
start_time: int,
|
||||
end_time: int | None = None,
|
||||
granularity: str | None = "1d",
|
||||
granularity: str | None = None,
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse:
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import datetime
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
|
@ -145,11 +146,41 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
|
|||
metric_name: str,
|
||||
start_time: int,
|
||||
end_time: int | None = None,
|
||||
granularity: str | None = "1d",
|
||||
granularity: str | None = None,
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse:
|
||||
raise NotImplementedError("Querying metrics is not implemented")
|
||||
"""Query metrics from the telemetry store.
|
||||
|
||||
Args:
|
||||
metric_name: The name of the metric to query (e.g., "prompt_tokens")
|
||||
start_time: Start time as Unix timestamp
|
||||
end_time: End time as Unix timestamp (defaults to now if None)
|
||||
granularity: Time granularity for aggregation
|
||||
query_type: Type of query (RANGE or INSTANT)
|
||||
label_matchers: Label filters to apply
|
||||
|
||||
Returns:
|
||||
QueryMetricsResponse with metric time series data
|
||||
"""
|
||||
# Convert timestamps to datetime objects
|
||||
start_dt = datetime.datetime.fromtimestamp(start_time, datetime.UTC)
|
||||
end_dt = datetime.datetime.fromtimestamp(end_time, datetime.UTC) if end_time else None
|
||||
|
||||
# Use SQLite trace store if available
|
||||
if hasattr(self, "trace_store") and self.trace_store:
|
||||
return await self.trace_store.query_metrics(
|
||||
metric_name=metric_name,
|
||||
start_time=start_dt,
|
||||
end_time=end_dt,
|
||||
granularity=granularity,
|
||||
query_type=query_type,
|
||||
label_matchers=label_matchers,
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"In order to query_metrics, you must have {TelemetrySink.SQLITE} set in your telemetry sinks"
|
||||
)
|
||||
|
||||
def _log_unstructured(self, event: UnstructuredLogEvent, ttl_seconds: int) -> None:
|
||||
with self._lock:
|
||||
|
|
|
@ -5,12 +5,23 @@
|
|||
# the root directory of this source tree.
|
||||
|
||||
import json
|
||||
from datetime import datetime
|
||||
from datetime import UTC, datetime
|
||||
from typing import Protocol
|
||||
|
||||
import aiosqlite
|
||||
|
||||
from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace
|
||||
from llama_stack.apis.telemetry import (
|
||||
MetricDataPoint,
|
||||
MetricLabel,
|
||||
MetricLabelMatcher,
|
||||
MetricQueryType,
|
||||
MetricSeries,
|
||||
QueryCondition,
|
||||
QueryMetricsResponse,
|
||||
Span,
|
||||
SpanWithStatus,
|
||||
Trace,
|
||||
)
|
||||
|
||||
|
||||
class TraceStore(Protocol):
|
||||
|
@ -29,11 +40,192 @@ class TraceStore(Protocol):
|
|||
max_depth: int | None = None,
|
||||
) -> dict[str, SpanWithStatus]: ...
|
||||
|
||||
async def query_metrics(
|
||||
self,
|
||||
metric_name: str,
|
||||
start_time: datetime,
|
||||
end_time: datetime | None = None,
|
||||
granularity: str | None = "1d",
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse: ...
|
||||
|
||||
|
||||
class SQLiteTraceStore(TraceStore):
|
||||
def __init__(self, conn_string: str):
|
||||
self.conn_string = conn_string
|
||||
|
||||
async def query_metrics(
|
||||
self,
|
||||
metric_name: str,
|
||||
start_time: datetime,
|
||||
end_time: datetime | None = None,
|
||||
granularity: str | None = None,
|
||||
query_type: MetricQueryType = MetricQueryType.RANGE,
|
||||
label_matchers: list[MetricLabelMatcher] | None = None,
|
||||
) -> QueryMetricsResponse:
|
||||
if end_time is None:
|
||||
end_time = datetime.now(UTC)
|
||||
|
||||
# Build base query
|
||||
if query_type == MetricQueryType.INSTANT:
|
||||
query = """
|
||||
SELECT
|
||||
se.name,
|
||||
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
|
||||
json_extract(se.attributes, '$.unit') as unit,
|
||||
se.attributes
|
||||
FROM span_events se
|
||||
WHERE se.name = ?
|
||||
AND se.timestamp BETWEEN ? AND ?
|
||||
"""
|
||||
else:
|
||||
if granularity:
|
||||
time_format = self._get_time_format_for_granularity(granularity)
|
||||
query = f"""
|
||||
SELECT
|
||||
se.name,
|
||||
SUM(CAST(json_extract(se.attributes, '$.value') AS REAL)) as value,
|
||||
json_extract(se.attributes, '$.unit') as unit,
|
||||
se.attributes,
|
||||
strftime('{time_format}', se.timestamp) as bucket_start
|
||||
FROM span_events se
|
||||
WHERE se.name = ?
|
||||
AND se.timestamp BETWEEN ? AND ?
|
||||
"""
|
||||
else:
|
||||
query = """
|
||||
SELECT
|
||||
se.name,
|
||||
json_extract(se.attributes, '$.value') as value,
|
||||
json_extract(se.attributes, '$.unit') as unit,
|
||||
se.attributes,
|
||||
se.timestamp
|
||||
FROM span_events se
|
||||
WHERE se.name = ?
|
||||
AND se.timestamp BETWEEN ? AND ?
|
||||
"""
|
||||
|
||||
params = [f"metric.{metric_name}", start_time.isoformat(), end_time.isoformat()]
|
||||
|
||||
# Labels that will be attached to the MetricSeries (preserve matcher labels)
|
||||
all_labels: list[MetricLabel] = []
|
||||
matcher_label_names = set()
|
||||
if label_matchers:
|
||||
for matcher in label_matchers:
|
||||
json_path = f"$.{matcher.name}"
|
||||
if matcher.operator == "=":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') = ?"
|
||||
params.append(matcher.value)
|
||||
elif matcher.operator == "!=":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') != ?"
|
||||
params.append(matcher.value)
|
||||
elif matcher.operator == "=~":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') LIKE ?"
|
||||
params.append(f"%{matcher.value}%")
|
||||
elif matcher.operator == "!~":
|
||||
query += f" AND json_extract(se.attributes, '{json_path}') NOT LIKE ?"
|
||||
params.append(f"%{matcher.value}%")
|
||||
# Preserve filter context in output
|
||||
all_labels.append(MetricLabel(name=matcher.name, value=str(matcher.value)))
|
||||
matcher_label_names.add(matcher.name)
|
||||
|
||||
# GROUP BY / ORDER BY logic
|
||||
if query_type == MetricQueryType.RANGE and granularity:
|
||||
group_time_format = self._get_time_format_for_granularity(granularity)
|
||||
query += f" GROUP BY strftime('{group_time_format}', se.timestamp), json_extract(se.attributes, '$.unit')"
|
||||
query += " ORDER BY bucket_start"
|
||||
elif query_type == MetricQueryType.INSTANT:
|
||||
query += " GROUP BY json_extract(se.attributes, '$.unit')"
|
||||
else:
|
||||
query += " ORDER BY se.timestamp"
|
||||
|
||||
# Execute query
|
||||
async with aiosqlite.connect(self.conn_string) as conn:
|
||||
conn.row_factory = aiosqlite.Row
|
||||
async with conn.execute(query, params) as cursor:
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
if not rows:
|
||||
return QueryMetricsResponse(data=[])
|
||||
|
||||
data_points = []
|
||||
# We want to add attribute labels, but only those not already present as matcher labels.
|
||||
attr_label_names = set()
|
||||
for row in rows:
|
||||
# Parse JSON attributes safely, if there are no attributes (weird), just don't add the labels to the result.
|
||||
try:
|
||||
attributes = json.loads(row["attributes"] or "{}")
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
attributes = {}
|
||||
|
||||
value = row["value"]
|
||||
unit = row["unit"] or ""
|
||||
|
||||
# Add labels from attributes without duplicating matcher labels, if we don't do this, there will be a lot of duplicate label in the result.
|
||||
for k, v in attributes.items():
|
||||
if k not in ["value", "unit"] and k not in matcher_label_names and k not in attr_label_names:
|
||||
all_labels.append(MetricLabel(name=k, value=str(v)))
|
||||
attr_label_names.add(k)
|
||||
|
||||
# Determine timestamp
|
||||
if query_type == MetricQueryType.RANGE and granularity:
|
||||
try:
|
||||
bucket_start_raw = row["bucket_start"]
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
"DB did not have a bucket_start time in row when using granularity, this indicates improper formatting"
|
||||
) from e
|
||||
# this value could also be there, but be NULL, I think.
|
||||
if bucket_start_raw is None:
|
||||
raise ValueError("bucket_start is None check time format and data")
|
||||
bucket_start = datetime.fromisoformat(bucket_start_raw)
|
||||
timestamp = int(bucket_start.timestamp())
|
||||
elif query_type == MetricQueryType.INSTANT:
|
||||
timestamp = int(datetime.now(UTC).timestamp())
|
||||
else:
|
||||
try:
|
||||
timestamp_raw = row["timestamp"]
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
"DB did not have a timestamp in row, this indicates improper formatting"
|
||||
) from e
|
||||
# this value could also be there, but be NULL, I think.
|
||||
if timestamp_raw is None:
|
||||
raise ValueError("timestamp is None check time format and data")
|
||||
timestamp_iso = datetime.fromisoformat(timestamp_raw)
|
||||
timestamp = int(timestamp_iso.timestamp())
|
||||
|
||||
data_points.append(
|
||||
MetricDataPoint(
|
||||
timestamp=timestamp,
|
||||
value=value,
|
||||
unit=unit,
|
||||
)
|
||||
)
|
||||
|
||||
metric_series = [MetricSeries(metric=metric_name, labels=all_labels, values=data_points)]
|
||||
return QueryMetricsResponse(data=metric_series)
|
||||
|
||||
def _get_time_format_for_granularity(self, granularity: str | None) -> str:
|
||||
"""Get the SQLite strftime format string for a given granularity.
|
||||
Args:
|
||||
granularity: Granularity string (e.g., "1m", "5m", "1h", "1d")
|
||||
Returns:
|
||||
SQLite strftime format string for the granularity
|
||||
"""
|
||||
if granularity is None:
|
||||
raise ValueError("granularity cannot be None for this method - use separate logic for no aggregation")
|
||||
|
||||
if granularity.endswith("d"):
|
||||
return "%Y-%m-%d 00:00:00"
|
||||
elif granularity.endswith("h"):
|
||||
return "%Y-%m-%d %H:00:00"
|
||||
elif granularity.endswith("m"):
|
||||
return "%Y-%m-%d %H:%M:00"
|
||||
else:
|
||||
return "%Y-%m-%d %H:%M:00" # Default to most granular which will give us the most timestamps.
|
||||
|
||||
async def query_traces(
|
||||
self,
|
||||
attribute_filters: list[QueryCondition] | None = None,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue