add metrics query to telemetry

Dinesh Yeduguru 2025-01-30 22:02:21 -05:00
parent 6609362d26
commit 026466228d
4 changed files with 179 additions and 8 deletions


@@ -187,6 +187,33 @@ class QuerySpanTreeResponse(BaseModel):
     data: Dict[str, SpanWithStatus]
 
 
+@json_schema_type
+class MetricType(Enum):
+    COUNTER = "counter"
+    GAUGE = "gauge"
+    HISTOGRAM = "histogram"
+
+
+@json_schema_type
+class MetricDefinition(BaseModel):
+    name: str
+    type: MetricType
+    description: str
+    unit: str
+
+
+@json_schema_type
+class MetricValue(BaseModel):
+    name: str
+    value: Union[int, float]
+    timestamp: datetime
+    attributes: Optional[Dict[str, Any]] = Field(default_factory=dict)
+
+
+class QueryMetricsResponse(BaseModel):
+    data: List[MetricValue]
+
+
 @runtime_checkable
 class Telemetry(Protocol):
     @webmethod(route="/telemetry/events", method="POST")

@@ -233,3 +260,13 @@ class Telemetry(Protocol):
         dataset_id: str,
         max_depth: Optional[int] = None,
     ) -> None: ...
+
+    @webmethod(route="/telemetry/metrics", method="GET")
+    async def query_metrics(
+        self,
+        metric_names: Optional[List[str]] = None,
+        attribute_filters: Optional[List[QueryCondition]] = None,
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        limit: Optional[int] = 100,
+    ) -> QueryMetricsResponse: ...
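For reference, a minimal sketch of how the new response models fit together, assuming Pydantic v2 (the models are re-declared here so the snippet runs standalone; the metric name and attributes are made up):

    from datetime import datetime, timezone
    from typing import Any, Dict, List, Optional, Union

    from pydantic import BaseModel, Field


    class MetricValue(BaseModel):
        name: str
        value: Union[int, float]
        timestamp: datetime
        attributes: Optional[Dict[str, Any]] = Field(default_factory=dict)


    class QueryMetricsResponse(BaseModel):
        data: List[MetricValue]


    response = QueryMetricsResponse(
        data=[
            MetricValue(
                name="requests_total",  # made-up metric name
                value=3,
                timestamp=datetime.now(timezone.utc),
                attributes={"route": "/chat"},  # made-up attribute
            )
        ]
    )
    print(response.model_dump_json(indent=2))

This is the shape the new GET /telemetry/metrics route returns.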


@@ -75,6 +75,19 @@ class SQLiteSpanProcessor(SpanProcessor):
             """
         )
 
+        cursor.execute(
+            """
+            CREATE TABLE IF NOT EXISTS metrics (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                name TEXT NOT NULL,
+                value REAL NOT NULL,
+                timestamp TIMESTAMP NOT NULL,
+                attributes TEXT,
+                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+            )
+            """
+        )
+
         cursor.execute(
             """
             CREATE INDEX IF NOT EXISTS idx_traces_created_at

@@ -82,6 +95,13 @@ class SQLiteSpanProcessor(SpanProcessor):
             """
         )
 
+        cursor.execute(
+            """
+            CREATE INDEX IF NOT EXISTS idx_metrics_name_timestamp
+            ON metrics(name, timestamp)
+            """
+        )
+
         conn.commit()
         cursor.close()
 
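A quick standalone check of the schema and index above, using the stdlib sqlite3 module against an in-memory database (metric name and attributes are made up):

    import json
    import sqlite3
    from datetime import datetime, timezone

    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            value REAL NOT NULL,
            timestamp TIMESTAMP NOT NULL,
            attributes TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_metrics_name_timestamp ON metrics(name, timestamp)"
    )
    cursor.execute(
        "INSERT INTO metrics (name, value, timestamp, attributes) VALUES (?, ?, ?, ?)",
        (
            "requests_total",  # made-up metric name
            1.0,
            datetime.now(timezone.utc).isoformat(),
            json.dumps({"route": "/chat"}),  # made-up attribute
        ),
    )
    conn.commit()
    print(cursor.execute("SELECT name, value, timestamp FROM metrics").fetchall())

The composite (name, timestamp) index matches the dominant query pattern in query_metrics below: filter by metric name, then order by timestamp.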


@@ -4,7 +4,9 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
 
+import json
 import threading
+from datetime import datetime
 from typing import Any, Dict, List, Optional
 
 from opentelemetry import metrics, trace
@@ -21,6 +23,7 @@ from llama_stack.apis.telemetry import (
     Event,
     MetricEvent,
     QueryCondition,
+    QueryMetricsResponse,
     QuerySpanTreeResponse,
     QueryTracesResponse,
     Span,
@@ -177,14 +180,35 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
         return _GLOBAL_STORAGE["gauges"][name]
 
     def _log_metric(self, event: MetricEvent) -> None:
-        if isinstance(event.value, int):
-            counter = self._get_or_create_counter(event.metric, event.unit)
-            counter.add(event.value, attributes=event.attributes)
-        elif isinstance(event.value, float):
-            up_down_counter = self._get_or_create_up_down_counter(
-                event.metric, event.unit
-            )
-            up_down_counter.add(event.value, attributes=event.attributes)
+        # Store in SQLite
+        if TelemetrySink.SQLITE in self.config.sinks:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO metrics (
+                    name, value, timestamp, attributes
+                ) VALUES (?, ?, ?, ?)
+                """,
+                (
+                    event.metric,
+                    event.value,
+                    datetime.fromtimestamp(event.timestamp).isoformat(),
+                    json.dumps(event.attributes),
+                ),
+            )
+            conn.commit()
+
+        # Export to OTEL if configured
+        if TelemetrySink.OTEL in self.config.sinks:
+            if isinstance(event.value, int):
+                counter = self._get_or_create_counter(event.metric, event.unit)
+                counter.add(event.value, attributes=event.attributes)
+            elif isinstance(event.value, float):
+                up_down_counter = self._get_or_create_up_down_counter(
+                    event.metric, event.unit
+                )
+                up_down_counter.add(event.value, attributes=event.attributes)
 
     def _get_or_create_up_down_counter(
         self, name: str, unit: str
@@ -281,3 +305,19 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
                 max_depth=max_depth,
             )
         )
+
+    async def query_metrics(
+        self,
+        metric_names: Optional[List[str]] = None,
+        attribute_filters: Optional[List[QueryCondition]] = None,
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        limit: Optional[int] = 100,
+    ) -> QueryMetricsResponse:
+        return await self.trace_store.query_metrics(
+            metric_names=metric_names,
+            attribute_filters=attribute_filters,
+            start_time=start_time,
+            end_time=end_time,
+            limit=limit,
+        )
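The SQLite branch of _log_metric converts the event's epoch-seconds timestamp to an ISO-8601 string and serializes attributes to JSON text before inserting. A self-contained sketch of that conversion (FakeMetricEvent is a hypothetical stand-in for the real MetricEvent):

    import json
    from dataclasses import dataclass, field
    from datetime import datetime
    from typing import Any, Dict


    @dataclass
    class FakeMetricEvent:
        # Hypothetical stand-in for llama_stack's MetricEvent.
        metric: str
        value: float
        timestamp: float  # epoch seconds, matching what _log_metric assumes
        unit: str = "1"
        attributes: Dict[str, Any] = field(default_factory=dict)


    def to_sqlite_row(event: FakeMetricEvent) -> tuple:
        # Mirrors the INSERT parameters above: epoch float -> ISO string,
        # attributes dict -> JSON text.
        return (
            event.metric,
            event.value,
            datetime.fromtimestamp(event.timestamp).isoformat(),
            json.dumps(event.attributes),
        )


    print(to_sqlite_row(FakeMetricEvent("latency_ms", 12.5, 1700000000.0, attributes={"route": "/chat"})))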


@@ -10,7 +10,13 @@ from typing import Dict, List, Optional, Protocol
 
 import aiosqlite
 
-from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace
+from llama_stack.apis.telemetry import (
+    MetricValue,
+    QueryCondition,
+    Span,
+    SpanWithStatus,
+    Trace,
+)
 
 
 class TraceStore(Protocol):
@@ -29,6 +35,15 @@ class TraceStore(Protocol):
         max_depth: Optional[int] = None,
     ) -> Dict[str, SpanWithStatus]: ...
 
+    async def query_metrics(
+        self,
+        metric_names: Optional[List[str]] = None,
+        attribute_filters: Optional[List[QueryCondition]] = None,
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        limit: Optional[int] = 100,
+    ) -> List[MetricValue]: ...
+
 
 class SQLiteTraceStore(TraceStore):
     def __init__(self, conn_string: str):
@@ -187,3 +202,62 @@ class SQLiteTraceStore(TraceStore):
             if row is None:
                 raise ValueError(f"Span {span_id} not found")
             return Span(**row)
+
+    async def query_metrics(
+        self,
+        metric_names: Optional[List[str]] = None,
+        attribute_filters: Optional[List[QueryCondition]] = None,
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        limit: Optional[int] = 100,
+    ) -> List[MetricValue]:
+        conditions = []
+        params = []
+
+        # Build WHERE clause
+        if metric_names:
+            conditions.append(f"name IN ({','.join('?' * len(metric_names))})")
+            params.extend(metric_names)
+
+        if start_time:
+            conditions.append("timestamp >= ?")
+            params.append(start_time.isoformat())
+
+        if end_time:
+            conditions.append("timestamp <= ?")
+            params.append(end_time.isoformat())
+
+        if attribute_filters:
+            for condition in attribute_filters:
+                ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"}
+                conditions.append(
+                    f"json_extract(attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?"
+                )
+                params.append(condition.value)
+
+        where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
+
+        query = f"""
+            SELECT name, value, timestamp, attributes
+            FROM metrics
+            {where_clause}
+            ORDER BY timestamp DESC
+            LIMIT ?
+        """
+        params.append(limit)
+
+        async with aiosqlite.connect(self.conn_string) as conn:
+            conn.row_factory = aiosqlite.Row
+            async with conn.execute(query, params) as cursor:
+                rows = await cursor.fetchall()
+                return [
+                    MetricValue(
+                        name=row["name"],
+                        value=row["value"],
+                        timestamp=datetime.fromisoformat(row["timestamp"]),
+                        attributes=(
+                            json.loads(row["attributes"]) if row["attributes"] else {}
+                        ),
+                    )
+                    for row in rows
+                ]
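End to end, the SQL generated above for a name filter plus one attribute filter has the shape below. A hedged sketch that seeds an in-memory database and runs the same query with aiosqlite (simplified schema, made-up names; assumes a SQLite build with the JSON1 extension for json_extract):

    import asyncio
    import json
    from datetime import datetime, timezone

    import aiosqlite


    async def main() -> None:
        async with aiosqlite.connect(":memory:") as conn:
            await conn.execute(
                "CREATE TABLE metrics (name TEXT, value REAL, timestamp TIMESTAMP, attributes TEXT)"
            )
            await conn.execute(
                "INSERT INTO metrics VALUES (?, ?, ?, ?)",
                (
                    "requests_total",  # made-up metric name
                    3.0,
                    datetime.now(timezone.utc).isoformat(),
                    json.dumps({"route": "/chat"}),  # made-up attribute
                ),
            )
            # Same shape query_metrics builds for metric_names=["requests_total"]
            # plus an eq filter on attributes.route; limit binds last.
            query = """
                SELECT name, value, timestamp, attributes
                FROM metrics
                WHERE name IN (?) AND json_extract(attributes, '$.route') = ?
                ORDER BY timestamp DESC
                LIMIT ?
            """
            conn.row_factory = aiosqlite.Row
            async with conn.execute(query, ["requests_total", "/chat", 100]) as cursor:
                for row in await cursor.fetchall():
                    print(row["name"], row["value"], json.loads(row["attributes"]))


    asyncio.run(main())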