From 026466228d0b3803afb919f587aea7c50a4e0e9a Mon Sep 17 00:00:00 2001 From: Dinesh Yeduguru Date: Thu, 30 Jan 2025 22:02:21 -0500 Subject: [PATCH] add metrics query to telemetry --- llama_stack/apis/telemetry/telemetry.py | 37 +++++++++ .../meta_reference/sqlite_span_processor.py | 20 +++++ .../telemetry/meta_reference/telemetry.py | 54 +++++++++++-- .../utils/telemetry/sqlite_trace_store.py | 76 ++++++++++++++++++- 4 files changed, 179 insertions(+), 8 deletions(-) diff --git a/llama_stack/apis/telemetry/telemetry.py b/llama_stack/apis/telemetry/telemetry.py index 284e3a970..25da80630 100644 --- a/llama_stack/apis/telemetry/telemetry.py +++ b/llama_stack/apis/telemetry/telemetry.py @@ -187,6 +187,33 @@ class QuerySpanTreeResponse(BaseModel): data: Dict[str, SpanWithStatus] +@json_schema_type +class MetricType(Enum): + COUNTER = "counter" + GAUGE = "gauge" + HISTOGRAM = "histogram" + + +@json_schema_type +class MetricDefinition(BaseModel): + name: str + type: MetricType + description: str + unit: str + + +@json_schema_type +class MetricValue(BaseModel): + name: str + value: Union[int, float] + timestamp: datetime + attributes: Optional[Dict[str, Any]] = Field(default_factory=dict) + + +class QueryMetricsResponse(BaseModel): + data: List[MetricValue] + + @runtime_checkable class Telemetry(Protocol): @webmethod(route="/telemetry/events", method="POST") @@ -233,3 +260,13 @@ class Telemetry(Protocol): dataset_id: str, max_depth: Optional[int] = None, ) -> None: ... + + @webmethod(route="/telemetry/metrics", method="GET") + async def query_metrics( + self, + metric_names: Optional[List[str]] = None, + attribute_filters: Optional[List[QueryCondition]] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + limit: Optional[int] = 100, + ) -> QueryMetricsResponse: ... 
diff --git a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py index 3455c2236..25674cb67 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/sqlite_span_processor.py @@ -75,6 +75,19 @@ class SQLiteSpanProcessor(SpanProcessor): """ ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + value REAL NOT NULL, + timestamp TIMESTAMP NOT NULL, + attributes TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + cursor.execute( """ CREATE INDEX IF NOT EXISTS idx_traces_created_at @@ -82,6 +95,13 @@ class SQLiteSpanProcessor(SpanProcessor): """ ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS idx_metrics_name_timestamp + ON metrics(name, timestamp) + """ + ) + conn.commit() cursor.close() diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index 569d02f50..a070e6c57 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -4,7 +4,9 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. 
+import json import threading +from datetime import datetime from typing import Any, Dict, List, Optional from opentelemetry import metrics, trace @@ -21,6 +23,7 @@ from llama_stack.apis.telemetry import ( Event, MetricEvent, QueryCondition, + QueryMetricsResponse, QuerySpanTreeResponse, QueryTracesResponse, Span, @@ -177,14 +180,35 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): return _GLOBAL_STORAGE["gauges"][name] def _log_metric(self, event: MetricEvent) -> None: - if isinstance(event.value, int): - counter = self._get_or_create_counter(event.metric, event.unit) - counter.add(event.value, attributes=event.attributes) - elif isinstance(event.value, float): - up_down_counter = self._get_or_create_up_down_counter( - event.metric, event.unit + # Store in SQLite + if TelemetrySink.SQLITE in self.config.sinks: + conn = self._get_connection() + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO metrics ( + name, value, timestamp, attributes + ) VALUES (?, ?, ?, ?) + """, + ( + event.metric, + event.value, + datetime.fromtimestamp(event.timestamp).isoformat(), + json.dumps(event.attributes), + ), ) - up_down_counter.add(event.value, attributes=event.attributes) + conn.commit() + + # Export to OTEL if configured + if TelemetrySink.OTEL in self.config.sinks: + if isinstance(event.value, int): + counter = self._get_or_create_counter(event.metric, event.unit) + counter.add(event.value, attributes=event.attributes) + elif isinstance(event.value, float): + up_down_counter = self._get_or_create_up_down_counter( + event.metric, event.unit + ) + up_down_counter.add(event.value, attributes=event.attributes) def _get_or_create_up_down_counter( self, name: str, unit: str @@ -281,3 +305,19 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): max_depth=max_depth, ) ) + + async def query_metrics( + self, + metric_names: Optional[List[str]] = None, + attribute_filters: Optional[List[QueryCondition]] = None, + start_time: Optional[datetime] = None, + 
end_time: Optional[datetime] = None, + limit: Optional[int] = 100, + ) -> QueryMetricsResponse: + return await self.trace_store.query_metrics( + metric_names=metric_names, + attribute_filters=attribute_filters, + start_time=start_time, + end_time=end_time, + limit=limit, + ) diff --git a/llama_stack/providers/utils/telemetry/sqlite_trace_store.py b/llama_stack/providers/utils/telemetry/sqlite_trace_store.py index a2821da43..b563b67a6 100644 --- a/llama_stack/providers/utils/telemetry/sqlite_trace_store.py +++ b/llama_stack/providers/utils/telemetry/sqlite_trace_store.py @@ -10,7 +10,13 @@ from typing import Dict, List, Optional, Protocol import aiosqlite -from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace +from llama_stack.apis.telemetry import ( + MetricValue, + QueryCondition, + Span, + SpanWithStatus, + Trace, +) class TraceStore(Protocol): @@ -29,6 +35,15 @@ class TraceStore(Protocol): max_depth: Optional[int] = None, ) -> Dict[str, SpanWithStatus]: ... + async def query_metrics( + self, + metric_names: Optional[List[str]] = None, + attribute_filters: Optional[List[QueryCondition]] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + limit: Optional[int] = 100, + ) -> List[MetricValue]: ... + class SQLiteTraceStore(TraceStore): def __init__(self, conn_string: str): @@ -187,3 +202,62 @@ class SQLiteTraceStore(TraceStore): if row is None: raise ValueError(f"Span {span_id} not found") return Span(**row) + + async def query_metrics( + self, + metric_names: Optional[List[str]] = None, + attribute_filters: Optional[List[QueryCondition]] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + limit: Optional[int] = 100, + ) -> List[MetricValue]: + conditions = [] + params = [] + + # Build WHERE clause + if metric_names: + conditions.append(f"name IN ({','.join('?' 
* len(metric_names))})") + params.extend(metric_names) + + if start_time: + conditions.append("timestamp >= ?") + params.append(start_time.isoformat()) + + if end_time: + conditions.append("timestamp <= ?") + params.append(end_time.isoformat()) + + if attribute_filters: + for condition in attribute_filters: + ops_map = {"eq": "=", "ne": "!=", "gt": ">", "lt": "<"} + conditions.append( + f"json_extract(attributes, '$.{condition.key}') {ops_map[condition.op.value]} ?" + ) + params.append(condition.value) + + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + + query = f""" + SELECT name, value, timestamp, attributes + FROM metrics + {where_clause} + ORDER BY timestamp DESC + LIMIT ? + """ + params.append(limit) + + async with aiosqlite.connect(self.conn_string) as conn: + conn.row_factory = aiosqlite.Row + async with conn.execute(query, params) as cursor: + rows = await cursor.fetchall() + return [ + MetricValue( + name=row["name"], + value=row["value"], + timestamp=datetime.fromisoformat(row["timestamp"]), + attributes=( + json.loads(row["attributes"]) if row["attributes"] else {} + ), + ) + for row in rows + ]