From a174938fbd8768f920df98909e341c9b4f1a6a65 Mon Sep 17 00:00:00 2001 From: Dinesh Yeduguru Date: Tue, 14 Jan 2025 11:31:50 -0800 Subject: [PATCH] Fix telemetry to work on reinstantiating new lib cli (#761) # What does this PR do? Since we maintain global state in our telemetry pipeline, reinstantiating lib cli will cause us to add duplicate span processors causing sqlite to lock out because of constraint violations since we now have two span processor writing to sqlite. This PR changes the telemetry adapter for otel to only instantiate the provider once and add the span processsors only once. Also fixes an issue llama stack build ## Test Plan tested with notebook at https://colab.research.google.com/drive/1ck7hXQxRl6UvT-ijNRZ-gMZxH1G3cN2d#scrollTo=9496f75c --- llama_stack/cli/stack/build.py | 5 +- .../telemetry/meta_reference/telemetry.py | 53 ++++++++++--------- .../providers/utils/telemetry/tracing.py | 3 +- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/llama_stack/cli/stack/build.py b/llama_stack/cli/stack/build.py index 084374c8a..85e6cb962 100644 --- a/llama_stack/cli/stack/build.py +++ b/llama_stack/cli/stack/build.py @@ -4,9 +4,7 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. import argparse - import importlib.resources - import os import shutil from functools import lru_cache @@ -14,14 +12,12 @@ from pathlib import Path from typing import List, Optional from llama_stack.cli.subcommand import Subcommand - from llama_stack.distribution.datatypes import ( BuildConfig, DistributionSpec, Provider, StackRunConfig, ) - from llama_stack.distribution.distribution import get_provider_registry from llama_stack.distribution.resolver import InvalidProviderError from llama_stack.distribution.utils.dynamic import instantiate_class_type @@ -296,6 +292,7 @@ class StackBuild(Subcommand): / f"templates/{template_name}/run.yaml" ) with importlib.resources.as_file(template_path) as path: + run_config_file = build_dir / f"{build_config.name}-run.yaml" shutil.copy(path, run_config_file) # Find all ${env.VARIABLE} patterns cprint("Build Successful!", color="green") diff --git a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py index efc37b553..332a150cf 100644 --- a/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py +++ b/llama_stack/providers/inline/telemetry/meta_reference/telemetry.py @@ -30,13 +30,10 @@ from llama_stack.apis.telemetry import ( Trace, UnstructuredLogEvent, ) - from llama_stack.distribution.datatypes import Api - from llama_stack.providers.inline.telemetry.meta_reference.console_span_processor import ( ConsoleSpanProcessor, ) - from llama_stack.providers.inline.telemetry.meta_reference.sqlite_span_processor import ( SQLiteSpanProcessor, ) @@ -52,6 +49,7 @@ _GLOBAL_STORAGE = { "up_down_counters": {}, } _global_lock = threading.Lock() +_TRACER_PROVIDER = None def string_to_trace_id(s: str) -> int: @@ -80,31 +78,34 @@ class TelemetryAdapter(TelemetryDatasetMixin, Telemetry): } ) - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) - if TelemetrySink.OTEL in self.config.sinks: - otlp_exporter = OTLPSpanExporter( - endpoint=self.config.otel_endpoint, - ) - span_processor = BatchSpanProcessor(otlp_exporter) - trace.get_tracer_provider().add_span_processor(span_processor) - metric_reader = PeriodicExportingMetricReader( - OTLPMetricExporter( + global _TRACER_PROVIDER + if _TRACER_PROVIDER is None: + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + _TRACER_PROVIDER = provider + if TelemetrySink.OTEL in self.config.sinks: + otlp_exporter = OTLPSpanExporter( endpoint=self.config.otel_endpoint, ) - ) - metric_provider = MeterProvider( - resource=resource, metric_readers=[metric_reader] - ) - metrics.set_meter_provider(metric_provider) - self.meter = metrics.get_meter(__name__) - if TelemetrySink.SQLITE in self.config.sinks: - trace.get_tracer_provider().add_span_processor( - SQLiteSpanProcessor(self.config.sqlite_db_path) - ) - self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path) - if TelemetrySink.CONSOLE in self.config.sinks: - trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor()) + span_processor = BatchSpanProcessor(otlp_exporter) + trace.get_tracer_provider().add_span_processor(span_processor) + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=self.config.otel_endpoint, + ) + ) + metric_provider = MeterProvider( + resource=resource, metric_readers=[metric_reader] + ) + metrics.set_meter_provider(metric_provider) + self.meter = metrics.get_meter(__name__) + if TelemetrySink.SQLITE in self.config.sinks: + trace.get_tracer_provider().add_span_processor( + SQLiteSpanProcessor(self.config.sqlite_db_path) + ) + self.trace_store = SQLiteTraceStore(self.config.sqlite_db_path) + if TelemetrySink.CONSOLE in self.config.sinks: + trace.get_tracer_provider().add_span_processor(ConsoleSpanProcessor()) self._lock = _global_lock async def initialize(self) -> None: diff --git a/llama_stack/providers/utils/telemetry/tracing.py b/llama_stack/providers/utils/telemetry/tracing.py index f304d58f6..d84024941 100644 --- a/llama_stack/providers/utils/telemetry/tracing.py +++ b/llama_stack/providers/utils/telemetry/tracing.py @@ -127,7 +127,8 @@ class TraceContext: def setup_logger(api: Telemetry, level: int = logging.INFO): global BACKGROUND_LOGGER - BACKGROUND_LOGGER = BackgroundLogger(api) + if BACKGROUND_LOGGER is None: + BACKGROUND_LOGGER = BackgroundLogger(api) logger = logging.getLogger() logger.setLevel(level) logger.addHandler(TelemetryHandler())