add API to query metrics

This commit is contained in:
Dinesh Yeduguru 2025-02-05 10:06:59 -08:00
parent b180069def
commit cce217bab8
4 changed files with 107 additions and 0 deletions

View file

@ -233,6 +233,37 @@ class MetricsMixin(BaseModel):
metrics: Optional[List[Metric]] = None
@json_schema_type
class MetricQueryType(Enum):
RANGE = "range" # Returns data points over time range
INSTANT = "instant" # Returns single data point
@json_schema_type
class MetricLabelMatcher(BaseModel):
name: str
value: str
operator: Literal["=", "!=", "=~", "!~"] = "=" # Prometheus-style operators
@json_schema_type
class MetricDataPoint(BaseModel):
timestamp: datetime
value: float
@json_schema_type
class MetricSeries(BaseModel):
metric: str
labels: Dict[str, str]
values: List[MetricDataPoint]
@json_schema_type
class GetMetricsResponse(BaseModel):
data: List[MetricSeries]
@runtime_checkable
class Telemetry(Protocol):
@webmethod(route="/telemetry/events", method="POST")
@ -277,3 +308,14 @@ class Telemetry(Protocol):
dataset_id: str,
max_depth: Optional[int] = None,
) -> None: ...
@webmethod(route="/telemetry/metrics/{metric_name}", method="POST")
async def get_metrics(
self,
metric_name: str,
start_time: int, # Unix timestamp in seconds
end_time: Optional[int] = None, # Unix timestamp in seconds
step: Optional[str] = "1m", # Prometheus-style duration: 1m, 5m, 1h, etc.
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: Optional[List[MetricLabelMatcher]] = None,
) -> GetMetricsResponse: ...

View file

@ -23,6 +23,14 @@ class TelemetryConfig(BaseModel):
default="http://localhost:4318",
description="The OpenTelemetry collector endpoint URL",
)
prometheus_endpoint: str = Field(
default="http://localhost:9090",
description="The Prometheus endpoint URL",
)
prometheus_disable_ssl: bool = Field(
default=True,
description="Whether to disable SSL for the Prometheus endpoint",
)
service_name: str = Field(
default="llama-stack",
description="The service name to use for telemetry",

View file

@ -5,6 +5,7 @@
# the root directory of this source tree.
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
@ -17,10 +18,16 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from prometheus_api_client import PrometheusConnect
from llama_stack.apis.telemetry import (
Event,
GetMetricsResponse,
MetricDataPoint,
MetricEvent,
MetricLabelMatcher,
MetricQueryType,
MetricSeries,
QueryCondition,
QuerySpanTreeResponse,
QueryTracesResponse,
@ -111,6 +118,9 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
if TelemetrySink.OTEL in self.config.sinks:
self.meter = metrics.get_meter(__name__)
self.prom = PrometheusConnect(
url=self.config.prometheus_endpoint, disable_ssl=self.config.prometheus_disable_ssl
)
if TelemetrySink.SQLITE in self.config.sinks:
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
@ -248,3 +258,49 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
max_depth=max_depth,
)
)
async def get_metrics(
self,
metric_name: str,
start_time: datetime,
end_time: Optional[datetime] = None,
step: Optional[str] = "15s",
query_type: MetricQueryType = MetricQueryType.RANGE,
label_matchers: Optional[List[MetricLabelMatcher]] = None,
) -> GetMetricsResponse:
if TelemetrySink.OTEL not in self.config.sinks:
return GetMetricsResponse(data=[])
try:
# Build query with label matchers if provided
query = metric_name
if label_matchers:
matchers = [f'{m.name}{m.operator.value}"{m.value}"' for m in label_matchers]
query = f"{metric_name}{{{','.join(matchers)}}}"
# Use instant query for current values, range query for historical data
if query_type == MetricQueryType.INSTANT:
result = self.prom.custom_query(query=query)
# Convert instant query results to same format as range query
result = [{"metric": r["metric"], "values": [[r["value"][0], r["value"][1]]]} for r in result]
else:
result = self.prom.custom_query_range(
query=query,
start_time=start_time,
end_time=end_time if end_time else None,
step=step,
)
series = []
for metric_data in result:
values = [
MetricDataPoint(timestamp=datetime.fromtimestamp(point[0]), value=float(point[1]))
for point in metric_data["values"]
]
series.append(MetricSeries(metric=metric_name, labels=metric_data.get("metric", {}), values=values))
return GetMetricsResponse(data=series)
except Exception as e:
print(f"Error querying metrics: {e}")
return GetMetricsResponse(data=[])

View file

@ -23,6 +23,7 @@ def available_providers() -> List[ProviderSpec]:
pip_packages=[
"opentelemetry-sdk",
"opentelemetry-exporter-otlp-proto-http",
"prometheus-api-client",
],
optional_api_dependencies=[Api.datasetio],
module="llama_stack.providers.inline.telemetry.meta_reference",