create metric_store

Dinesh Yeduguru 2025-02-05 10:29:48 -08:00
parent 23c1aa4504
commit e9bb96334b
4 changed files with 146 additions and 51 deletions

View file

@@ -5,7 +5,7 @@
# the root directory of this source tree.
from enum import Enum
-from typing import Any, Dict, List
+from typing import Annotated, Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel, Field, field_validator
@@ -18,19 +18,44 @@ class TelemetrySink(str, Enum):
    CONSOLE = "console"

+class MetricsStoreType(str, Enum):
+    PROMETHEUS = "prometheus"

+class PrometheusConfig(BaseModel):
+    endpoint: str = Field(
+        default="http://localhost:9090",
+        description="The Prometheus endpoint URL",
+    )
+    disable_ssl: bool = Field(
+        default=True,
+        description="Whether to disable SSL for the Prometheus endpoint",
+    )

+class PrometheusMetricsStoreConfig(BaseModel):
+    type: Literal["prometheus"] = Field(
+        default="prometheus",
+        description="The type of metrics store to use",
+    )
+    endpoint: str = Field(
+        default="http://localhost:9090",
+        description="The Prometheus endpoint URL",
+    )
+    disable_ssl: bool = Field(
+        default=True,
+        description="Whether to disable SSL for the Prometheus endpoint",
+    )

+MetricsStoreConfig = Annotated[Union[PrometheusMetricsStoreConfig], Field(discriminator="type")]

class TelemetryConfig(BaseModel):
    otel_endpoint: str = Field(
        default="http://localhost:4318",
        description="The OpenTelemetry collector endpoint URL",
    )
-    prometheus_endpoint: str = Field(
-        default="http://localhost:9090",
-        description="The Prometheus endpoint URL",
-    )
-    prometheus_disable_ssl: bool = Field(
-        default=True,
-        description="Whether to disable SSL for the Prometheus endpoint",
-    )
    service_name: str = Field(
        default="llama-stack",
        description="The service name to use for telemetry",
@@ -43,6 +68,7 @@ class TelemetryConfig(BaseModel):
        default=(RUNTIME_BASE_DIR / "trace_store.db").as_posix(),
        description="The path to the SQLite database to use for storing traces",
    )
+    metrics_store_config: Optional[MetricsStoreConfig] = None
    @field_validator("sinks", mode="before")
    @classmethod
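
Not part of the diff, but for context: a minimal sketch of how the new discriminated union could be exercised once this lands. The import path and the availability of defaults for the remaining TelemetryConfig fields are assumptions.

from llama_stack.providers.inline.telemetry.meta_reference.config import (  # assumed module path
    PrometheusMetricsStoreConfig,
    TelemetryConfig,
)

# Pydantic resolves the union member via the "type" discriminator.
config = TelemetryConfig(
    metrics_store_config={"type": "prometheus", "endpoint": "http://localhost:9090", "disable_ssl": True},
)
assert isinstance(config.metrics_store_config, PrometheusMetricsStoreConfig)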

View file

@@ -0,0 +1,29 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Optional

from llama_stack.apis.telemetry import (
    GetMetricsResponse,
    MetricLabelMatcher,
    MetricQueryType,
)


class MetricsStore(ABC):
    @abstractmethod
    async def get_metrics(
        self,
        metric_name: str,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        step: Optional[str] = "15s",
        query_type: MetricQueryType = MetricQueryType.RANGE,
        label_matchers: Optional[List[MetricLabelMatcher]] = None,
    ) -> GetMetricsResponse:
        pass
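
As a quick illustration of the contract this ABC defines, a hypothetical no-op implementation (not part of the commit, reusing the imports from the file above) would look like this; it satisfies the abstract method and simply returns an empty response:

class NullMetricsStore(MetricsStore):
    # Hypothetical example backend: accepts any query and returns no data.
    async def get_metrics(
        self,
        metric_name: str,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        step: Optional[str] = "15s",
        query_type: MetricQueryType = MetricQueryType.RANGE,
        label_matchers: Optional[List[MetricLabelMatcher]] = None,
    ) -> GetMetricsResponse:
        return GetMetricsResponse(data=[])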

View file

@@ -0,0 +1,65 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from datetime import datetime
from typing import List, Optional

from prometheus_api_client import PrometheusConnect

from llama_stack.apis.telemetry import (
    GetMetricsResponse,
    MetricDataPoint,
    MetricLabelMatcher,
    MetricQueryType,
    MetricSeries,
)

from .metrics_store import MetricsStore


class PrometheusMetricsStore(MetricsStore):
    def __init__(self, endpoint: str, disable_ssl: bool = True):
        self.prom = PrometheusConnect(url=endpoint, disable_ssl=disable_ssl)

    async def get_metrics(
        self,
        metric_name: str,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        step: Optional[str] = "15s",
        query_type: MetricQueryType = MetricQueryType.RANGE,
        label_matchers: Optional[List[MetricLabelMatcher]] = None,
    ) -> GetMetricsResponse:
        try:
            query = metric_name
            if label_matchers:
                matchers = [f'{m.name}{m.operator.value}"{m.value}"' for m in label_matchers]
                query = f"{metric_name}{{{','.join(matchers)}}}"

            if query_type == MetricQueryType.INSTANT:
                result = self.prom.custom_query(query=query)
                result = [{"metric": r["metric"], "values": [[r["value"][0], r["value"][1]]]} for r in result]
            else:
                result = self.prom.custom_query_range(
                    query=query,
                    start_time=start_time,
                    end_time=end_time if end_time else None,
                    step=step,
                )

            series = []
            for metric_data in result:
                values = [
                    MetricDataPoint(timestamp=datetime.fromtimestamp(point[0]), value=float(point[1]))
                    for point in metric_data["values"]
                ]
                series.append(MetricSeries(metric=metric_name, labels=metric_data.get("metric", {}), values=values))
            return GetMetricsResponse(data=series)
        except Exception as e:
            print(f"Error querying metrics: {e}")
            return GetMetricsResponse(data=[])
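
A rough usage sketch for this store (not part of the diff); it assumes a Prometheus server is reachable at the default endpoint and queries the built-in "up" metric:

import asyncio
from datetime import datetime, timedelta

from llama_stack.providers.inline.telemetry.meta_reference.prometheus_metrics_store import PrometheusMetricsStore


async def main() -> None:
    store = PrometheusMetricsStore(endpoint="http://localhost:9090", disable_ssl=True)
    response = await store.get_metrics(
        metric_name="up",  # standard Prometheus metric; substitute any metric name
        start_time=datetime.now() - timedelta(hours=1),
        step="30s",
    )
    for series in response.data:
        print(series.metric, len(series.values))


asyncio.run(main())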

View file

@@ -18,16 +18,13 @@ from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
-from prometheus_api_client import PrometheusConnect

from llama_stack.apis.telemetry import (
    Event,
    GetMetricsResponse,
-    MetricDataPoint,
    MetricEvent,
    MetricLabelMatcher,
    MetricQueryType,
-    MetricSeries,
    QueryCondition,
    QuerySpanTreeResponse,
    QueryTracesResponse,
@@ -44,6 +41,7 @@ from llama_stack.distribution.datatypes import Api
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
    ConsoleSpanProcessor,
)
+from llama_stack.providers.inline.telemetry.meta_reference.prometheus_metrics_store import PrometheusMetricsStore
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
    SQLiteSpanProcessor,
)
@@ -118,9 +116,12 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
        if TelemetrySink.OTEL in self.config.sinks:
            self.meter = metrics.get_meter(__name__)
-            self.prom = PrometheusConnect(
-                url=self.config.prometheus_endpoint, disable_ssl=self.config.prometheus_disable_ssl
-            )
+        if self.config.metrics_store_config is not None:
+            if self.config.metrics_store_config.type == "prometheus":
+                self.metrics_store = PrometheusMetricsStore(
+                    endpoint=self.config.metrics_store_config.endpoint,
+                    disable_ssl=self.config.metrics_store_config.disable_ssl,
+                )

        if TelemetrySink.SQLITE in self.config.sinks:
            self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
@@ -268,39 +269,13 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
        query_type: MetricQueryType = MetricQueryType.RANGE,
        label_matchers: Optional[List[MetricLabelMatcher]] = None,
    ) -> GetMetricsResponse:
-        if self.prom is None:
-            raise ValueError("Prometheus endpoint not configured")
-
-        try:
-            # Build query with label matchers if provided
-            query = metric_name
-            if label_matchers:
-                matchers = [f'{m.name}{m.operator.value}"{m.value}"' for m in label_matchers]
-                query = f"{metric_name}{{{','.join(matchers)}}}"
-
-            # Use instant query for current values, range query for historical data
-            if query_type == MetricQueryType.INSTANT:
-                result = self.prom.custom_query(query=query)
-                # Convert instant query results to same format as range query
-                result = [{"metric": r["metric"], "values": [[r["value"][0], r["value"][1]]]} for r in result]
-            else:
-                result = self.prom.custom_query_range(
-                    query=query,
-                    start_time=start_time,
-                    end_time=end_time if end_time else None,
-                    step=step,
-                )
-
-            series = []
-            for metric_data in result:
-                values = [
-                    MetricDataPoint(timestamp=datetime.fromtimestamp(point[0]), value=float(point[1]))
-                    for point in metric_data["values"]
-                ]
-                series.append(MetricSeries(metric=metric_name, labels=metric_data.get("metric", {}), values=values))
-            return GetMetricsResponse(data=series)
-        except Exception as e:
-            print(f"Error querying metrics: {e}")
-            return GetMetricsResponse(data=[])
+        if not hasattr(self, "metrics_store"):
+            raise ValueError("Metrics store not configured")
+        return await self.metrics_store.get_metrics(
+            metric_name=metric_name,
+            start_time=start_time,
+            end_time=end_time,
+            step=step,
+            query_type=query_type,
+            label_matchers=label_matchers,
+        )