diff --git a/llama_stack/providers/inline/telemetry/meta_reference/config.py b/llama_stack/providers/inline/telemetry/meta_reference/config.py index a66f5e492..297d88af7 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/config.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/config.py @@ -5,7 +5,7 @@ # the root directory of this source tree. from enum import Enum -from typing import Any, Dict, List +from typing import Annotated, Any, Dict, List, Literal, Optional, Union from pydantic import BaseModel, Field, field_validator @@ -18,19 +18,44 @@ class TelemetrySink(str, Enum): CONSOLE = "console" +class MetricsStoreType(str, Enum): + PROMETHEUS = "prometheus" + + +class PrometheusConfig(BaseModel): + endpoint: str = Field( + default="http://localhost:9090", + description="The Prometheus endpoint URL", + ) + disable_ssl: bool = Field( + default=True, + description="Whether to disable SSL for the Prometheus endpoint", + ) + + +class PrometheusMetricsStoreConfig(BaseModel): + type: Literal["prometheus"] = Field( + default="prometheus", + description="The type of metrics store to use", + ) + endpoint: str = Field( + default="http://localhost:9090", + description="The Prometheus endpoint URL", + ) + disable_ssl: bool = Field( + default=True, + description="Whether to disable SSL for the Prometheus endpoint", + ) + + +MetricsStoreConfig = Annotated[Union[PrometheusMetricsStoreConfig], Field(discriminator="type")] + + class TelemetryConfig(BaseModel): otel_endpoint: str = Field( default="http://localhost:4318", description="The OpenTelemetry collector endpoint URL", ) - prometheus_endpoint: str = Field( - default="http://localhost:9090", - description="The Prometheus endpoint URL", - ) - prometheus_disable_ssl: bool = Field( - default=True, - description="Whether to disable SSL for the Prometheus endpoint", - ) service_name: str = Field( default="llama-stack", description="The service name to use for telemetry", @@ -43,6 +68,7 @@ class TelemetryConfig(BaseModel): default=(RUNTIME_BASE_DIR / "trace_store.db").as_posix(), description="The path to the SQLite database to use for storing traces", ) + metrics_store_config: Optional[MetricsStoreConfig] = None @field_validator("sinks", mode="before") @classmethod diff --git a/llama_stack/providers/inline/telemetry/meta_reference/metrics_store.py b/llama_stack/providers/inline/telemetry/meta_reference/metrics_store.py new file mode 100644 index 000000000..0c17a422c --- /dev/null +++ b/llama_stack/providers/inline/telemetry/meta_reference/metrics_store.py @@ -0,0 +1,29 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import List, Optional + +from llama_stack.apis.telemetry import ( + GetMetricsResponse, + MetricLabelMatcher, + MetricQueryType, +) + + +class MetricsStore(ABC): + @abstractmethod + async def get_metrics( + self, + metric_name: str, + start_time: datetime, + end_time: Optional[datetime] = None, + step: Optional[str] = "15s", + query_type: MetricQueryType = MetricQueryType.RANGE, + label_matchers: Optional[List[MetricLabelMatcher]] = None, + ) -> GetMetricsResponse: + pass diff --git a/llama_stack/providers/inline/telemetry/meta_reference/prometheus_metrics_store.py b/llama_stack/providers/inline/telemetry/meta_reference/prometheus_metrics_store.py new file mode 100644 index 000000000..14f6e6b80 --- /dev/null +++ b/llama_stack/providers/inline/telemetry/meta_reference/prometheus_metrics_store.py @@ -0,0 +1,65 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from datetime import datetime +from typing import List, Optional + +from prometheus_api_client import PrometheusConnect + +from llama_stack.apis.telemetry import ( + GetMetricsResponse, + MetricDataPoint, + MetricLabelMatcher, + MetricQueryType, + MetricSeries, +) + +from .metrics_store import MetricsStore + + +class PrometheusMetricsStore(MetricsStore): + def __init__(self, endpoint: str, disable_ssl: bool = True): + self.prom = PrometheusConnect(url=endpoint, disable_ssl=disable_ssl) + + async def get_metrics( + self, + metric_name: str, + start_time: datetime, + end_time: Optional[datetime] = None, + step: Optional[str] = "15s", + query_type: MetricQueryType = MetricQueryType.RANGE, + label_matchers: Optional[List[MetricLabelMatcher]] = None, + ) -> GetMetricsResponse: + try: + query = metric_name + if label_matchers: + matchers = [f'{m.name}{m.operator.value}"{m.value}"' for m in label_matchers] + query = f"{metric_name}{{{','.join(matchers)}}}" + + if query_type == MetricQueryType.INSTANT: + result = self.prom.custom_query(query=query) + result = [{"metric": r["metric"], "values": [[r["value"][0], r["value"][1]]]} for r in result] + else: + result = self.prom.custom_query_range( + query=query, + start_time=start_time, + end_time=end_time if end_time else None, + step=step, + ) + + series = [] + for metric_data in result: + values = [ + MetricDataPoint(timestamp=datetime.fromtimestamp(point[0]), value=float(point[1])) + for point in metric_data["values"] + ] + series.append(MetricSeries(metric=metric_name, labels=metric_data.get("metric", {}), values=values)) + + return GetMetricsResponse(data=series) + + except Exception as e: + print(f"Error querying metrics: {e}") + return GetMetricsResponse(data=[]) diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index 991df21cd..b86367f92 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -18,16 +18,13 @@ from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.semconv.resource import ResourceAttributes -from prometheus_api_client import PrometheusConnect from llama_stack.apis.telemetry import ( Event, GetMetricsResponse, - MetricDataPoint, MetricEvent, MetricLabelMatcher, MetricQueryType, - MetricSeries, QueryCondition, QuerySpanTreeResponse, QueryTracesResponse, @@ -44,6 +41,7 @@ from llama_stack.distribution.datatypes import Api from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import ( ConsoleSpanProcessor, ) +from llama_stack.providers.inline.telemetry.meta_reference.prometheus_metrics_store import PrometheusMetricsStore from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import ( SQLiteSpanProcessor, ) @@ -118,9 +116,12 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): if TelemetrySink.OTEL in self.config.sinks: self.meter = metrics.get_meter(__name__) - self.prom = PrometheusConnect( - url=self.config.prometheus_endpoint, disable_ssl=self.config.prometheus_disable_ssl - ) + if self.config.metrics_store_config is not None: + if self.config.metrics_store_config.type == "prometheus": + self.metrics_store = PrometheusMetricsStore( + endpoint=self.config.metrics_store_config.endpoint, + disable_ssl=self.config.metrics_store_config.disable_ssl, + ) if TelemetrySink.SQLITE in self.config.sinks: self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path) @@ -268,39 +269,13 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): query_type: MetricQueryType = MetricQueryType.RANGE, label_matchers: Optional[List[MetricLabelMatcher]] = None, ) -> GetMetricsResponse: - if self.prom is None: - raise ValueError("Prometheus endpoint not configured") - - try: - # Build query with label matchers if provided - query = metric_name - if label_matchers: - matchers = [f'{m.name}{m.operator.value}"{m.value}"' for m in label_matchers] - query = f"{metric_name}{{{','.join(matchers)}}}" - - # Use instant query for current values, range query for historical data - if query_type == MetricQueryType.INSTANT: - result = self.prom.custom_query(query=query) - # Convert instant query results to same format as range query - result = [{"metric": r["metric"], "values": [[r["value"][0], r["value"][1]]]} for r in result] - else: - result = self.prom.custom_query_range( - query=query, - start_time=start_time, - end_time=end_time if end_time else None, - step=step, - ) - - series = [] - for metric_data in result: - values = [ - MetricDataPoint(timestamp=datetime.fromtimestamp(point[0]), value=float(point[1])) - for point in metric_data["values"] - ] - series.append(MetricSeries(metric=metric_name, labels=metric_data.get("metric", {}), values=values)) - - return GetMetricsResponse(data=series) - - except Exception as e: - print(f"Error querying metrics: {e}") - return GetMetricsResponse(data=[]) + if not hasattr(self, "metrics_store"): + raise ValueError("Metrics store not configured") + return await self.metrics_store.get_metrics( + metric_name=metric_name, + start_time=start_time, + end_time=end_time, + step=step, + query_type=query_type, + label_matchers=label_matchers, + )