From 0b5a794c27949eb573b6d43108aa938805d232dd Mon Sep 17 00:00:00 2001 From: ehhuang Date: Fri, 8 Aug 2025 13:47:36 -0700 Subject: [PATCH] fix: telemetry logger spams when queue is full (#3070) # What does this PR do? ## Test Plan Ran a stress test on chat completion endpoint locally: For 10 concurrent users over 3 minutes: Before: image After: image (Will send scripts in a future PR.) --- .../providers/utils/telemetry/tracing.py | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/llama_stack/providers/utils/telemetry/tracing.py b/llama_stack/providers/utils/telemetry/tracing.py index 75b29cdce..7080e774a 100644 --- a/llama_stack/providers/utils/telemetry/tracing.py +++ b/llama_stack/providers/utils/telemetry/tracing.py @@ -9,7 +9,9 @@ import contextvars import logging import queue import random +import sys import threading +import time from collections.abc import Callable from datetime import UTC, datetime from functools import wraps @@ -30,6 +32,16 @@ from llama_stack.providers.utils.telemetry.trace_protocol import serialize_value logger = get_logger(__name__, category="core") +# Fallback logger that does NOT propagate to TelemetryHandler to avoid recursion +_fallback_logger = logging.getLogger("llama_stack.telemetry.background") +if not _fallback_logger.handlers: + _fallback_logger.propagate = False + _fallback_logger.setLevel(logging.ERROR) + _fallback_handler = logging.StreamHandler(sys.stderr) + _fallback_handler.setLevel(logging.ERROR) + _fallback_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")) + _fallback_logger.addHandler(_fallback_handler) + INVALID_SPAN_ID = 0x0000000000000000 INVALID_TRACE_ID = 0x00000000000000000000000000000000 @@ -79,19 +91,32 @@ def generate_trace_id() -> str: CURRENT_TRACE_CONTEXT = contextvars.ContextVar("trace_context", default=None) BACKGROUND_LOGGER = None +LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS = 60.0 + class BackgroundLogger: def __init__(self, api: Telemetry, capacity: int = 100000): self.api = api - self.log_queue = queue.Queue(maxsize=capacity) + self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity) self.worker_thread = threading.Thread(target=self._process_logs, daemon=True) self.worker_thread.start() + self._last_queue_full_log_time: float = 0.0 + self._dropped_since_last_notice: int = 0 def log_event(self, event): try: self.log_queue.put_nowait(event) except queue.Full: - logger.error("Log queue is full, dropping event") + # Aggregate drops and emit at most once per interval via fallback logger + self._dropped_since_last_notice += 1 + current_time = time.time() + if current_time - self._last_queue_full_log_time >= LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS: + _fallback_logger.error( + "Log queue is full; dropped %d events since last notice", + self._dropped_since_last_notice, + ) + self._last_queue_full_log_time = current_time + self._dropped_since_last_notice = 0 def _process_logs(self): while True: