fix: telemetry logger spams when queue is full (#3070)

# What does this PR do?

Stops the telemetry `BackgroundLogger` from logging a "Log queue is full, dropping event" error for every dropped event. Queue-full drops are now counted and reported at most once per `LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS` (60s), through a stderr-only fallback logger that does not propagate into the telemetry handler and therefore cannot recurse.

## Test Plan
Ran a local stress test against the chat completion endpoint: 10 concurrent users for 3 minutes.

Before:
<img width="1440" height="201" alt="image"
src="https://github.com/user-attachments/assets/24e0d580-186e-4e24-931e-2b936c5859b6"
/>

After:
<img width="1434" height="204" alt="image"
src="https://github.com/user-attachments/assets/4b806d88-f822-41e9-b25a-018cc4bec866"
/>

(Will send scripts in a future PR.)
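Purely as an illustration of the shape of that test, a minimal sketch of a concurrent driver follows. This is not the forthcoming script; the base URL, endpoint path, model id, and payload are assumptions, not taken from this PR:

```python
# Hypothetical stand-in for the stress scripts mentioned above.
import time
from concurrent.futures import ThreadPoolExecutor

import requests

BASE_URL = "http://localhost:8321"  # assumed local server address
DURATION_S = 180                    # 3 minutes
CONCURRENCY = 10                    # 10 concurrent users


def user_loop(user_id: int) -> int:
    deadline = time.time() + DURATION_S
    completed = 0
    while time.time() < deadline:
        # Assumed OpenAI-compatible chat completion route; adjust the path
        # and payload to the actual server API before use.
        resp = requests.post(
            f"{BASE_URL}/v1/chat/completions",
            json={
                "model": "my-model",  # placeholder model id
                "messages": [{"role": "user", "content": f"ping from user {user_id}"}],
            },
            timeout=60,
        )
        resp.raise_for_status()
        completed += 1
    return completed


with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
    totals = list(pool.map(user_loop, range(CONCURRENCY)))
print(f"completed {sum(totals)} requests across {CONCURRENCY} users")
```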
Author: ehhuang (committed by GitHub), 2025-08-08 13:47:36 -07:00
Parent: 9b70bb9d4b
Commit: 0b5a794c27

```diff
@@ -9,7 +9,9 @@ import contextvars
 import logging
 import queue
 import random
+import sys
 import threading
+import time
 from collections.abc import Callable
 from datetime import UTC, datetime
 from functools import wraps
```
```diff
@@ -30,6 +32,16 @@ from llama_stack.providers.utils.telemetry.trace_protocol import serialize_value
 logger = get_logger(__name__, category="core")
 
+# Fallback logger that does NOT propagate to TelemetryHandler to avoid recursion
+_fallback_logger = logging.getLogger("llama_stack.telemetry.background")
+if not _fallback_logger.handlers:
+    _fallback_logger.propagate = False
+    _fallback_logger.setLevel(logging.ERROR)
+    _fallback_handler = logging.StreamHandler(sys.stderr)
+    _fallback_handler.setLevel(logging.ERROR)
+    _fallback_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
+    _fallback_logger.addHandler(_fallback_handler)
+
 INVALID_SPAN_ID = 0x0000000000000000
 INVALID_TRACE_ID = 0x00000000000000000000000000000000
```
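For context on the `propagate = False` line: when telemetry is wired up as a handler near the root of the logging tree, an error logged through a propagating logger itself becomes a telemetry event, feeding right back into the full queue. A minimal sketch of that hazard, where `EnqueueHandler` is an illustrative stand-in for the real telemetry handler rather than anything from this module:

```python
import logging
import sys


class EnqueueHandler(logging.Handler):
    # Stand-in for the real telemetry handler: in production, emit() would
    # enqueue the record as a telemetry event via BackgroundLogger.log_event().
    def emit(self, record: logging.LogRecord) -> None:
        print(f"telemetry event: {record.getMessage()}")


logging.getLogger().addHandler(EnqueueHandler())  # telemetry hooked to root

fallback = logging.getLogger("llama_stack.telemetry.background")
fallback.propagate = False  # the crucial line: records stop here
fallback.addHandler(logging.StreamHandler(sys.stderr))

# Reaches only stderr; with propagate=True it would also hit EnqueueHandler,
# turning every "queue full" complaint back into a telemetry event.
fallback.error("Log queue is full; dropped 42 events since last notice")

# Ordinary application loggers still propagate to root and get captured.
logging.getLogger("app").error("regular error")
```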
```diff
@@ -79,19 +91,32 @@ def generate_trace_id() -> str:
 CURRENT_TRACE_CONTEXT = contextvars.ContextVar("trace_context", default=None)
 BACKGROUND_LOGGER = None
 
+LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS = 60.0
+
 
 class BackgroundLogger:
     def __init__(self, api: Telemetry, capacity: int = 100000):
         self.api = api
-        self.log_queue = queue.Queue(maxsize=capacity)
+        self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity)
         self.worker_thread = threading.Thread(target=self._process_logs, daemon=True)
         self.worker_thread.start()
+        self._last_queue_full_log_time: float = 0.0
+        self._dropped_since_last_notice: int = 0
 
     def log_event(self, event):
         try:
             self.log_queue.put_nowait(event)
         except queue.Full:
-            logger.error("Log queue is full, dropping event")
+            # Aggregate drops and emit at most once per interval via fallback logger
+            self._dropped_since_last_notice += 1
+            current_time = time.time()
+            if current_time - self._last_queue_full_log_time >= LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS:
+                _fallback_logger.error(
+                    "Log queue is full; dropped %d events since last notice",
+                    self._dropped_since_last_notice,
+                )
+                self._last_queue_full_log_time = current_time
+                self._dropped_since_last_notice = 0
 
     def _process_logs(self):
         while True:
```
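The throttling logic is easy to exercise in isolation. Here is an illustrative re-implementation of just the queue-full path (`ThrottledDropCounter` is a sketch for demonstration, not part of the module), showing that a burst of drops produces a single notice:

```python
import queue
import time

LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS = 60.0


class ThrottledDropCounter:
    # Mirrors the except-queue.Full branch of BackgroundLogger.log_event().
    def __init__(self, capacity: int) -> None:
        self.log_queue: queue.Queue = queue.Queue(maxsize=capacity)
        self._last_queue_full_log_time = 0.0
        self._dropped_since_last_notice = 0

    def log_event(self, event) -> None:
        try:
            self.log_queue.put_nowait(event)
        except queue.Full:
            self._dropped_since_last_notice += 1
            now = time.time()
            if now - self._last_queue_full_log_time >= LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS:
                print(f"queue full; dropped {self._dropped_since_last_notice} events since last notice")
                self._last_queue_full_log_time = now
                self._dropped_since_last_notice = 0


sim = ThrottledDropCounter(capacity=2)
for i in range(10):
    sim.log_event(i)
# Events 0-1 are queued; 2-9 are dropped. Since _last_queue_full_log_time
# starts at 0.0, the very first drop prints a notice immediately, and the
# remaining 7 drops are counted silently until the next 60s window.
```

Counting drops instead of merely suppressing the message preserves the signal (how many events were lost) while bounding log volume to one line per interval.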