diff --git a/llama_stack/distribution/server/server.py b/llama_stack/distribution/server/server.py index ff57b0e1e..5e372d183 100644 --- a/llama_stack/distribution/server/server.py +++ b/llama_stack/distribution/server/server.py @@ -218,7 +218,7 @@ class TracingMiddleware: async def __call__(self, scope, receive, send): path = scope["path"] - await start_trace(path) + await start_trace(path, {"location": "server"}) try: return await self.app(scope, receive, send) finally: diff --git a/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py b/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py index 01edd1692..a7b91991d 100644 --- a/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py +++ b/llama_stack/providers/remote/telemetry/opentelemetry/opentelemetry.py @@ -103,8 +103,12 @@ class OpenTelemetryAdapter(Telemetry): if span: timestamp_ns = int(event.timestamp.timestamp() * 1e9) span.add_event( - name=event.message, - attributes={"severity": event.severity.value, **event.attributes}, + name=event.type, + attributes={ + "message": event.message, + "severity": event.severity.value, + **event.attributes, + }, timestamp=timestamp_ns, ) else: @@ -154,25 +158,26 @@ class OpenTelemetryAdapter(Telemetry): return _GLOBAL_STORAGE["up_down_counters"][name] def _log_structured(self, event: StructuredLogEvent) -> None: - with self._lock: span_id = string_to_span_id(event.span_id) - + trace_id = string_to_trace_id(event.trace_id) tracer = trace.get_tracer(__name__) if isinstance(event.payload, SpanStartPayload): - # Find parent span from active spans + # Check if span already exists to prevent duplicates + if span_id in _GLOBAL_STORAGE["active_spans"]: + return + parent_span = None if event.payload.parent_span_id: parent_span_id = string_to_span_id(event.payload.parent_span_id) parent_span = _GLOBAL_STORAGE["active_spans"].get(parent_span_id) - # Create context with parent span if it exists - context = trace.Context() + # Create a new trace context with the trace_id + context = trace.Context(trace_id=trace_id) if parent_span: - context = trace.set_span_in_context(parent_span) + context = trace.set_span_in_context(parent_span, context) - # Create new span span = tracer.start_span( name=event.payload.name, context=context, @@ -181,12 +186,11 @@ class OpenTelemetryAdapter(Telemetry): ) _GLOBAL_STORAGE["active_spans"][span_id] = span - # Set as current span - _ = trace.set_span_in_context(span) - trace.use_span(span, end_on_exit=False) + # Set as current span using context manager + with trace.use_span(span, end_on_exit=False): + pass # Let the span continue beyond this block elif isinstance(event.payload, SpanEndPayload): - # End existing span span = _GLOBAL_STORAGE["active_spans"].get(span_id) if span: if event.attributes: @@ -201,7 +205,7 @@ class OpenTelemetryAdapter(Telemetry): span.end(end_time=int(event.timestamp.timestamp() * 1e9)) # Remove from active spans - del _GLOBAL_STORAGE["active_spans"][span_id] + _GLOBAL_STORAGE["active_spans"].pop(span_id, None) async def get_trace(self, trace_id: str) -> Trace: raise NotImplementedError("Trace retrieval not implemented yet")