Mirror of https://github.com/meta-llama/llama-stack.git

commit 04e6998d6f (parent 208fd33048)

    address feedback

6 changed files with 105 additions and 64 deletions
llama_stack/apis/telemetry (Telemetry API):

@@ -21,8 +21,6 @@ from llama_models.schema_utils import json_schema_type, webmethod
 from pydantic import BaseModel, Field
 from typing_extensions import Annotated
 
-from llama_stack.apis.datasetio import DatasetIO
-
 # Add this constant near the top of the file, after the imports
 DEFAULT_TTL_DAYS = 7
 
@@ -167,9 +165,6 @@ class QueryCondition(BaseModel):
 @runtime_checkable
 class Telemetry(Protocol):
-    # Each provider must initialize this dependency.
-    datasetio_api: DatasetIO
-
     @webmethod(route="/telemetry/log-event")
     async def log_event(
         self, event: Event, ttl_seconds: int = DEFAULT_TTL_DAYS * 86400
@@ -198,41 +193,7 @@ class Telemetry(Protocol):
         attribute_filters: List[QueryCondition],
         attributes_to_return: List[str],
         max_depth: Optional[int] = None,
-    ) -> List[Dict[str, Any]]:
-        traces = await self.query_traces(attribute_filters=attribute_filters)
-        rows = []
-
-        for trace in traces:
-            span_tree = await self.get_span_tree(
-                span_id=trace.root_span_id,
-                attributes_to_return=attributes_to_return,
-                max_depth=max_depth,
-            )
-
-            def extract_spans(span: SpanWithChildren) -> List[Dict[str, Any]]:
-                rows = []
-                if span.attributes and all(
-                    attr in span.attributes and span.attributes[attr] is not None
-                    for attr in attributes_to_return
-                ):
-                    row = {
-                        "trace_id": trace.root_span_id,
-                        "span_id": span.span_id,
-                        "step_name": span.name,
-                    }
-                    for attr in attributes_to_return:
-                        row[attr] = str(span.attributes[attr])
-                    rows.append(row)
-
-                for child in span.children:
-                    rows.extend(extract_spans(child))
-
-                return rows
-
-            rows.extend(extract_spans(span_tree))
-
-        return rows
+    ) -> List[Span]: ...
 
     @webmethod(route="/telemetry/save-spans-to-dataset", method="POST")
     async def save_spans_to_dataset(
@@ -241,14 +202,4 @@ class Telemetry(Protocol):
         attributes_to_save: List[str],
         dataset_id: str,
         max_depth: Optional[int] = None,
-    ) -> None:
-        annotation_rows = await self.query_spans(
-            attribute_filters=attribute_filters,
-            attributes_to_return=attributes_to_save,
-            max_depth=max_depth,
-        )
-
-        if annotation_rows:
-            await self.datasetio_api.append_rows(
-                dataset_id=dataset_id, rows=annotation_rows
-            )
+    ) -> None: ...
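With this change, query_spans and save_spans_to_dataset become pure protocol stubs (a "..." body) and the shared default implementation moves into TelemetryDatasetMixin (added below). A minimal, self-contained sketch of the stub-plus-mixin pattern — illustrative names only, not the repo's classes:

from typing import List, Protocol, runtime_checkable


@runtime_checkable
class Service(Protocol):
    # Pure stub: the body is just "...", so the protocol carries no behavior.
    async def query(self, filters: List[str]) -> List[str]: ...


class QueryMixin:
    # Concrete implementation shared by every provider that mixes it in.
    async def query(self, filters: List[str]) -> List[str]:
        return [f for f in filters if f]


class Provider(QueryMixin, Service):
    pass


import asyncio

print(asyncio.run(Provider().query(["a", ""])))  # ['a']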
Protocol compliance check:

@@ -345,18 +345,15 @@ def check_protocol_compliance(obj: Any, protocol: Any) -> None:
                 )
                 missing_methods.append((name, "signature_mismatch"))
             else:
-                # Check if the method is actually implemented in the class
                 method_owner = next(
                     (cls for cls in mro if name in cls.__dict__), None
                 )
-                proto_method = getattr(protocol, name)
-                if method_owner is None:
+                if (
+                    method_owner is None
+                    or method_owner.__name__ == protocol.__name__
+                ):
+                    print(mro)
                     missing_methods.append((name, "not_actually_implemented"))
-                elif method_owner.__name__ == protocol.__name__:
-                    # Check if it's just a stub (...) or has real implementation
-                    proto_source = inspect.getsource(proto_method)
-                    if "..." in proto_source:
-                        missing_methods.append((name, "not_actually_implemented"))
 
     if missing_methods:
         raise ValueError(
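The reworked check walks the object's MRO to find which class actually defines each protocol method; if no class defines it, or the first owner is the protocol itself, the object only inherited the stub. A short sketch of that ownership test, with hypothetical classes rather than the repo's code:

from typing import Optional, Protocol


class Telemetryish(Protocol):
    async def log_event(self, event: str) -> None: ...


class GoodImpl(Telemetryish):
    async def log_event(self, event: str) -> None:
        print(event)


class BadImpl(Telemetryish):
    # Inherits only the protocol's "..." stub, so it should be flagged.
    pass


def method_owner(obj: object, name: str) -> Optional[type]:
    # First class in the MRO whose __dict__ actually defines the method.
    return next((cls for cls in type(obj).__mro__ if name in cls.__dict__), None)


print(method_owner(GoodImpl(), "log_event").__name__)  # GoodImpl
print(method_owner(BadImpl(), "log_event").__name__)   # Telemetryish -> "not_actually_implemented"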
Meta-reference telemetry adapter (llama_stack/providers/inline/telemetry/meta_reference):

@@ -24,7 +24,10 @@ from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor
 from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
     SQLiteSpanProcessor,
 )
-from llama_stack.providers.utils.telemetry.sqlite_trace_store import SQLiteTraceStore
+from llama_stack.providers.utils.telemetry import (
+    SQLiteTraceStore,
+    TelemetryDatasetMixin,
+)
 
 from llama_stack.apis.telemetry import *  # noqa: F403
 
@@ -56,7 +59,7 @@ def is_tracing_enabled(tracer):
     return span.is_recording()
 
 
-class TelemetryAdapter(Telemetry):
+class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
     def __init__(self, config: TelemetryConfig, deps: Dict[str, Any]) -> None:
         self.config = config
         self.datasetio_api = deps[Api.datasetio]
@@ -243,7 +246,7 @@ class TelemetryAdapter(Telemetry):
         attributes_to_return: Optional[List[str]] = None,
         max_depth: Optional[int] = None,
     ) -> SpanWithChildren:
-        return await self.trace_store.get_materialized_span(
+        return await self.trace_store.get_span_tree(
             span_id=span_id,
             attributes_to_return=attributes_to_return,
             max_depth=max_depth,
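Listing TelemetryDatasetMixin before Telemetry in the base-class list matters: Python resolves attributes left to right along the MRO, so the mixin's concrete query_spans/save_spans_to_dataset shadow the protocol's stubs. A simplified illustration with stand-in classes:

class TelemetryProto:
    def query_spans(self): ...  # stub, standing in for the real Protocol


class DatasetMixin:
    def query_spans(self):  # concrete implementation
        return "real result"


class Adapter(DatasetMixin, TelemetryProto):
    pass


print([cls.__name__ for cls in Adapter.__mro__])
# ['Adapter', 'DatasetMixin', 'TelemetryProto', 'object']
print(Adapter().query_spans())  # real result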
llama_stack/providers/utils/telemetry/__init__.py:

@@ -3,3 +3,6 @@
 #
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
+
+from .dataset_mixin import TelemetryDatasetMixin  # noqa: F401
+from .sqlite_trace_store import SQLiteTraceStore, TraceStore  # noqa: F401
llama_stack/providers/utils/telemetry/dataset_mixin.py (new file, 87 lines):
@@ -0,0 +1,87 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from typing import List, Optional
+
+from llama_stack.apis.datasetio import DatasetIO
+from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithChildren
+
+
+class TelemetryDatasetMixin:
+    """Mixin class that provides dataset-related functionality for telemetry providers."""
+
+    datasetio_api: DatasetIO
+
+    async def save_spans_to_dataset(
+        self,
+        attribute_filters: List[QueryCondition],
+        attributes_to_save: List[str],
+        dataset_id: str,
+        max_depth: Optional[int] = None,
+    ) -> None:
+        spans = await self.query_spans(
+            attribute_filters=attribute_filters,
+            attributes_to_return=attributes_to_save,
+            max_depth=max_depth,
+        )
+
+        rows = [
+            {
+                "trace_id": span.trace_id,
+                "span_id": span.span_id,
+                "parent_span_id": span.parent_span_id,
+                "name": span.name,
+                "start_time": span.start_time,
+                "end_time": span.end_time,
+                **{attr: span.attributes.get(attr) for attr in attributes_to_save},
+            }
+            for span in spans
+        ]
+
+        await self.datasetio_api.append_rows(dataset_id=dataset_id, rows=rows)
+
+    async def query_spans(
+        self,
+        attribute_filters: List[QueryCondition],
+        attributes_to_return: List[str],
+        max_depth: Optional[int] = None,
+    ) -> List[Span]:
+        traces = await self.query_traces(attribute_filters=attribute_filters)
+        spans = []
+
+        for trace in traces:
+            span_tree = await self.get_span_tree(
+                span_id=trace.root_span_id,
+                attributes_to_return=attributes_to_return,
+                max_depth=max_depth,
+            )
+
+            def extract_spans(span: SpanWithChildren) -> List[Span]:
+                result = []
+                if span.attributes and all(
+                    attr in span.attributes and span.attributes[attr] is not None
+                    for attr in attributes_to_return
+                ):
+                    result.append(
+                        Span(
+                            trace_id=trace.root_span_id,
+                            span_id=span.span_id,
+                            parent_span_id=span.parent_span_id,
+                            name=span.name,
+                            start_time=span.start_time,
+                            end_time=span.end_time,
+                            attributes=span.attributes,
+                        )
+                    )
+
+                for child in span.children:
+                    result.extend(extract_spans(child))
+
+                return result
+
+            spans.extend(extract_spans(span_tree))
+
+        return spans
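The mixin leans on three attributes the host class must supply: query_traces, get_span_tree, and datasetio_api (TelemetryAdapter provides all three). A hypothetical call site follows — QueryCondition's constructor fields are not shown anywhere in this diff, so the filter below is a guess:

from llama_stack.apis.telemetry import QueryCondition


async def export_session_spans(adapter) -> None:
    # `adapter` is assumed to be something like the TelemetryAdapter above.
    await adapter.save_spans_to_dataset(
        # Field names on QueryCondition are assumptions, not from the diff.
        attribute_filters=[QueryCondition(key="session_id", op="eq", value="abc-123")],
        attributes_to_save=["input", "output"],
        dataset_id="my-eval-dataset",
        max_depth=10,
    )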
llama_stack/providers/utils/telemetry/sqlite_trace_store.py:

@@ -23,7 +23,7 @@ class TraceStore(Protocol):
         order_by: Optional[List[str]] = None,
     ) -> List[Trace]: ...
 
-    async def get_materialized_span(
+    async def get_span_tree(
         self,
         span_id: str,
         attributes_to_return: Optional[List[str]] = None,
@@ -111,7 +111,7 @@ class SQLiteTraceStore(TraceStore):
             for row in rows
         ]
 
-    async def get_materialized_span(
+    async def get_span_tree(
         self,
         span_id: str,
         attributes_to_return: Optional[List[str]] = None,
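The rename from get_materialized_span to get_span_tree is applied consistently across the TraceStore protocol, SQLiteTraceStore, and the adapter. The SpanWithChildren it returns is a recursive tree; a small helper (not from the repo) showing how it can be walked, using only the field names visible in extract_spans above:

from llama_stack.apis.telemetry import SpanWithChildren


def print_span_tree(span: SpanWithChildren, indent: int = 0) -> None:
    # Depth-first walk over the tree returned by get_span_tree().
    print(" " * indent + f"{span.name} ({span.span_id})")
    for child in span.children:
        print_span_tree(child, indent + 2)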