diff --git a/llama_stack/apis/telemetry/telemetry.py b/llama_stack/apis/telemetry/telemetry.py
index e6073c429..845e23e79 100644
--- a/llama_stack/apis/telemetry/telemetry.py
+++ b/llama_stack/apis/telemetry/telemetry.py
@@ -233,6 +233,37 @@ class MetricsMixin(BaseModel):
     metrics: Optional[List[Metric]] = None
 
 
+@json_schema_type
+class MetricQueryType(Enum):
+    RANGE = "range"  # Returns data points over time range
+    INSTANT = "instant"  # Returns single data point
+
+
+@json_schema_type
+class MetricLabelMatcher(BaseModel):
+    name: str
+    value: str
+    operator: Literal["=", "!=", "=~", "!~"] = "="  # Prometheus-style operators
+
+
+@json_schema_type
+class MetricDataPoint(BaseModel):
+    timestamp: datetime
+    value: float
+
+
+@json_schema_type
+class MetricSeries(BaseModel):
+    metric: str
+    labels: Dict[str, str]
+    values: List[MetricDataPoint]
+
+
+@json_schema_type
+class GetMetricsResponse(BaseModel):
+    data: List[MetricSeries]
+
+
 @runtime_checkable
 class Telemetry(Protocol):
     @webmethod(route="/telemetry/events", method="POST")
@@ -277,3 +308,14 @@ class Telemetry(Protocol):
         dataset_id: str,
         max_depth: Optional[int] = None,
     ) -> None: ...
+
+    @webmethod(route="/telemetry/metrics/{metric_name}", method="POST")
+    async def get_metrics(
+        self,
+        metric_name: str,
+        start_time: int,  # Unix timestamp in seconds
+        end_time: Optional[int] = None,  # Unix timestamp in seconds
+        step: Optional[str] = "1m",  # Prometheus-style duration: 1m, 5m, 1h, etc.
+        query_type: MetricQueryType = MetricQueryType.RANGE,
+        label_matchers: Optional[List[MetricLabelMatcher]] = None,
+    ) -> GetMetricsResponse: ...
diff --git a/llama_stack/providers/inline/telemetry/meta_reference/config.py b/llama_stack/providers/inline/telemetry/meta_reference/config.py
index 97b1c2efc..a66f5e492 100644
--- a/llama_stack/providers/inline/telemetry/meta_reference/config.py
+++ b/llama_stack/providers/inline/telemetry/meta_reference/config.py
@@ -23,6 +23,14 @@ class TelemetryConfig(BaseModel):
         default="http://localhost:4318",
         description="The OpenTelemetry collector endpoint URL",
     )
+    prometheus_endpoint: str = Field(
+        default="http://localhost:9090",
+        description="The Prometheus endpoint URL",
+    )
+    prometheus_disable_ssl: bool = Field(
+        default=True,
+        description="Whether to disable SSL for the Prometheus endpoint",
+    )
     service_name: str = Field(
         default="llama-stack",
         description="The service name to use for telemetry",
diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py
index ef417ac18..d35cc4409 100644
--- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py
+++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py
@@ -5,6 +5,7 @@
 # the root directory of this source tree.
 
 import threading
+from datetime import datetime
 from typing import Any, Dict, List, Optional
 from urllib.parse import urljoin
 
@@ -17,10 +18,16 @@ from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor
 from opentelemetry.semconv.resource import ResourceAttributes
+from prometheus_api_client import PrometheusConnect
 
 from llama_stack.apis.telemetry import (
     Event,
+    GetMetricsResponse,
+    MetricDataPoint,
     MetricEvent,
+    MetricLabelMatcher,
+    MetricQueryType,
+    MetricSeries,
     QueryCondition,
     QuerySpanTreeResponse,
     QueryTracesResponse,
@@ -111,6 +118,9 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
 
         if TelemetrySink.OTEL in self.config.sinks:
             self.meter = metrics.get_meter(__name__)
+            self.prom = PrometheusConnect(
+                url=self.config.prometheus_endpoint, disable_ssl=self.config.prometheus_disable_ssl
+            )
         if TelemetrySink.SQLITE in self.config.sinks:
             self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
 
@@ -248,3 +258,49 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
                 max_depth=max_depth,
             )
         )
+
+    async def get_metrics(
+        self,
+        metric_name: str,
+        start_time: datetime,
+        end_time: Optional[datetime] = None,
+        step: Optional[str] = "15s",
+        query_type: MetricQueryType = MetricQueryType.RANGE,
+        label_matchers: Optional[List[MetricLabelMatcher]] = None,
+    ) -> GetMetricsResponse:
+        if TelemetrySink.OTEL not in self.config.sinks:
+            return GetMetricsResponse(data=[])
+
+        try:
+            # Build query with label matchers if provided
+            query = metric_name
+            if label_matchers:
+                matchers = [f'{m.name}{m.operator}"{m.value}"' for m in label_matchers]
+                query = f"{metric_name}{{{','.join(matchers)}}}"
+
+            # Use instant query for current values, range query for historical data
+            if query_type == MetricQueryType.INSTANT:
+                result = self.prom.custom_query(query=query)
+                # Convert instant query results to same format as range query
+                result = [{"metric": r["metric"], "values": [[r["value"][0], r["value"][1]]]} for r in result]
+            else:
+                result = self.prom.custom_query_range(
+                    query=query,
+                    start_time=start_time,
+                    end_time=end_time if end_time else None,
+                    step=step,
+                )
+
+            series = []
+            for metric_data in result:
+                values = [
+                    MetricDataPoint(timestamp=datetime.fromtimestamp(point[0]), value=float(point[1]))
+                    for point in metric_data["values"]
+                ]
+                series.append(MetricSeries(metric=metric_name, labels=metric_data.get("metric", {}), values=values))
+
+            return GetMetricsResponse(data=series)
+
+        except Exception as e:
+            print(f"Error querying metrics: {e}")
+            return GetMetricsResponse(data=[])
diff --git a/llama_stack/providers/registry/telemetry.py b/llama_stack/providers/registry/telemetry.py
index f3b41374c..09bbf87e2 100644
--- a/llama_stack/providers/registry/telemetry.py
+++ b/llama_stack/providers/registry/telemetry.py
@@ -23,6 +23,7 @@ def available_providers() -> List[ProviderSpec]:
             pip_packages=[
                 "opentelemetry-sdk",
                 "opentelemetry-exporter-otlp-proto-http",
+                "prometheus-api-client",
             ],
             optional_api_dependencies=[Api.datasetio],
             module="llama_stack.providers.inline.telemetry.meta_reference",
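For context on the query construction in get_metrics: each MetricLabelMatcher is rendered as name, operator, and quoted value, and the matchers are joined with commas inside braces after the metric name, which is standard PromQL selector syntax. The snippet below is only an illustration of the string this produces; the metric and label names are hypothetical and do not come from the diff.

# Hypothetical example of the PromQL selector get_metrics builds from label matchers.
matchers = [
    MetricLabelMatcher(name="model", value="llama-3-8b"),             # operator defaults to "="
    MetricLabelMatcher(name="status", value="error", operator="!="),
]
# get_metrics("request_duration_seconds", ..., label_matchers=matchers) would query:
#   request_duration_seconds{model="llama-3-8b",status!="error"}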
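One inconsistency worth noting: the Telemetry protocol declares get_metrics with start_time/end_time as Unix timestamps in seconds and a default step of "1m", while the adapter implements it with datetime parameters and a default step of "15s". Since prometheus_api_client's custom_query_range expects datetime objects (and a concrete end of the window rather than None), one way to reconcile the two signatures is to keep the protocol's integer-seconds contract and convert inside the adapter. The helper below is only a sketch under that assumption; the name to_query_window is hypothetical and not part of this diff.

# Sketch only: converting the protocol's Unix-second bounds into the datetime
# pair that prometheus_api_client's custom_query_range expects.
from datetime import datetime
from typing import Optional, Tuple


def to_query_window(start_time: int, end_time: Optional[int] = None) -> Tuple[datetime, datetime]:
    """Convert Unix-second bounds into (start, end) datetimes for a range query."""
    start_dt = datetime.fromtimestamp(start_time)
    # Default the end of the window to "now" when the caller omits end_time,
    # so the range query never receives None.
    end_dt = datetime.fromtimestamp(end_time) if end_time is not None else datetime.now()
    return start_dt, end_dt

Inside get_metrics, the adapter could then pass start_dt/end_dt to self.prom.custom_query_range(...) so API callers never have to construct datetime objects themselves.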