From e8c2f068a3b064b666c9084cb6bf0228f568fd19 Mon Sep 17 00:00:00 2001 From: Ashwin Bharambe Date: Wed, 11 Sep 2024 14:24:54 -0700 Subject: [PATCH] move span events one level down into structured log events --- llama_toolchain/telemetry/api/api.py | 65 +++++++++++++------- llama_toolchain/telemetry/console/console.py | 33 ++++------ llama_toolchain/telemetry/tracing.py | 19 +++--- 3 files changed, 65 insertions(+), 52 deletions(-) diff --git a/llama_toolchain/telemetry/api/api.py b/llama_toolchain/telemetry/api/api.py index 812806a8c..100836b46 100644 --- a/llama_toolchain/telemetry/api/api.py +++ b/llama_toolchain/telemetry/api/api.py @@ -41,15 +41,9 @@ class Trace(BaseModel): @json_schema_type class EventType(Enum): UNSTRUCTURED_LOG = "unstructured_log" - - # all structured log events below - SPAN_START = "span_start" - SPAN_END = "span_end" + STRUCTURED_LOG = "structured_log" METRIC = "metric" - def is_structured(self) -> bool: - return self != EventType.UNSTRUCTURED_LOG - @json_schema_type class LogSeverity(Enum): @@ -69,25 +63,12 @@ class EventCommon(BaseModel): @json_schema_type -class LoggingEvent(EventCommon): +class UnstructuredLogEvent(EventCommon): type: Literal[EventType.UNSTRUCTURED_LOG.value] = EventType.UNSTRUCTURED_LOG.value message: str severity: LogSeverity -@json_schema_type -class SpanStartEvent(EventCommon): - type: Literal[EventType.SPAN_START.value] = EventType.SPAN_START.value - name: str - parent_span_id: Optional[str] = None - - -@json_schema_type -class SpanEndEvent(EventCommon): - type: Literal[EventType.SPAN_END.value] = EventType.SPAN_END.value - status: SpanStatus - - @json_schema_type class MetricEvent(EventCommon): type: Literal[EventType.METRIC.value] = EventType.METRIC.value @@ -96,8 +77,48 @@ class MetricEvent(EventCommon): unit: str +@json_schema_type +class StructuredLogType(Enum): + SPAN_START = "span_start" + SPAN_END = "span_end" + + +@json_schema_type +class SpanStartPayload(BaseModel): + type: Literal[StructuredLogType.SPAN_START.value] = ( + StructuredLogType.SPAN_START.value + ) + name: str + parent_span_id: Optional[str] = None + + +@json_schema_type +class SpanEndPayload(BaseModel): + type: Literal[StructuredLogType.SPAN_END.value] = StructuredLogType.SPAN_END.value + status: SpanStatus + + +StructuredLogPayload = Annotated[ + Union[ + SpanStartPayload, + SpanEndPayload, + ], + Field(discriminator="type"), +] + + +@json_schema_type +class StructuredLogEvent(EventCommon): + type: Literal[EventType.STRUCTURED_LOG.value] = EventType.STRUCTURED_LOG.value + payload: StructuredLogPayload + + Event = Annotated[ - Union[LoggingEvent, SpanStartEvent, SpanEndEvent], + Union[ + UnstructuredLogEvent, + MetricEvent, + StructuredLogEvent, + ], Field(discriminator="type"), ] diff --git a/llama_toolchain/telemetry/console/console.py b/llama_toolchain/telemetry/console/console.py index 9f3123d8d..2e7b9980d 100644 --- a/llama_toolchain/telemetry/console/console.py +++ b/llama_toolchain/telemetry/console/console.py @@ -20,18 +20,21 @@ class ConsoleTelemetryImpl(Telemetry): async def shutdown(self) -> None: ... async def log_event(self, event: Event): - if isinstance(event, SpanStartEvent): - self.spans[event.span_id] = event + if ( + isinstance(event, StructuredLogEvent) + and event.payload.type == StructuredLogType.SPAN_START.value + ): + self.spans[event.span_id] = event.payload names = [] span_id = event.span_id while True: - span_event = self.spans.get(span_id) - if not span_event: + span_payload = self.spans.get(span_id) + if not span_payload: break - names = [span_event.name] + names - span_id = span_event.parent_span_id + names = [span_payload.name] + names + span_id = span_payload.parent_span_id span_name = ".".join(names) if names else None @@ -71,7 +74,7 @@ def format_event(event: Event, span_name: str) -> Optional[str]: span = "" if span_name: span = f"{COLORS['magenta']}[{span_name}]{COLORS['reset']} " - if isinstance(event, LoggingEvent): + if isinstance(event, UnstructuredLogEvent): severity_color = SEVERITY_COLORS.get(event.severity, COLORS["reset"]) return ( f"{COLORS['dim']}{timestamp}{COLORS['reset']} " @@ -80,21 +83,7 @@ def format_event(event: Event, span_name: str) -> Optional[str]: f"{event.message}" ) - elif isinstance(event, SpanStartEvent): + elif isinstance(event, StructuredLogEvent): return None - # return (f"{COLORS['dim']}{timestamp}{COLORS['reset']} " - # f"{COLORS['blue']}[SPAN_START]{COLORS['reset']} " - # f"{span}" - # f"{COLORS['bold']}{event.name}{COLORS['reset']}") - - elif isinstance(event, SpanEndEvent): - return None - - # status_color = COLORS['green'] if event.status == SpanStatus.OK else COLORS['red'] - # return (f"{COLORS['dim']}{timestamp}{COLORS['reset']} " - # f"{COLORS['blue']}[SPAN_END]{COLORS['reset']} " - # f"{span}" - # f"{status_color}{event.status.value}{COLORS['reset']}") - return f"Unknown event type: {event}" diff --git a/llama_toolchain/telemetry/tracing.py b/llama_toolchain/telemetry/tracing.py index 4000a32f7..6afe5c2fb 100644 --- a/llama_toolchain/telemetry/tracing.py +++ b/llama_toolchain/telemetry/tracing.py @@ -43,7 +43,6 @@ class BackgroundLogger: print("Log queue is full, dropping event") def _process_logs(self): - logger = logging.getLogger() while True: try: event = self.log_queue.get() @@ -80,13 +79,15 @@ class TraceContext: ) self.logger.log_event( - SpanStartEvent( + StructuredLogEvent( trace_id=span.trace_id, span_id=span.span_id, timestamp=span.start_time, attributes=span.attributes, - name=span.name, - parent_span_id=span.parent_span_id, + payload=SpanStartPayload( + name=span.name, + parent_span_id=span.parent_span_id, + ), ) ) @@ -96,12 +97,14 @@ class TraceContext: span = self.spans.pop() if span is not None: self.logger.log_event( - SpanEndEvent( + StructuredLogEvent( trace_id=span.trace_id, span_id=span.span_id, timestamp=span.start_time, attributes=span.attributes, - status=status, + payload=SpanEndPayload( + status=status, + ), ) ) @@ -159,7 +162,7 @@ def severity(levelname: str) -> LogSeverity: # TODO: ideally, the actual emitting should be done inside a separate daemon -# (thread or process) that is responsible for flushing the events to the backend. +# process completely isolated from the server class TelemetryHandler(logging.Handler): def emit(self, record: logging.LogRecord): # horrendous hack to avoid logging from asyncio and getting into an infinite loop @@ -180,7 +183,7 @@ class TelemetryHandler(logging.Handler): return BACKGROUND_LOGGER.log_event( - LoggingEvent( + UnstructuredLogEvent( trace_id=span.trace_id, span_id=span.span_id, timestamp=datetime.now(),