opentelemetry provider working with existing traces and spans

This commit is contained in:
Dinesh Yeduguru 2024-11-22 17:46:33 -08:00
parent e9d5a7735b
commit deb11be3e3
2 changed files with 19 additions and 15 deletions

View file

@ -218,7 +218,7 @@ class TracingMiddleware:
async def __call__(self, scope, receive, send): async def __call__(self, scope, receive, send):
path = scope["path"] path = scope["path"]
await start_trace(path) await start_trace(path, {"location": "server"})
try: try:
return await self.app(scope, receive, send) return await self.app(scope, receive, send)
finally: finally:

View file

@ -103,8 +103,12 @@ class OpenTelemetryAdapter(Telemetry):
if span: if span:
timestamp_ns = int(event.timestamp.timestamp() * 1e9) timestamp_ns = int(event.timestamp.timestamp() * 1e9)
span.add_event( span.add_event(
name=event.message, name=event.type,
attributes={"severity": event.severity.value, **event.attributes}, attributes={
"message": event.message,
"severity": event.severity.value,
**event.attributes,
},
timestamp=timestamp_ns, timestamp=timestamp_ns,
) )
else: else:
@ -154,25 +158,26 @@ class OpenTelemetryAdapter(Telemetry):
return _GLOBAL_STORAGE["up_down_counters"][name] return _GLOBAL_STORAGE["up_down_counters"][name]
def _log_structured(self, event: StructuredLogEvent) -> None: def _log_structured(self, event: StructuredLogEvent) -> None:
with self._lock: with self._lock:
span_id = string_to_span_id(event.span_id) span_id = string_to_span_id(event.span_id)
trace_id = string_to_trace_id(event.trace_id)
tracer = trace.get_tracer(__name__) tracer = trace.get_tracer(__name__)
if isinstance(event.payload, SpanStartPayload): if isinstance(event.payload, SpanStartPayload):
# Find parent span from active spans # Check if span already exists to prevent duplicates
if span_id in _GLOBAL_STORAGE["active_spans"]:
return
parent_span = None parent_span = None
if event.payload.parent_span_id: if event.payload.parent_span_id:
parent_span_id = string_to_span_id(event.payload.parent_span_id) parent_span_id = string_to_span_id(event.payload.parent_span_id)
parent_span = _GLOBAL_STORAGE["active_spans"].get(parent_span_id) parent_span = _GLOBAL_STORAGE["active_spans"].get(parent_span_id)
# Create context with parent span if it exists # Create a new trace context with the trace_id
context = trace.Context() context = trace.Context(trace_id=trace_id)
if parent_span: if parent_span:
context = trace.set_span_in_context(parent_span) context = trace.set_span_in_context(parent_span, context)
# Create new span
span = tracer.start_span( span = tracer.start_span(
name=event.payload.name, name=event.payload.name,
context=context, context=context,
@ -181,12 +186,11 @@ class OpenTelemetryAdapter(Telemetry):
) )
_GLOBAL_STORAGE["active_spans"][span_id] = span _GLOBAL_STORAGE["active_spans"][span_id] = span
# Set as current span # Set as current span using context manager
_ = trace.set_span_in_context(span) with trace.use_span(span, end_on_exit=False):
trace.use_span(span, end_on_exit=False) pass # Let the span continue beyond this block
elif isinstance(event.payload, SpanEndPayload): elif isinstance(event.payload, SpanEndPayload):
# End existing span
span = _GLOBAL_STORAGE["active_spans"].get(span_id) span = _GLOBAL_STORAGE["active_spans"].get(span_id)
if span: if span:
if event.attributes: if event.attributes:
@ -201,7 +205,7 @@ class OpenTelemetryAdapter(Telemetry):
span.end(end_time=int(event.timestamp.timestamp() * 1e9)) span.end(end_time=int(event.timestamp.timestamp() * 1e9))
# Remove from active spans # Remove from active spans
del _GLOBAL_STORAGE["active_spans"][span_id] _GLOBAL_STORAGE["active_spans"].pop(span_id, None)
async def get_trace(self, trace_id: str) -> Trace: async def get_trace(self, trace_id: str) -> Trace:
raise NotImplementedError("Trace retrieval not implemented yet") raise NotImplementedError("Trace retrieval not implemented yet")