tracing for APIs

This commit is contained in:
Dinesh Yeduguru 2024-11-26 14:07:48 -08:00
parent c2a4850a79
commit af8a1fe5b3
8 changed files with 126 additions and 63 deletions

View file

@ -6,98 +6,132 @@
import asyncio
import inspect
import json
from functools import wraps
from typing import Any, AsyncGenerator, Callable, Type, TypeVar
from pydantic import BaseModel
from llama_stack.providers.utils.telemetry import tracing
T = TypeVar("T")
def serialize_value(value: Any) -> str:
"""Helper function to serialize values to string representation."""
try:
if isinstance(value, BaseModel):
return value.model_dump_json()
elif isinstance(value, list) and value and isinstance(value[0], BaseModel):
return json.dumps([item.model_dump_json() for item in value])
elif hasattr(value, "to_dict"): # For objects with to_dict method
return json.dumps(value.to_dict())
elif isinstance(value, (dict, list, int, float, str, bool)):
return json.dumps(value)
else:
return str(value)
except Exception:
return str(value)
def traced(input: str = None):
"""
A method decorator that enables tracing with input and output capture.
Args:
input: Name of the input parameter to capture in traces
"""
def decorator(method: Callable) -> Callable:
method._trace_input = input
return method
return decorator
def trace_protocol(cls: Type[T]) -> Type[T]:
"""
A class decorator that automatically traces all methods in a protocol/base class
and its inheriting classes. Supports sync methods, async methods, and async generators.
and its inheriting classes.
"""
def trace_method(method: Callable) -> Callable:
is_async = asyncio.iscoroutinefunction(method)
is_async_gen = inspect.isasyncgenfunction(method)
def get_traced_input(args: tuple, kwargs: dict) -> dict:
trace_input = getattr(method, "_trace_input", None)
if not trace_input:
return {}
# Get the mapping of parameter names to values
sig = inspect.signature(method)
bound_args = sig.bind(None, *args, **kwargs) # None for self
bound_args.apply_defaults()
params = dict(list(bound_args.arguments.items())[1:]) # Skip 'self'
# Return the input value if the key exists
if trace_input in params:
return {"input": serialize_value(params[trace_input])}
return {}
def create_span_context(self: Any, *args: Any, **kwargs: Any) -> tuple:
class_name = self.__class__.__name__
method_name = method.__name__
span_type = (
"async_generator" if is_async_gen else "async" if is_async else "sync"
)
span_attributes = {
"class": class_name,
"method": method_name,
"type": span_type,
"args": serialize_value(args),
**get_traced_input(args, kwargs),
}
return class_name, method_name, span_attributes
@wraps(method)
async def async_gen_wrapper(
self: Any, *args: Any, **kwargs: Any
) -> AsyncGenerator:
class_name = self.__class__.__name__
method_name = f"{class_name}.{method.__name__}"
class_name, method_name, span_attributes = create_span_context(
self, *args, **kwargs
)
args_repr = [repr(arg) for arg in args]
kwargs_repr = [f"{k}={repr(v)}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
with tracing.span(
f"{class_name}.{method_name}",
{
"class": class_name,
"method": method_name,
"signature": signature,
"type": "async_generator",
},
) as span:
output = []
with tracing.span(f"{class_name}.{method_name}", span_attributes) as span:
try:
async for item in method(self, *args, **kwargs):
output.append(item)
yield item
except Exception as e:
raise
finally:
span.set_attribute("output", output)
span.set_attribute("output", "streaming output")
@wraps(method)
async def async_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
class_name = self.__class__.__name__
method_name = f"{class_name}.{method.__name__}"
class_name, method_name, span_attributes = create_span_context(
self, *args, **kwargs
)
args_repr = [repr(arg) for arg in args]
kwargs_repr = [f"{k}={repr(v)}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
with tracing.span(
f"{class_name}.{method_name}",
{
"class": class_name,
"method": method_name,
"signature": signature,
"type": "async",
},
):
with tracing.span(f"{class_name}.{method_name}", span_attributes) as span:
try:
result = await method(self, *args, **kwargs)
span.set_attribute("output", serialize_value(result))
return result
except Exception as e:
span.set_attribute("error", str(e))
raise
@wraps(method)
def sync_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
class_name = self.__class__.__name__
method_name = f"{class_name}.{method.__name__}"
class_name, method_name, span_attributes = create_span_context(
self, *args, **kwargs
)
args_repr = [repr(arg) for arg in args]
kwargs_repr = [f"{k}={repr(v)}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
with tracing.span(
f"{class_name}.{method_name}",
{
"class": class_name,
"method": method_name,
"signature": signature,
"type": "sync",
},
):
with tracing.span(f"{class_name}.{method_name}", span_attributes) as span:
try:
result = method(self, *args, **kwargs)
span.set_attribute("output", serialize_value(result))
return result
except Exception as e:
raise
@ -109,11 +143,6 @@ def trace_protocol(cls: Type[T]) -> Type[T]:
else:
return sync_wrapper
# Trace all existing methods in the base class
for name, method in vars(cls).items():
if inspect.isfunction(method) and not name.startswith("__"):
setattr(cls, name, trace_method(method))
# Store the original __init_subclass__ if it exists
original_init_subclass = getattr(cls, "__init_subclass__", None)
@ -123,10 +152,20 @@ def trace_protocol(cls: Type[T]) -> Type[T]:
if original_init_subclass:
original_init_subclass(**kwargs)
# Trace all methods defined in the child class
traced_methods = {}
for parent in cls_child.__mro__[1:]: # Skip the class itself
for name, method in vars(parent).items():
if inspect.isfunction(method) and method._trace_input:
traced_methods[name] = method._trace_input
# Trace child class methods if their name matches a traced parent method
for name, method in vars(cls_child).items():
if inspect.isfunction(method) and not name.startswith("__"):
setattr(cls_child, name, trace_method(method))
if inspect.isfunction(method) and not name.startswith("_"):
if name in traced_methods:
# Copy the trace configuration from the parent
method._trace_input = traced_methods[name]
cls_child.__dict__[name] = trace_method(method)
# Set the new __init_subclass__
cls.__init_subclass__ = classmethod(__init_subclass__)