Mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-10-04 04:04:14 +00:00)
chore: logging perf improvements
# What does this PR do?

## Test Plan
Commit: 8928ecfca0
Parent: 935b8e28de
2 changed files with 31 additions and 17 deletions
```diff
--- a/llama_stack/core/routers/inference.py
+++ b/llama_stack/core/routers/inference.py
@@ -63,7 +63,7 @@ from llama_stack.models.llama.llama3.chat_format import ChatFormat
 from llama_stack.models.llama.llama3.tokenizer import Tokenizer
 from llama_stack.providers.datatypes import HealthResponse, HealthStatus, RoutingTable
 from llama_stack.providers.utils.inference.inference_store import InferenceStore
-from llama_stack.providers.utils.telemetry.tracing import get_current_span
+from llama_stack.providers.utils.telemetry.tracing import enqueue_event, get_current_span
 
 logger = get_logger(name=__name__, category="core::routers")
 
@@ -160,7 +160,7 @@ class InferenceRouter(Inference):
         metrics = self._construct_metrics(prompt_tokens, completion_tokens, total_tokens, model)
         if self.telemetry:
             for metric in metrics:
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
         return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in metrics]
 
     async def _count_tokens(
@@ -431,7 +431,7 @@ class InferenceRouter(Inference):
                 model=model_obj,
             )
             for metric in metrics:
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
 
         # these metrics will show up in the client response.
         response.metrics = (
@@ -537,7 +537,7 @@ class InferenceRouter(Inference):
                 model=model_obj,
             )
             for metric in metrics:
-                await self.telemetry.log_event(metric)
+                enqueue_event(metric)
             # these metrics will show up in the client response.
             response.metrics = (
                 metrics if not hasattr(response, "metrics") or response.metrics is None else response.metrics + metrics
@@ -664,7 +664,7 @@ class InferenceRouter(Inference):
                     "completion_tokens",
                     "total_tokens",
                 ]:  # Only log completion and total tokens
-                    await self.telemetry.log_event(metric)
+                    enqueue_event(metric)
 
         # Return metrics in response
         async_metrics = [
@@ -710,7 +710,7 @@ class InferenceRouter(Inference):
             )
             for metric in completion_metrics:
                 if metric.metric in ["completion_tokens", "total_tokens"]:  # Only log completion and total tokens
-                    await self.telemetry.log_event(metric)
+                    enqueue_event(metric)
 
         # Return metrics in response
         return [MetricInResponse(metric=metric.metric, value=metric.value) for metric in completion_metrics]
@@ -806,7 +806,7 @@ class InferenceRouter(Inference):
                         model=model,
                     )
                     for metric in metrics:
-                        await self.telemetry.log_event(metric)
+                        enqueue_event(metric)
 
                 yield chunk
         finally:
```
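In the router, every hot-path `await self.telemetry.log_event(metric)` becomes a plain `enqueue_event(metric)` call: the request coroutine no longer awaits the Telemetry API for each metric, it just hands the event to a background queue. A minimal sketch of the producer side, using hypothetical stand-in names rather than the real `BackgroundLogger` internals:

```python
import queue

# Hypothetical stand-in for the background logger's bounded queue; the hot
# path pays only for a thread-safe put, while exporter latency is absorbed
# by a worker thread instead of the request's event loop.
log_queue: queue.Queue = queue.Queue(maxsize=100_000)

def enqueue_event(event) -> None:
    try:
        log_queue.put_nowait(event)  # returns immediately, never awaits
    except queue.Full:
        pass  # drop the event rather than stall the request

enqueue_event({"metric": "total_tokens", "value": 42})
```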
```diff
--- a/llama_stack/providers/utils/telemetry/tracing.py
+++ b/llama_stack/providers/utils/telemetry/tracing.py
@@ -18,6 +18,7 @@ from functools import wraps
 from typing import Any
 
 from llama_stack.apis.telemetry import (
+    Event,
     LogSeverity,
     Span,
     SpanEndPayload,
@@ -98,7 +99,7 @@ class BackgroundLogger:
     def __init__(self, api: Telemetry, capacity: int = 100000):
         self.api = api
         self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity)
-        self.worker_thread = threading.Thread(target=self._process_logs, daemon=True)
+        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
         self.worker_thread.start()
         self._last_queue_full_log_time: float = 0.0
         self._dropped_since_last_notice: int = 0
@@ -118,12 +119,16 @@ class BackgroundLogger:
             self._last_queue_full_log_time = current_time
             self._dropped_since_last_notice = 0
 
-    def _process_logs(self):
+    def _worker(self):
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(self._process_logs())
+
+    async def _process_logs(self):
         while True:
             try:
                 event = self.log_queue.get()
-                # figure out how to use a thread's native loop
-                asyncio.run(self.api.log_event(event))
+                await self.api.log_event(event)
             except Exception:
                 import traceback
 
@@ -136,6 +141,19 @@ class BackgroundLogger:
         self.log_queue.join()
 
 
+def enqueue_event(event: Event) -> None:
+    """Enqueue a telemetry event to the background logger if available.
+
+    This provides a non-blocking path for routers and other hot paths to
+    submit telemetry without awaiting the Telemetry API, reducing contention
+    with the main event loop.
+    """
+    global BACKGROUND_LOGGER
+    if BACKGROUND_LOGGER is None:
+        raise RuntimeError("Telemetry API not initialized")
+    BACKGROUND_LOGGER.log_event(event)
+
+
 class TraceContext:
     spans: list[Span] = []
 
@@ -256,11 +274,7 @@ class TelemetryHandler(logging.Handler):
         if record.module in ("asyncio", "selector_events"):
             return
 
-        global CURRENT_TRACE_CONTEXT, BACKGROUND_LOGGER
-
-        if BACKGROUND_LOGGER is None:
-            raise RuntimeError("Telemetry API not initialized")
-
+        global CURRENT_TRACE_CONTEXT
         context = CURRENT_TRACE_CONTEXT.get()
         if context is None:
             return
@@ -269,7 +283,7 @@ class TelemetryHandler(logging.Handler):
         if span is None:
             return
 
-        BACKGROUND_LOGGER.log_event(
+        enqueue_event(
             UnstructuredLogEvent(
                 trace_id=span.trace_id,
                 span_id=span.span_id,
```
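Taken together, the `tracing.py` changes give the queue consumer a single long-lived event loop: `__init__` still starts a daemon thread, `_worker` now creates one loop for that thread, and the async `_process_logs` drains the queue with a plain `await` instead of paying for a fresh loop via `asyncio.run` on every event. A self-contained sketch of the pattern (class and parameter names are illustrative, not the actual `BackgroundLogger` API):

```python
import asyncio
import queue
import threading

class BackgroundLoggerSketch:
    """One daemon thread owns one long-lived event loop and drains a
    thread-safe queue; producers never touch the loop, and no per-event
    loop is created (which is what the old asyncio.run call did)."""

    def __init__(self, sink, capacity: int = 100_000):
        self.sink = sink  # an async callable, e.g. a telemetry API's log_event
        self.log_queue: queue.Queue = queue.Queue(maxsize=capacity)
        threading.Thread(target=self._worker, daemon=True).start()

    def log_event(self, event) -> None:
        try:
            self.log_queue.put_nowait(event)  # non-blocking producer side
        except queue.Full:
            pass  # drop on overflow; the real code also rate-limits a warning

    def _worker(self) -> None:
        loop = asyncio.new_event_loop()  # created once, reused for all events
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self._process_logs())

    async def _process_logs(self) -> None:
        while True:
            event = self.log_queue.get()  # blocks only the worker thread
            try:
                await self.sink(event)
            except Exception:
                pass  # never let one bad event kill the worker
            finally:
                self.log_queue.task_done()  # pairs with log_queue.join()
```

Blocking on `queue.Queue.get()` inside the coroutine is acceptable here because this loop runs nothing else; a loop shared with other tasks would want `asyncio.to_thread` or an `asyncio.Queue` bridge instead.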