# What does this PR do?


## Test Plan
This commit is contained in:
Eric Huang 2025-08-07 14:06:48 -07:00
parent 5f1ddd35e4
commit d7a39b51b9

View file

@ -9,7 +9,9 @@ import contextvars
import logging import logging
import queue import queue
import random import random
import sys
import threading import threading
import time
from collections.abc import Callable from collections.abc import Callable
from datetime import UTC, datetime from datetime import UTC, datetime
from functools import wraps from functools import wraps
@ -30,6 +32,16 @@ from llama_stack.providers.utils.telemetry.trace_protocol import serialize_value
logger = get_logger(__name__, category="core") logger = get_logger(__name__, category="core")
# Fallback logger that does NOT propagate to TelemetryHandler to avoid recursion
_fallback_logger = logging.getLogger("llama_stack.telemetry.background")
if not _fallback_logger.handlers:
_fallback_logger.propagate = False
_fallback_logger.setLevel(logging.ERROR)
_fallback_handler = logging.StreamHandler(sys.stderr)
_fallback_handler.setLevel(logging.ERROR)
_fallback_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
_fallback_logger.addHandler(_fallback_handler)
INVALID_SPAN_ID = 0x0000000000000000 INVALID_SPAN_ID = 0x0000000000000000
INVALID_TRACE_ID = 0x00000000000000000000000000000000 INVALID_TRACE_ID = 0x00000000000000000000000000000000
@ -79,19 +91,32 @@ def generate_trace_id() -> str:
CURRENT_TRACE_CONTEXT = contextvars.ContextVar("trace_context", default=None) CURRENT_TRACE_CONTEXT = contextvars.ContextVar("trace_context", default=None)
BACKGROUND_LOGGER = None BACKGROUND_LOGGER = None
LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS = 60.0
class BackgroundLogger: class BackgroundLogger:
def __init__(self, api: Telemetry, capacity: int = 100000): def __init__(self, api: Telemetry, capacity: int = 100000):
self.api = api self.api = api
self.log_queue = queue.Queue(maxsize=capacity) self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity)
self.worker_thread = threading.Thread(target=self._process_logs, daemon=True) self.worker_thread = threading.Thread(target=self._process_logs, daemon=True)
self.worker_thread.start() self.worker_thread.start()
self._last_queue_full_log_time: float = 0.0
self._dropped_since_last_notice: int = 0
def log_event(self, event): def log_event(self, event):
try: try:
self.log_queue.put_nowait(event) self.log_queue.put_nowait(event)
except queue.Full: except queue.Full:
logger.error("Log queue is full, dropping event") # Aggregate drops and emit at most once per interval via fallback logger
self._dropped_since_last_notice += 1
current_time = time.time()
if current_time - self._last_queue_full_log_time >= LOG_QUEUE_FULL_LOG_INTERVAL_SECONDS:
_fallback_logger.error(
"Log queue is full; dropped %d events since last notice",
self._dropped_since_last_notice,
)
self._last_queue_full_log_time = current_time
self._dropped_since_last_notice = 0
def _process_logs(self): def _process_logs(self):
while True: while True: