From f6bf36343df7c69c9f26ae5163cbfb6491ca7247 Mon Sep 17 00:00:00 2001
From: ehhuang
Date: Wed, 10 Sep 2025 11:52:23 -0700
Subject: [PATCH] chore: logging perf improvements (#3393)

# What does this PR do?
- Use BackgroundLogger when logging metric events, so routers no longer await the Telemetry API inline.
- Reuse a single event loop in BackgroundLogger instead of creating a new one per event (see the sketch below).

## Test Plan
```
cd /docs/source/distributions/k8s-benchmark
# start mock server
python openai-mock-server.py --port 8000
# start stack server
LLAMA_STACK_LOGGING="all=WARNING" uv run --with llama-stack python -m llama_stack.core.server.server docs/source/distributions/k8s-benchmark/stack_run_config.yaml
# run benchmark script
uv run python3 benchmark.py --duration 120 --concurrent 50 --base-url=http://localhost:8321/v1/openai/v1 --model=vllm-inference/meta-llama/Llama-3.2-3B-Instruct
```

### RPS from 57 -> 62
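### Pattern sketch (illustrative)

For context, a minimal, self-contained sketch of the pattern this PR adopts: a daemon worker thread that owns a single long-lived asyncio event loop and drains a queue of telemetry events, with a non-blocking enqueue path for callers on the request hot path. The names `FakeTelemetryAPI` and `BackgroundLoggerSketch` are illustrative stand-ins, not the actual llama_stack classes; see the diff below for the real implementation.

```python
import asyncio
import queue
import threading


class FakeTelemetryAPI:
    """Illustrative stand-in for the real Telemetry API."""

    async def log_event(self, event) -> None:
        await asyncio.sleep(0)  # pretend this does async I/O
        print(f"logged: {event}")


class BackgroundLoggerSketch:
    """Daemon thread that owns one long-lived event loop and drains a queue."""

    def __init__(self, api: FakeTelemetryAPI, capacity: int = 100000):
        self.api = api
        self.log_queue: queue.Queue = queue.Queue(maxsize=capacity)
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def log_event(self, event) -> None:
        # Non-blocking for callers on the hot path; drop if the queue is full.
        try:
            self.log_queue.put_nowait(event)
        except queue.Full:
            pass

    def _worker(self) -> None:
        # Create the loop once and reuse it, instead of asyncio.run() per event.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._process_logs())

    async def _process_logs(self) -> None:
        while True:
            event = self.log_queue.get()
            try:
                await self.api.log_event(event)
            finally:
                self.log_queue.task_done()


if __name__ == "__main__":
    bg = BackgroundLoggerSketch(FakeTelemetryAPI())
    for i in range(3):
        bg.log_event({"metric": "total_tokens", "value": i})
    bg.log_queue.join()  # block until the worker has drained the queue
```

The key change relative to the previous code is that the worker creates its event loop once and reuses it, instead of calling `asyncio.run()` (loop setup and teardown) for every event.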
---
 llama_stack/core/routers/inference.py        | 14 ++++----
 .../providers/utils/telemetry/tracing.py     | 34 +++++++++++++------
 2 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/llama_stack/core/routers/inference.py b/llama_stack/core/routers/inference.py
index 23972deb5..9593dd5b9 100644
--- a/llama_stack/core/routers/inference.py
+++ b/llama_stack/core/routers/inference.py
@@ -63,7 +63,7 @@ from llama_stack.models.llama.llama3.chat_format import ChatFormat
 from llama_stack.models.llama.llama3.tokenizer import Tokenizer
 from llama_stack.providers.datatypes import HealthResponse, HealthStatus, RoutingTable
 from llama_stack.providers.utils.inference.inference_store import InferenceStore
-from llama_stack.providers.utils.telemetry.tracing import get_current_span
+from llama_stack.providers.utils.telemetry.tracing import enqueue_event, get_current_span
 
 logger = get_logger(name=__name__, category="core::routers")
 
@@ -160,7 +160,7 @@ class InferenceRouter(Inference):
         metrics = self._construct_metrics(prompt_tokens, completion_tokens, total_tokens, model)
         if self.telemetry:
             for metric in metrics:
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
         return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in metrics]
 
     async def _count_tokens(
@@ -431,7 +431,7 @@ class InferenceRouter(Inference):
                 model=model_obj,
             )
             for metric in metrics:
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
 
             # these metrics will show up in the client response.
             response.metrics = (
@@ -537,7 +537,7 @@ class InferenceRouter(Inference):
                 model=model_obj,
             )
             for metric in metrics:
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
             # these metrics will show up in the client response.
             response.metrics = (
                 metrics if not hasattr(response, "metrics") or response.metrics is None else response.metrics + metrics
@@ -664,7 +664,7 @@ class InferenceRouter(Inference):
                 "completion_tokens",
                 "total_tokens",
             ]:  # Only log completion and total tokens
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
 
         # Return metrics in response
         async_metrics = [
@@ -710,7 +710,7 @@ class InferenceRouter(Inference):
             )
             for metric in completion_metrics:
                 if metric.metric in ["completion_tokens", "total_tokens"]:  # Only log completion and total tokens
-                    await self.telemetry.log_event(metric)
+                    enqueue_event(metric)
 
             # Return metrics in response
             return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics]
@@ -806,7 +806,7 @@ class InferenceRouter(Inference):
                     model=model,
                 )
                 for metric in metrics:
-                    await self.telemetry.log_event(metric)
+                    enqueue_event(metric)
 
                 yield chunk
         finally:
diff --git a/llama_stack/providers/utils/telemetry/tracing.py b/llama_stack/providers/utils/telemetry/tracing.py
index 7694003b5..9969b1055 100644
--- a/llama_stack/providers/utils/telemetry/tracing.py
+++ b/llama_stack/providers/utils/telemetry/tracing.py
@@ -18,6 +18,7 @@ from functools import wraps
 from typing import Any
 
 from llama_stack.apis.telemetry import (
+    Event,
     LogSeverity,
     Span,
     SpanEndPayload,
@@ -98,7 +99,7 @@ class BackgroundLogger:
     def __init__(self, api: Telemetry, capacity: int = 100000):
         self.api = api
         self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity)
-        self.worker_thread = threading.Thread(target=self._process_logs, daemon=True)
+        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
         self.worker_thread.start()
         self._last_queue_full_log_time: float = 0.0
         self._dropped_since_last_notice: int = 0
@@ -118,12 +119,16 @@ class BackgroundLogger:
                 self._last_queue_full_log_time = current_time
                 self._dropped_since_last_notice = 0
 
-    def _process_logs(self):
+    def _worker(self):
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(self._process_logs())
+
+    async def _process_logs(self):
         while True:
             try:
                 event = self.log_queue.get()
-                # figure out how to use a thread's native loop
-                asyncio.run(self.api.log_event(event))
+                await self.api.log_event(event)
             except Exception:
                 import traceback
 
@@ -136,6 +141,19 @@ class BackgroundLogger:
         self.log_queue.join()
 
 
+def enqueue_event(event: Event) -> None:
+    """Enqueue a telemetry event to the background logger if available.
+
+    This provides a non-blocking path for routers and other hot paths to
+    submit telemetry without awaiting the Telemetry API, reducing contention
+    with the main event loop.
+    """
+    global BACKGROUND_LOGGER
+    if BACKGROUND_LOGGER is None:
+        raise RuntimeError("Telemetry API not initialized")
+    BACKGROUND_LOGGER.log_event(event)
+
+
 class TraceContext:
     spans: list[Span] = []
 
@@ -256,11 +274,7 @@ class TelemetryHandler(logging.Handler):
         if record.module in ("asyncio", "selector_events"):
             return
 
-        global CURRENT_TRACE_CONTEXT, BACKGROUND_LOGGER
-
-        if BACKGROUND_LOGGER is None:
-            raise RuntimeError("Telemetry API not initialized")
-
+        global CURRENT_TRACE_CONTEXT
         context = CURRENT_TRACE_CONTEXT.get()
         if context is None:
             return
@@ -269,7 +283,7 @@ class TelemetryHandler(logging.Handler):
         if span is None:
             return
 
-        BACKGROUND_LOGGER.log_event(
+        enqueue_event(
             UnstructuredLogEvent(
                 trace_id=span.trace_id,
                 span_id=span.span_id,