only initialize otel span processors once

This commit is contained in:
Dinesh Yeduguru 2025-01-14 10:51:42 -08:00
parent f8d8309b0e
commit 51b4ad77e1

View file

@ -30,13 +30,10 @@ from llama_stack.apis.telemetry import (
Trace, Trace,
UnstructuredLogEvent, UnstructuredLogEvent,
) )
from llama_stack.distribution.datatypes import Api from llama_stack.distribution.datatypes import Api
from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import ( from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import (
ConsoleSpanProcessor, ConsoleSpanProcessor,
) )
from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import ( from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import (
SQLiteSpanProcessor, SQLiteSpanProcessor,
) )
@ -52,6 +49,7 @@ _GLOBAL_STORAGE = {
"up_down_counters": {}, "up_down_counters": {},
} }
_global_lock = threading.Lock() _global_lock = threading.Lock()
_TRACER_PROVIDER = None
def string_to_trace_id(s: str) -> int: def string_to_trace_id(s: str) -> int:
@ -80,31 +78,34 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry):
} }
) )
provider = TracerProvider(resource=resource) global _TRACER_PROVIDER
trace.set_tracer_provider(provider) if _TRACER_PROVIDER is None:
if TelemetrySink.OTEL in self.config.sinks: provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter( trace.set_tracer_provider(provider)
endpoint=self.config.otel_endpoint, _TRACER_PROVIDER = provider
) if TelemetrySink.OTEL in self.config.sinks:
span_processor = BatchSpanProcessor(otlp_exporter) otlp_exporter = OTLPSpanExporter(
trace.get_tracer_provider().add_span_processor(span_processor)
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(
endpoint=self.config.otel_endpoint, endpoint=self.config.otel_endpoint,
) )
) span_processor = BatchSpanProcessor(otlp_exporter)
metric_provider = MeterProvider( trace.get_tracer_provider().add_span_processor(span_processor)
resource=resource, metric_readers=[metric_reader] metric_reader = PeriodicExportingMetricReader(
) OTLPMetricExporter(
metrics.set_meter_provider(metric_provider) endpoint=self.config.otel_endpoint,
self.meter = metrics.get_meter(__name__) )
if TelemetrySink.SQLITE in self.config.sinks: )
trace.get_tracer_provider().add_span_processor( metric_provider = MeterProvider(
SQLiteSpanProcessor(self.config.sqlite_db_path) resource=resource, metric_readers=[metric_reader]
) )
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path) metrics.set_meter_provider(metric_provider)
if TelemetrySink.CONSOLE in self.config.sinks: self.meter = metrics.get_meter(__name__)
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor()) if TelemetrySink.SQLITE in self.config.sinks:
trace.get_tracer_provider().add_span_processor(
SQLiteSpanProcessor(self.config.sqlite_db_path)
)
self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path)
if TelemetrySink.CONSOLE in self.config.sinks:
trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor())
self._lock = _global_lock self._lock = _global_lock
async def initialize(self) -> None: async def initialize(self) -> None: