diff --git a/llama_stack/apis/telemetry/telemetry.py b/llama_stack/apis/telemetry/telemetry.py index 2ff783c46..fd60d99a7 100644 --- a/llama_stack/apis/telemetry/telemetry.py +++ b/llama_stack/apis/telemetry/telemetry.py @@ -186,3 +186,20 @@ class Telemetry(Protocol): attributes_to_return: Optional[List[str]] = None, max_depth: Optional[int] = None, ) -> SpanWithChildren: ... + + @webmethod(route="/telemetry/query-spans", method="POST") + async def query_spans( + self, + attribute_filters: List[QueryCondition], + attributes_to_return: List[str], + max_depth: Optional[int] = None, + ) -> List[Span]: ... + + @webmethod(route="/telemetry/save-spans-to-dataset", method="POST") + async def save_spans_to_dataset( + self, + attribute_filters: List[QueryCondition], + attributes_to_save: List[str], + dataset_id: str, + max_depth: Optional[int] = None, + ) -> None: ... diff --git a/llama_stack/providers/inline/eval/meta_reference/config.py b/llama_stack/providers/inline/eval/meta_reference/config.py index 8538d32ad..95b780cca 100644 --- a/llama_stack/providers/inline/eval/meta_reference/config.py +++ b/llama_stack/providers/inline/eval/meta_reference/config.py @@ -3,12 +3,13 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. 
+from pydantic import BaseModel + from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR from llama_stack.providers.utils.kvstore.config import ( KVStoreConfig, SqliteKVStoreConfig, ) -from pydantic import BaseModel class MetaReferenceEvalConfig(BaseModel): diff --git a/llama_stack/providers/inline/eval/meta_reference/eval.py b/llama_stack/providers/inline/eval/meta_reference/eval.py index c6cacfcc3..453215e41 100644 --- a/llama_stack/providers/inline/eval/meta_reference/eval.py +++ b/llama_stack/providers/inline/eval/meta_reference/eval.py @@ -4,7 +4,9 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. from enum import Enum +from typing import Any, Dict, List, Optional from llama_models.llama3.api.datatypes import * # noqa: F403 +from tqdm import tqdm from .....apis.common.job_types import Job from .....apis.eval.eval import Eval, EvalTaskConfig, EvaluateResponse, JobStatus @@ -17,7 +19,6 @@ from llama_stack.apis.inference import Inference from llama_stack.apis.scoring import Scoring from llama_stack.providers.datatypes import EvalTasksProtocolPrivate from llama_stack.providers.utils.kvstore import kvstore_impl -from tqdm import tqdm from .config import MetaReferenceEvalConfig diff --git a/llama_stack/providers/inline/telemetry/meta_reference/__init__.py b/llama_stack/providers/inline/telemetry/meta_reference/__init__.py index 6213d5536..38871a7e4 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/__init__.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/__init__.py @@ -13,6 +13,6 @@ __all__ = ["TelemetryConfig", "TelemetryAdapter", "TelemetrySink"] async def get_provider_impl(config: TelemetryConfig, deps: Dict[str, Any]): - impl = TelemetryAdapter(config) + impl = TelemetryAdapter(config, deps) await impl.initialize() return impl diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py 
b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index 6540a667f..0bcc48afb 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -5,7 +5,7 @@ # the root directory of this source tree. import threading -from typing import List, Optional +from typing import Any, Dict, List, Optional from opentelemetry import metrics, trace from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter @@ -24,10 +24,15 @@ from llama_stack.providers.inline.telemetry.meta_reference.console_span_processo from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import ( SQLiteSpanProcessor, ) -from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore +from llama_stack.providers.utils.telemetry import ( + SQLiteTraceStore, + TelemetryDatasetMixin, +) from llama_stack.apis.telemetry import * # noqa: F403 +from llama_stack.distribution.datatypes import Api + from .config import TelemetryConfig, TelemetrySink _GLOBAL_STORAGE = { @@ -54,9 +59,10 @@ def is_tracing_enabled(tracer): return span.is_recording() -class TelemetryAdapter(Telemetry): - def __init__(self, config: TelemetryConfig) -> None: +class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): + def __init__(self, config: TelemetryConfig, deps: Dict[str, Any]) -> None: self.config = config + self.datasetio_api = deps[Api.datasetio] resource = Resource.create( { @@ -240,7 +246,7 @@ class TelemetryAdapter(Telemetry): attributes_to_return: Optional[List[str]] = None, max_depth: Optional[int] = None, ) -> SpanWithChildren: - return await self.trace_store.get_materialized_span( + return await self.trace_store.get_span_tree( span_id=span_id, attributes_to_return=attributes_to_return, max_depth=max_depth, diff --git a/llama_stack/providers/registry/telemetry.py b/llama_stack/providers/registry/telemetry.py index a53ad5b94..d367bf894 100644 --- 
a/llama_stack/providers/registry/telemetry.py +++ b/llama_stack/providers/registry/telemetry.py @@ -18,6 +18,7 @@ def available_providers() -> List[ProviderSpec]: "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http", ], + api_dependencies=[Api.datasetio], module="llama_stack.providers.inline.telemetry.meta_reference", config_class="llama_stack.providers.inline.telemetry.meta_reference.config.TelemetryConfig", ), diff --git a/llama_stack/providers/utils/telemetry/__init__.py b/llama_stack/providers/utils/telemetry/__init__.py index 756f351d8..2d95a5dc5 100644 --- a/llama_stack/providers/utils/telemetry/__init__.py +++ b/llama_stack/providers/utils/telemetry/__init__.py @@ -3,3 +3,6 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. + +from .dataset_mixin import TelemetryDatasetMixin # noqa: F401 +from .sqlite_trace_store import SQLiteTraceStore, TraceStore # noqa: F401 diff --git a/llama_stack/providers/utils/telemetry/dataset_mixin.py b/llama_stack/providers/utils/telemetry/dataset_mixin.py new file mode 100644 index 000000000..7a59801f4 --- /dev/null +++ b/llama_stack/providers/utils/telemetry/dataset_mixin.py @@ -0,0 +1,87 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
from typing import Any, Dict, List, Optional

from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithChildren


class TelemetryDatasetMixin:
    """Mixin class that provides dataset-related functionality for telemetry providers.

    The mixin itself only implements ``query_spans`` and
    ``save_spans_to_dataset``. It calls ``self.query_traces(...)`` and
    ``self.get_span_tree(...)``, so the class it is mixed into must provide
    those coroutines, and must assign ``datasetio_api`` before
    ``save_spans_to_dataset`` is used.
    """

    # DatasetIO implementation used to persist rows; set by the host class.
    datasetio_api: DatasetIO

    async def save_spans_to_dataset(
        self,
        attribute_filters: List[QueryCondition],
        attributes_to_save: List[str],
        dataset_id: str,
        max_depth: Optional[int] = None,
    ) -> None:
        """Query spans and append one row per matching span to a dataset.

        Args:
            attribute_filters: Conditions forwarded to ``query_spans`` to
                select traces.
            attributes_to_save: Span attribute names copied into each row.
            dataset_id: Identifier of the dataset the rows are appended to.
            max_depth: Optional depth limit forwarded to ``query_spans``.
        """
        spans = await self.query_spans(
            attribute_filters=attribute_filters,
            attributes_to_return=attributes_to_save,
            max_depth=max_depth,
        )

        rows = [self._span_to_row(span, attributes_to_save) for span in spans]

        await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)

    async def query_spans(
        self,
        attribute_filters: List[QueryCondition],
        attributes_to_return: List[str],
        max_depth: Optional[int] = None,
    ) -> List[Span]:
        """Return the spans of all matching traces that carry the requested attributes.

        For every trace returned by ``self.query_traces`` the span tree
        rooted at the trace's root span is fetched and flattened in pre-order
        (a span first, then its children in order). A span is kept only when
        its ``attributes`` mapping is non-empty and every name in
        ``attributes_to_return`` is present with a non-``None`` value.

        Args:
            attribute_filters: Conditions forwarded to ``query_traces``.
            attributes_to_return: Attribute names a span must carry; also
                forwarded to ``get_span_tree``.
            max_depth: Optional depth limit forwarded to ``get_span_tree``.

        Returns:
            A flat list of ``Span`` objects, grouped by trace in the order
            the traces were returned.
        """
        traces = await self.query_traces(attribute_filters=attribute_filters)
        spans: List[Span] = []

        for trace in traces:
            span_tree = await self.get_span_tree(
                span_id=trace.root_span_id,
                attributes_to_return=attributes_to_return,
                max_depth=max_depth,
            )
            # Tag spans with the id of the trace they belong to. The previous
            # code passed trace.root_span_id here, which stored a span id in
            # the trace_id field of every returned span.
            spans.extend(
                self._collect_matching_spans(
                    span_tree, trace.trace_id, attributes_to_return
                )
            )

        return spans

    @staticmethod
    def _span_to_row(span: Span, attributes_to_save: List[str]) -> Dict[str, Any]:
        """Build the dataset row for one span."""
        # Spans produced by query_spans above always have attributes, but a
        # host class may override query_spans; tolerate a missing mapping.
        attributes = span.attributes or {}
        return {
            "trace_id": span.trace_id,
            "span_id": span.span_id,
            "parent_span_id": span.parent_span_id,
            "name": span.name,
            "start_time": span.start_time,
            "end_time": span.end_time,
            # Unpacked last, so a requested attribute whose name equals one of
            # the fixed keys above replaces that key's value.
            **{attr: attributes.get(attr) for attr in attributes_to_save},
        }

    @staticmethod
    def _collect_matching_spans(
        span: SpanWithChildren,
        trace_id: str,
        required_attributes: List[str],
    ) -> List[Span]:
        """Flatten a span tree in pre-order, keeping spans with all required attributes."""
        matches: List[Span] = []
        attributes = span.attributes
        # The truthiness check means spans with no attributes at all are
        # skipped even when required_attributes is empty.
        if attributes and all(
            attributes.get(attr) is not None for attr in required_attributes
        ):
            matches.append(
                Span(
                    trace_id=trace_id,
                    span_id=span.span_id,
                    parent_span_id=span.parent_span_id,
                    name=span.name,
                    start_time=span.start_time,
                    end_time=span.end_time,
                    attributes=attributes,
                )
            )

        for child in span.children:
            matches.extend(
                TelemetryDatasetMixin._collect_matching_spans(
                    child, trace_id, required_attributes
                )
            )

        return matches