Idiomatic REST API: Telemetry (#786)

# What does this PR do?

Changes Telemetry API to follow more idiomatic REST


- [ ] Addresses issue (#issue)


## Test Plan

TBD, once i get an approval for rest endpoints
This commit is contained in:
Dinesh Yeduguru 2025-01-16 12:08:46 -08:00 committed by GitHub
parent c79b087552
commit 59eeaf7f81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 955 additions and 669 deletions

View file

@ -169,39 +169,57 @@ class QueryCondition(BaseModel):
value: Any
class QueryTracesResponse(BaseModel):
data: List[Trace]
class QuerySpansResponse(BaseModel):
data: List[Span]
class QuerySpanTreeResponse(BaseModel):
data: Dict[str, SpanWithStatus]
@runtime_checkable
class Telemetry(Protocol):
@webmethod(route="/telemetry/log-event")
@webmethod(route="/telemetry/events", method="POST")
async def log_event(
self, event: Event, ttl_seconds: int = DEFAULT_TTL_DAYS * 86400
) -> None: ...
@webmethod(route="/telemetry/query-traces", method="POST")
@webmethod(route="/telemetry/traces", method="GET")
async def query_traces(
self,
attribute_filters: Optional[List[QueryCondition]] = None,
limit: Optional[int] = 100,
offset: Optional[int] = 0,
order_by: Optional[List[str]] = None,
) -> List[Trace]: ...
) -> QueryTracesResponse: ...
@webmethod(route="/telemetry/query-span-tree", method="POST")
async def query_span_tree(
@webmethod(route="/telemetry/traces/{trace_id}", method="GET")
async def get_trace(self, trace_id: str) -> Trace: ...
@webmethod(route="/telemetry/traces/{trace_id}/spans/{span_id}", method="GET")
async def get_span(self, trace_id: str, span_id: str) -> Span: ...
@webmethod(route="/telemetry/spans/{span_id}/tree", method="GET")
async def get_span_tree(
self,
span_id: str,
attributes_to_return: Optional[List[str]] = None,
max_depth: Optional[int] = None,
) -> Dict[str, SpanWithStatus]: ...
) -> QuerySpanTreeResponse: ...
@webmethod(route="/telemetry/query-spans", method="POST")
@webmethod(route="/telemetry/spans", method="GET")
async def query_spans(
self,
attribute_filters: List[QueryCondition],
attributes_to_return: List[str],
max_depth: Optional[int] = None,
) -> List[Span]: ...
) -> QuerySpansResponse: ...
@webmethod(route="/telemetry/save-spans-to-dataset", method="POST")
@webmethod(route="/telemetry/spans/export", method="POST")
async def save_spans_to_dataset(
self,
attribute_filters: List[QueryCondition],

View file

@ -21,10 +21,12 @@ from llama_stack.apis.telemetry import (
Event,
MetricEvent,
QueryCondition,
QuerySpanTreeResponse,
QueryTracesResponse,
Span,
SpanEndPayload,
SpanStartPayload,
SpanStatus,
SpanWithStatus,
StructuredLogEvent,
Telemetry,
Trace,
@ -241,22 +243,32 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
limit: Optional[int] = 100,
offset: Optional[int] = 0,
order_by: Optional[List[str]] = None,
) -> List[Trace]:
return await self.trace_store.query_traces(
attribute_filters=attribute_filters,
limit=limit,
offset=offset,
order_by=order_by,
) -> QueryTracesResponse:
return QueryTracesResponse(
data=await self.trace_store.query_traces(
attribute_filters=attribute_filters,
limit=limit,
offset=offset,
order_by=order_by,
)
)
async def query_span_tree(
async def get_trace(self, trace_id: str) -> Trace:
return await self.trace_store.get_trace(trace_id)
async def get_span(self, trace_id: str, span_id: str) -> Span:
return await self.trace_store.get_span(trace_id, span_id)
async def get_span_tree(
self,
span_id: str,
attributes_to_return: Optional[List[str]] = None,
max_depth: Optional[int] = None,
) -> Dict[str, SpanWithStatus]:
return await self.trace_store.get_span_tree(
span_id=span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
) -> QuerySpanTreeResponse:
return QuerySpanTreeResponse(
data=await self.trace_store.get_span_tree(
span_id=span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
)
)

View file

@ -7,7 +7,7 @@
from typing import List, Optional
from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.telemetry import QueryCondition, Span
from llama_stack.apis.telemetry import QueryCondition, QuerySpansResponse, Span
class TelemetryDatasetMixin:
@ -48,18 +48,18 @@ class TelemetryDatasetMixin:
attribute_filters: List[QueryCondition],
attributes_to_return: List[str],
max_depth: Optional[int] = None,
) -> List[Span]:
) -> QuerySpansResponse:
traces = await self.query_traces(attribute_filters=attribute_filters)
spans = []
for trace in traces:
spans_by_id = await self.query_span_tree(
for trace in traces.data:
spans_by_id_resp = await self.get_span_tree(
span_id=trace.root_span_id,
attributes_to_return=attributes_to_return,
max_depth=max_depth,
)
for span in spans_by_id.values():
for span in spans_by_id_resp.data.values():
if span.attributes and all(
attr in span.attributes and span.attributes[attr] is not None
for attr in attributes_to_return
@ -76,4 +76,4 @@ class TelemetryDatasetMixin:
)
)
return spans
return QuerySpansResponse(data=spans)

View file

@ -10,7 +10,7 @@ from typing import Dict, List, Optional, Protocol
import aiosqlite
from llama_stack.apis.telemetry import QueryCondition, SpanWithStatus, Trace
from llama_stack.apis.telemetry import QueryCondition, Span, SpanWithStatus, Trace
class TraceStore(Protocol):
@ -167,3 +167,23 @@ class SQLiteTraceStore(TraceStore):
spans_by_id[span.span_id] = span
return spans_by_id
async def get_trace(self, trace_id: str) -> Trace:
query = "SELECT * FROM traces WHERE trace_id = ?"
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, (trace_id,)) as cursor:
row = await cursor.fetchone()
if row is None:
raise ValueError(f"Trace {trace_id} not found")
return Trace(**row)
async def get_span(self, trace_id: str, span_id: str) -> Span:
query = "SELECT * FROM spans WHERE trace_id = ? AND span_id = ?"
async with aiosqlite.connect(self.conn_string) as conn:
conn.row_factory = aiosqlite.Row
async with conn.execute(query, (trace_id, span_id)) as cursor:
row = await cursor.fetchone()
if row is None:
raise ValueError(f"Span {span_id} not found")
return Span(**row)