mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-08-02 08:44:44 +00:00
use global state in open telemetry provider
This commit is contained in:
parent
d2e6e59647
commit
e9d5a7735b
1 changed file with 38 additions and 40 deletions
|
@ -23,6 +23,15 @@ from llama_stack.apis.telemetry import * # noqa: F403
|
||||||
|
|
||||||
from .config import OpenTelemetryConfig
|
from .config import OpenTelemetryConfig
|
||||||
|
|
||||||
|
# Add global storage
|
||||||
|
_GLOBAL_STORAGE = {
|
||||||
|
"active_spans": {},
|
||||||
|
"counters": {},
|
||||||
|
"gauges": {},
|
||||||
|
"up_down_counters": {},
|
||||||
|
}
|
||||||
|
_global_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
def string_to_trace_id(s: str) -> int:
|
def string_to_trace_id(s: str) -> int:
|
||||||
# Convert the string to bytes and then to an integer
|
# Convert the string to bytes and then to an integer
|
||||||
|
@ -67,12 +76,7 @@ class OpenTelemetryAdapter(Telemetry):
|
||||||
)
|
)
|
||||||
metrics.set_meter_provider(metric_provider)
|
metrics.set_meter_provider(metric_provider)
|
||||||
self.meter = metrics.get_meter(__name__)
|
self.meter = metrics.get_meter(__name__)
|
||||||
# Initialize metric storage
|
self._lock = _global_lock
|
||||||
self._counters = {}
|
|
||||||
self._gauges = {}
|
|
||||||
self._up_down_counters = {}
|
|
||||||
self._active_spans = {}
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
async def initialize(self) -> None:
|
async def initialize(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
@ -92,12 +96,11 @@ class OpenTelemetryAdapter(Telemetry):
|
||||||
|
|
||||||
def _log_unstructured(self, event: UnstructuredLogEvent) -> None:
|
def _log_unstructured(self, event: UnstructuredLogEvent) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
# Check if there's an existing span in the cache
|
# Use global storage instead of instance storage
|
||||||
span_id = string_to_span_id(event.span_id)
|
span_id = string_to_span_id(event.span_id)
|
||||||
span = self._active_spans.get(span_id)
|
span = _GLOBAL_STORAGE["active_spans"].get(span_id)
|
||||||
|
|
||||||
if span:
|
if span:
|
||||||
# Use existing span
|
|
||||||
timestamp_ns = int(event.timestamp.timestamp() * 1e9)
|
timestamp_ns = int(event.timestamp.timestamp() * 1e9)
|
||||||
span.add_event(
|
span.add_event(
|
||||||
name=event.message,
|
name=event.message,
|
||||||
|
@ -110,22 +113,22 @@ class OpenTelemetryAdapter(Telemetry):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_or_create_counter(self, name: str, unit: str) -> metrics.Counter:
|
def _get_or_create_counter(self, name: str, unit: str) -> metrics.Counter:
|
||||||
if name not in self._counters:
|
if name not in _GLOBAL_STORAGE["counters"]:
|
||||||
self._counters[name] = self.meter.create_counter(
|
_GLOBAL_STORAGE["counters"][name] = self.meter.create_counter(
|
||||||
name=name,
|
name=name,
|
||||||
unit=unit,
|
unit=unit,
|
||||||
description=f"Counter for {name}",
|
description=f"Counter for {name}",
|
||||||
)
|
)
|
||||||
return self._counters[name]
|
return _GLOBAL_STORAGE["counters"][name]
|
||||||
|
|
||||||
def _get_or_create_gauge(self, name: str, unit: str) -> metrics.ObservableGauge:
|
def _get_or_create_gauge(self, name: str, unit: str) -> metrics.ObservableGauge:
|
||||||
if name not in self._gauges:
|
if name not in _GLOBAL_STORAGE["gauges"]:
|
||||||
self._gauges[name] = self.meter.create_gauge(
|
_GLOBAL_STORAGE["gauges"][name] = self.meter.create_gauge(
|
||||||
name=name,
|
name=name,
|
||||||
unit=unit,
|
unit=unit,
|
||||||
description=f"Gauge for {name}",
|
description=f"Gauge for {name}",
|
||||||
)
|
)
|
||||||
return self._gauges[name]
|
return _GLOBAL_STORAGE["gauges"][name]
|
||||||
|
|
||||||
def _log_metric(self, event: MetricEvent) -> None:
|
def _log_metric(self, event: MetricEvent) -> None:
|
||||||
if isinstance(event.value, int):
|
if isinstance(event.value, int):
|
||||||
|
@ -140,56 +143,51 @@ class OpenTelemetryAdapter(Telemetry):
|
||||||
def _get_or_create_up_down_counter(
|
def _get_or_create_up_down_counter(
|
||||||
self, name: str, unit: str
|
self, name: str, unit: str
|
||||||
) -> metrics.UpDownCounter:
|
) -> metrics.UpDownCounter:
|
||||||
if name not in self._up_down_counters:
|
if name not in _GLOBAL_STORAGE["up_down_counters"]:
|
||||||
self._up_down_counters[name] = self.meter.create_up_down_counter(
|
_GLOBAL_STORAGE["up_down_counters"][name] = (
|
||||||
|
self.meter.create_up_down_counter(
|
||||||
name=name,
|
name=name,
|
||||||
unit=unit,
|
unit=unit,
|
||||||
description=f"UpDownCounter for {name}",
|
description=f"UpDownCounter for {name}",
|
||||||
)
|
)
|
||||||
return self._up_down_counters[name]
|
)
|
||||||
|
return _GLOBAL_STORAGE["up_down_counters"][name]
|
||||||
|
|
||||||
def _log_structured(self, event: StructuredLogEvent) -> None:
|
def _log_structured(self, event: StructuredLogEvent) -> None:
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
trace_id = string_to_trace_id(event.trace_id)
|
|
||||||
span_id = string_to_span_id(event.span_id)
|
span_id = string_to_span_id(event.span_id)
|
||||||
|
|
||||||
tracer = trace.get_tracer(__name__)
|
tracer = trace.get_tracer(__name__)
|
||||||
span_context = trace.SpanContext(
|
|
||||||
trace_id=trace_id,
|
|
||||||
span_id=span_id,
|
|
||||||
is_remote=True,
|
|
||||||
trace_flags=trace.TraceFlags(trace.TraceFlags.SAMPLED),
|
|
||||||
trace_state=trace.TraceState(),
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(event.payload, SpanStartPayload):
|
if isinstance(event.payload, SpanStartPayload):
|
||||||
# Get parent span if it exists
|
# Find parent span from active spans
|
||||||
parent_span = None
|
parent_span = None
|
||||||
for active_span in self._active_spans.values():
|
if event.payload.parent_span_id:
|
||||||
if active_span.is_recording():
|
parent_span_id = string_to_span_id(event.payload.parent_span_id)
|
||||||
parent_span = active_span
|
parent_span = _GLOBAL_STORAGE["active_spans"].get(parent_span_id)
|
||||||
break
|
|
||||||
|
|
||||||
# Create the context properly
|
# Create context with parent span if it exists
|
||||||
context = trace.Context()
|
context = trace.Context()
|
||||||
if parent_span:
|
if parent_span:
|
||||||
context = trace.set_span_in_context(parent_span)
|
context = trace.set_span_in_context(parent_span)
|
||||||
|
|
||||||
|
# Create new span
|
||||||
span = tracer.start_span(
|
span = tracer.start_span(
|
||||||
name=event.payload.name,
|
name=event.payload.name,
|
||||||
context=context,
|
context=context,
|
||||||
attributes=event.attributes or {},
|
attributes=event.attributes or {},
|
||||||
start_time=int(event.timestamp.timestamp() * 1e9),
|
start_time=int(event.timestamp.timestamp() * 1e9),
|
||||||
)
|
)
|
||||||
self._active_spans[span_id] = span
|
_GLOBAL_STORAGE["active_spans"][span_id] = span
|
||||||
|
|
||||||
# Set the span as current
|
# Set as current span
|
||||||
_ = trace.set_span_in_context(span)
|
_ = trace.set_span_in_context(span)
|
||||||
trace.use_span(span, end_on_exit=False)
|
trace.use_span(span, end_on_exit=False)
|
||||||
|
|
||||||
elif isinstance(event.payload, SpanEndPayload):
|
elif isinstance(event.payload, SpanEndPayload):
|
||||||
# Retrieve and end the existing span
|
# End existing span
|
||||||
span = self._active_spans.get(span_id)
|
span = _GLOBAL_STORAGE["active_spans"].get(span_id)
|
||||||
if span:
|
if span:
|
||||||
if event.attributes:
|
if event.attributes:
|
||||||
span.set_attributes(event.attributes)
|
span.set_attributes(event.attributes)
|
||||||
|
@ -203,7 +201,7 @@ class OpenTelemetryAdapter(Telemetry):
|
||||||
span.end(end_time=int(event.timestamp.timestamp() * 1e9))
|
span.end(end_time=int(event.timestamp.timestamp() * 1e9))
|
||||||
|
|
||||||
# Remove from active spans
|
# Remove from active spans
|
||||||
del self._active_spans[span_id]
|
del _GLOBAL_STORAGE["active_spans"][span_id]
|
||||||
|
|
||||||
async def get_trace(self, trace_id: str) -> Trace:
|
async def get_trace(self, trace_id: str) -> Trace:
|
||||||
raise NotImplementedError("Trace retrieval not implemented yet")
|
raise NotImplementedError("Trace retrieval not implemented yet")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue