From c23363d56117648861e18224b0de68cc9c3d39d0 Mon Sep 17 00:00:00 2001 From: Dinesh Yeduguru Date: Thu, 5 Dec 2024 21:07:30 -0800 Subject: [PATCH] Add ability to query and export spans to dataset (#574) This PR adds two new methods to the telemetry API: 1) query_spans, which gives the ability to query spans directly instead of first querying traces and then using those to get spans. 2) save_spans_to_dataset, which builds on query_spans to save the matching spans to a dataset. This gives the ability to save spans that are part of an agent session to a dataset. The unique aspect of this API is that we don't require each provider of telemetry to implement this method. Hence, it's implemented in the protocol class itself. This required the protocol check to be slightly modified. --- llama_stack/apis/telemetry/telemetry.py | 17 ++++ .../inline/eval/meta_reference/config.py | 3 +- .../inline/eval/meta_reference/eval.py | 3 +- .../telemetry/meta_reference/__init__.py | 2 +- .../telemetry/meta_reference/telemetry.py | 16 ++-- llama_stack/providers/registry/telemetry.py | 1 + .../providers/utils/telemetry/__init__.py | 3 + .../utils/telemetry/dataset_mixin.py | 87 +++++++++++++++++++ .../utils/telemetry/sqlite_trace_store.py | 4 +- 9 files changed, 126 insertions(+), 10 deletions(-) create mode 100644 llama_stack/providers/utils/telemetry/dataset_mixin.py diff --git a/llama_stack/apis/telemetry/telemetry.py b/llama_stack/apis/telemetry/telemetry.py index 2ff783c46..fd60d99a7 100644 --- a/llama_stack/apis/telemetry/telemetry.py +++ b/llama_stack/apis/telemetry/telemetry.py @@ -186,3 +186,20 @@ class Telemetry(Protocol): attributes_to_return: Optional[List[str]] = None, max_depth: Optional[int] = None, ) -> SpanWithChildren: ... + + @webmethod(route="/telemetry/query-spans", method="POST") + async def query_spans( + self, + attribute_filters: List[QueryCondition], + attributes_to_return: List[str], + max_depth: Optional[int] = None, + ) -> List[Span]: ... 
+ + @webmethod(route="/telemetry/save-spans-to-dataset", method="POST") + async def save_spans_to_dataset( + self, + attribute_filters: List[QueryCondition], + attributes_to_save: List[str], + dataset_id: str, + max_depth: Optional[int] = None, + ) -> None: ... diff --git a/llama_stack/providers/inline/eval/meta_reference/config.py b/llama_stack/providers/inline/eval/meta_reference/config.py index 8538d32ad..95b780cca 100644 --- a/llama_stack/providers/inline/eval/meta_reference/config.py +++ b/llama_stack/providers/inline/eval/meta_reference/config.py @@ -3,12 +3,13 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. +from pydantic import BaseModel + from llama_stack.distribution.utils.config_dirs import RUNTIME_BASE_DIR from llama_stack.providers.utils.kvstore.config import ( KVStoreConfig, SqliteKVStoreConfig, ) -from pydantic import BaseModel class MetaReferenceEvalConfig(BaseModel): diff --git a/llama_stack/providers/inline/eval/meta_reference/eval.py b/llama_stack/providers/inline/eval/meta_reference/eval.py index c6cacfcc3..453215e41 100644 --- a/llama_stack/providers/inline/eval/meta_reference/eval.py +++ b/llama_stack/providers/inline/eval/meta_reference/eval.py @@ -4,7 +4,9 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. 
from enum import Enum +from typing import Any, Dict, List, Optional from llama_models.llama3.api.datatypes import * # noqa: F403 +from tqdm import tqdm from .....apis.common.job_types import Job from .....apis.eval.eval import Eval, EvalTaskConfig, EvaluateResponse, JobStatus @@ -17,7 +19,6 @@ from llama_stack.apis.inference import Inference from llama_stack.apis.scoring import Scoring from llama_stack.providers.datatypes import EvalTasksProtocolPrivate from llama_stack.providers.utils.kvstore import kvstore_impl -from tqdm import tqdm from .config import MetaReferenceEvalConfig diff --git a/llama_stack/providers/inline/telemetry/meta_reference/__init__.py b/llama_stack/providers/inline/telemetry/meta_reference/__init__.py index 6213d5536..38871a7e4 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/__init__.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/__init__.py @@ -13,6 +13,6 @@ __all__ = ["TelemetryConfig", "TelemetryAdapter", "TelemetrySink"] async def get_provider_impl(config: TelemetryConfig, deps: Dict[str, Any]): - impl = TelemetryAdapter(config) + impl = TelemetryAdapter(config, deps) await impl.initialize() return impl diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index 6540a667f..0bcc48afb 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -5,7 +5,7 @@ # the root directory of this source tree. 
import threading -from typing import List, Optional +from typing import Any, Dict, List, Optional from opentelemetry import metrics, trace from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter @@ -24,10 +24,15 @@ from llama_stack.providers.inline.telemetry.meta_reference.console_span_processo from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import ( SQLiteSpanProcessor, ) -from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore +from llama_stack.providers.utils.telemetry import ( + SQLiteTraceStore, + TelemetryDatasetMixin, +) from llama_stack.apis.telemetry import * # noqa: F403 +from llama_stack.distribution.datatypes import Api + from .config import TelemetryConfig, TelemetrySink _GLOBAL_STORAGE = { @@ -54,9 +59,10 @@ def is_tracing_enabled(tracer): return span.is_recording() -class TelemetryAdapter(Telemetry): - def __init__(self, config: TelemetryConfig) -> None: +class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): + def __init__(self, config: TelemetryConfig, deps: Dict[str, Any]) -> None: self.config = config + self.datasetio_api = deps[Api.datasetio] resource = Resource.create( { @@ -240,7 +246,7 @@ class TelemetryAdapter(Telemetry): attributes_to_return: Optional[List[str]] = None, max_depth: Optional[int] = None, ) -> SpanWithChildren: - return await self.trace_store.get_materialized_span( + return await self.trace_store.get_span_tree( span_id=span_id, attributes_to_return=attributes_to_return, max_depth=max_depth, diff --git a/llama_stack/providers/registry/telemetry.py b/llama_stack/providers/registry/telemetry.py index a53ad5b94..d367bf894 100644 --- a/llama_stack/providers/registry/telemetry.py +++ b/llama_stack/providers/registry/telemetry.py @@ -18,6 +18,7 @@ def available_providers() -> List[ProviderSpec]: "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-http", ], + api_dependencies=[Api.datasetio], 
module="llama_stack.providers.inline.telemetry.meta_reference", config_class="llama_stack.providers.inline.telemetry.meta_reference.config.TelemetryConfig", ), diff --git a/llama_stack/providers/utils/telemetry/__init__.py b/llama_stack/providers/utils/telemetry/__init__.py index 756f351d8..2d95a5dc5 100644 --- a/llama_stack/providers/utils/telemetry/__init__.py +++ b/llama_stack/providers/utils/telemetry/__init__.py @@ -3,3 +3,6 @@ # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. + +from .dataset_mixin import TelemetryDatasetMixin # noqa: F401 +from .sqlite_trace_store import SQLiteTraceStore, TraceStore # noqa: F401 diff --git a/llama_stack/providers/utils/telemetry/dataset_mixin.py b/llama_stack/providers/utils/telemetry/dataset_mixin.py new file mode 100644 index 000000000..7a59801f4 --- /dev/null +++ b/llama_stack/providers/utils/telemetry/dataset_mixin.py @@ -0,0 +1,87 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
class TelemetryDatasetMixin:
    """Mixin class that provides dataset-related functionality for telemetry providers.

    The methods below call ``self.query_traces`` and ``self.get_span_tree``,
    which this mixin does not define, so the class it is mixed into must
    supply them. ``datasetio_api`` must be assigned before
    ``save_spans_to_dataset`` is called.
    """

    # Dataset backend that receives the exported rows via ``append_rows``.
    datasetio_api: DatasetIO

    async def save_spans_to_dataset(
        self,
        attribute_filters: List[QueryCondition],
        attributes_to_save: List[str],
        dataset_id: str,
        max_depth: Optional[int] = None,
    ) -> None:
        """Query spans and append one row per matching span to a dataset.

        Args:
            attribute_filters: Conditions forwarded unchanged to ``query_spans``.
            attributes_to_save: Attribute names to export, one column each.
                They are also passed to ``query_spans`` as
                ``attributes_to_return``.
            dataset_id: Identifier passed to ``datasetio_api.append_rows``.
            max_depth: Forwarded unchanged to ``query_spans``.
        """
        spans = await self.query_spans(
            attribute_filters=attribute_filters,
            attributes_to_return=attributes_to_save,
            max_depth=max_depth,
        )

        rows = []
        for span in spans:
            # A ``query_spans`` override may return spans without attributes;
            # export those with empty (None) attribute columns instead of
            # failing on ``None.get``.
            attributes = span.attributes or {}
            rows.append(
                {
                    "trace_id": span.trace_id,
                    "span_id": span.span_id,
                    "parent_span_id": span.parent_span_id,
                    "name": span.name,
                    "start_time": span.start_time,
                    "end_time": span.end_time,
                    # Unpacked last: a requested attribute whose name collides
                    # with one of the fixed columns above overrides it.
                    **{attr: attributes.get(attr) for attr in attributes_to_save},
                }
            )

        # Called even when ``rows`` is empty, as a single batched append.
        await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)

    async def query_spans(
        self,
        attribute_filters: List[QueryCondition],
        attributes_to_return: List[str],
        max_depth: Optional[int] = None,
    ) -> List[Span]:
        """Return the spans of all matching traces that carry the requested attributes.

        Traces are selected with ``self.query_traces``; for each one the span
        tree rooted at ``trace.root_span_id`` is fetched with
        ``self.get_span_tree`` and flattened.

        Args:
            attribute_filters: Conditions forwarded to ``query_traces``.
            attributes_to_return: Attribute names every returned span must
                have with a non-None value; also forwarded to ``get_span_tree``.
            max_depth: Forwarded unchanged to ``get_span_tree``.

        Returns:
            Matching spans, trace by trace, each tree in pre-order
            (parent before its children, children in their stored order).
        """
        traces = await self.query_traces(attribute_filters=attribute_filters)
        spans: List[Span] = []

        for trace in traces:
            span_tree = await self.get_span_tree(
                span_id=trace.root_span_id,
                attributes_to_return=attributes_to_return,
                max_depth=max_depth,
            )
            # Label spans with the trace's own id, not with its root span id.
            spans.extend(
                self._collect_matching_spans(
                    span_tree, trace.trace_id, attributes_to_return
                )
            )

        return spans

    @staticmethod
    def _collect_matching_spans(
        root: SpanWithChildren,
        trace_id: str,
        required_attributes: List[str],
    ) -> List[Span]:
        """Flatten a span tree into the spans that have every required attribute."""
        matches: List[Span] = []
        # Explicit stack instead of recursion; children are pushed reversed so
        # that popping visits them in their original order (pre-order walk).
        pending = [root]
        while pending:
            node = pending.pop()
            attributes = node.attributes
            # A span with no attributes at all is skipped even when no
            # attribute is required (empty/None ``attributes`` is falsy).
            if attributes and all(
                attributes.get(attr) is not None for attr in required_attributes
            ):
                matches.append(
                    Span(
                        trace_id=trace_id,
                        span_id=node.span_id,
                        parent_span_id=node.parent_span_id,
                        name=node.name,
                        start_time=node.start_time,
                        end_time=node.end_time,
                        attributes=attributes,
                    )
                )
            pending.extend(reversed(node.children))

        return matches