Move span start/end events one level down, into the payload of structured log events

This commit is contained in:
Ashwin Bharambe 2024-09-11 14:24:54 -07:00
parent 99af14b18c
commit e8c2f068a3
3 changed files with 65 additions and 52 deletions

View file

@ -41,15 +41,9 @@ class Trace(BaseModel):
@json_schema_type
class EventType(Enum):
UNSTRUCTURED_LOG = "unstructured_log"
# all structured log events below
SPAN_START = "span_start"
SPAN_END = "span_end"
STRUCTURED_LOG = "structured_log"
METRIC = "metric"
def is_structured(self) -> bool:
return self != EventType.UNSTRUCTURED_LOG
@json_schema_type
class LogSeverity(Enum):
@ -69,25 +63,12 @@ class EventCommon(BaseModel):
@json_schema_type
class LoggingEvent(EventCommon):
class UnstructuredLogEvent(EventCommon):
type: Literal[EventType.UNSTRUCTURED_LOG.value] = EventType.UNSTRUCTURED_LOG.value
message: str
severity: LogSeverity
@json_schema_type
class SpanStartEvent(EventCommon):
type: Literal[EventType.SPAN_START.value] = EventType.SPAN_START.value
name: str
parent_span_id: Optional[str] = None
@json_schema_type
class SpanEndEvent(EventCommon):
type: Literal[EventType.SPAN_END.value] = EventType.SPAN_END.value
status: SpanStatus
@json_schema_type
class MetricEvent(EventCommon):
type: Literal[EventType.METRIC.value] = EventType.METRIC.value
@ -96,8 +77,48 @@ class MetricEvent(EventCommon):
unit: str
# Tags for the kinds of payload a structured log event can carry.
# The string values double as the "type" discriminator of the payload models.
@json_schema_type
class StructuredLogType(Enum):
    SPAN_START = "span_start"  # tag used by the span-start payload
    SPAN_END = "span_end"  # tag used by the span-end payload
# Payload describing the start of a span.
# Documented with comments rather than a docstring so the model itself is unchanged.
@json_schema_type
class SpanStartPayload(BaseModel):
    # Constant tag: always the SPAN_START value, so a union discriminated on
    # "type" can select this model.
    type: Literal[StructuredLogType.SPAN_START.value] = StructuredLogType.SPAN_START.value
    # Name of the span.
    name: str
    # Id of the parent span; defaults to None when not supplied.
    parent_span_id: Optional[str] = None
# Payload describing the end of a span.
# Documented with comments rather than a docstring so the model itself is unchanged.
@json_schema_type
class SpanEndPayload(BaseModel):
    # Constant tag: always the SPAN_END value, so a union discriminated on
    # "type" can select this model.
    type: Literal[StructuredLogType.SPAN_END.value] = (
        StructuredLogType.SPAN_END.value
    )
    # Status reported for the span.
    status: SpanStatus
# Tagged union of every payload a structured log event may carry; the concrete
# model is chosen by the value of each payload's "type" field.
StructuredLogPayload = Annotated[
    Union[SpanStartPayload, SpanEndPayload],
    Field(discriminator="type"),
]
# Event that wraps one structured payload (span start or span end) on top of
# the fields inherited from EventCommon.
@json_schema_type
class StructuredLogEvent(EventCommon):
    # Constant tag: always the STRUCTURED_LOG event type.
    type: Literal[EventType.STRUCTURED_LOG.value] = (
        EventType.STRUCTURED_LOG.value
    )
    # The structured content, discriminated on its own "type" field.
    payload: StructuredLogPayload
Event = Annotated[
Union[LoggingEvent, SpanStartEvent, SpanEndEvent],
Union[
UnstructuredLogEvent,
MetricEvent,
StructuredLogEvent,
],
Field(discriminator="type"),
]

View file

@ -20,18 +20,21 @@ class ConsoleTelemetryImpl(Telemetry):
async def shutdown(self) -> None: ...
async def log_event(self, event: Event):
if isinstance(event, SpanStartEvent):
self.spans[event.span_id] = event
if (
isinstance(event, StructuredLogEvent)
and event.payload.type == StructuredLogType.SPAN_START.value
):
self.spans[event.span_id] = event.payload
names = []
span_id = event.span_id
while True:
span_event = self.spans.get(span_id)
if not span_event:
span_payload = self.spans.get(span_id)
if not span_payload:
break
names = [span_event.name] + names
span_id = span_event.parent_span_id
names = [span_payload.name] + names
span_id = span_payload.parent_span_id
span_name = ".".join(names) if names else None
@ -71,7 +74,7 @@ def format_event(event: Event, span_name: str) -> Optional[str]:
span = ""
if span_name:
span = f"{COLORS['magenta']}[{span_name}]{COLORS['reset']} "
if isinstance(event, LoggingEvent):
if isinstance(event, UnstructuredLogEvent):
severity_color = SEVERITY_COLORS.get(event.severity, COLORS["reset"])
return (
f"{COLORS['dim']}{timestamp}{COLORS['reset']} "
@ -80,21 +83,7 @@ def format_event(event: Event, span_name: str) -> Optional[str]:
f"{event.message}"
)
elif isinstance(event, SpanStartEvent):
elif isinstance(event, StructuredLogEvent):
return None
# return (f"{COLORS['dim']}{timestamp}{COLORS['reset']} "
# f"{COLORS['blue']}[SPAN_START]{COLORS['reset']} "
# f"{span}"
# f"{COLORS['bold']}{event.name}{COLORS['reset']}")
elif isinstance(event, SpanEndEvent):
return None
# status_color = COLORS['green'] if event.status == SpanStatus.OK else COLORS['red']
# return (f"{COLORS['dim']}{timestamp}{COLORS['reset']} "
# f"{COLORS['blue']}[SPAN_END]{COLORS['reset']} "
# f"{span}"
# f"{status_color}{event.status.value}{COLORS['reset']}")
return f"Unknown event type: {event}"

View file

@ -43,7 +43,6 @@ class BackgroundLogger:
print("Log queue is full, dropping event")
def _process_logs(self):
logger = logging.getLogger()
while True:
try:
event = self.log_queue.get()
@ -80,13 +79,15 @@ class TraceContext:
)
self.logger.log_event(
SpanStartEvent(
StructuredLogEvent(
trace_id=span.trace_id,
span_id=span.span_id,
timestamp=span.start_time,
attributes=span.attributes,
name=span.name,
parent_span_id=span.parent_span_id,
payload=SpanStartPayload(
name=span.name,
parent_span_id=span.parent_span_id,
),
)
)
@ -96,12 +97,14 @@ class TraceContext:
span = self.spans.pop()
if span is not None:
self.logger.log_event(
SpanEndEvent(
StructuredLogEvent(
trace_id=span.trace_id,
span_id=span.span_id,
timestamp=span.start_time,
attributes=span.attributes,
status=status,
payload=SpanEndPayload(
status=status,
),
)
)
@ -159,7 +162,7 @@ def severity(levelname: str) -> LogSeverity:
# TODO: ideally, the actual emitting should be done inside a separate daemon
# (thread or process) that is responsible for flushing the events to the backend.
# process completely isolated from the server
class TelemetryHandler(logging.Handler):
def emit(self, record: logging.LogRecord):
# horrendous hack to avoid logging from asyncio and getting into an infinite loop
@ -180,7 +183,7 @@ class TelemetryHandler(logging.Handler):
return
BACKGROUND_LOGGER.log_event(
LoggingEvent(
UnstructuredLogEvent(
trace_id=span.trace_id,
span_id=span.span_id,
timestamp=datetime.now(),