diff --git a/litellm/_service_logger.py b/litellm/_service_logger.py index fd14b3cdeb..f2edb403e9 100644 --- a/litellm/_service_logger.py +++ b/litellm/_service_logger.py @@ -91,6 +91,9 @@ class ServiceLogging(CustomLogger): duration: float, error: Union[str, Exception], call_type: str, + parent_otel_span: Optional[Span] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, ): """ - For counting if the redis, postgres call is unsuccessful @@ -119,6 +122,16 @@ class ServiceLogging(CustomLogger): payload=payload ) + from litellm.proxy.proxy_server import open_telemetry_logger + + if parent_otel_span is not None and open_telemetry_logger is not None: + await open_telemetry_logger.async_service_failure_hook( + payload=payload, + parent_otel_span=parent_otel_span, + start_time=start_time, + end_time=end_time, + ) + async def async_post_call_failure_hook( self, original_exception: Exception, user_api_key_dict: UserAPIKeyAuth ): diff --git a/litellm/caching.py b/litellm/caching.py index c1a4aa9b6a..7fc535507a 100644 --- a/litellm/caching.py +++ b/litellm/caching.py @@ -259,6 +259,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="increment_cache", + start_time=start_time, + end_time=end_time, + parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) verbose_logger.error( @@ -306,6 +309,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="async_scan_iter", + start_time=start_time, + end_time=end_time, + # parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) raise e @@ -319,7 +325,12 @@ class RedisCache(BaseCache): _duration = end_time - start_time asyncio.create_task( self.service_logger_obj.async_service_failure_hook( - service=ServiceTypes.REDIS, duration=_duration, error=e + service=ServiceTypes.REDIS, + duration=_duration, + error=e, + start_time=start_time, + end_time=end_time, + parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) # NON blocking - notify users Redis is throwing an exception @@ -361,6 +372,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="async_set_cache", + start_time=start_time, + end_time=end_time, + parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) # NON blocking - notify users Redis is throwing an exception @@ -424,6 +438,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="async_set_cache_pipeline", + start_time=start_time, + end_time=end_time, + # parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) @@ -472,6 +489,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="async_increment", + start_time=start_time, + end_time=end_time, + parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) verbose_logger.error( @@ -581,6 +601,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="async_get_cache", + start_time=start_time, + end_time=end_time, + parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) # NON blocking - notify users Redis is throwing an exception @@ -639,6 +662,9 @@ class RedisCache(BaseCache): duration=_duration, error=e, call_type="async_batch_get_cache", + start_time=start_time, + end_time=end_time, + # parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs), ) ) print_verbose(f"Error occurred in pipeline read - {str(e)}") diff --git a/litellm/integrations/opentelemetry.py b/litellm/integrations/opentelemetry.py index deee0a2f26..84b2f88c65 100644 --- a/litellm/integrations/opentelemetry.py +++ b/litellm/integrations/opentelemetry.py @@ -127,6 +127,44 @@ class OpenTelemetry(CustomLogger): service_logging_span.set_status(Status(StatusCode.OK)) service_logging_span.end(end_time=_end_time_ns) + async def async_service_failure_hook( + self, + payload: ServiceLoggerPayload, + parent_otel_span: Optional[Span] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + ): + from opentelemetry import trace + from datetime import datetime + from opentelemetry.trace import Status, StatusCode + + _start_time_ns = start_time + _end_time_ns = end_time + + if isinstance(start_time, float): + _start_time_ns = int(int(start_time) * 1e9) + else: + _start_time_ns = self._to_ns(start_time) + + if isinstance(end_time, float): + _end_time_ns = int(int(end_time) * 1e9) + else: + _end_time_ns = self._to_ns(end_time) + + if parent_otel_span is not None: + _span_name = payload.service + service_logging_span = self.tracer.start_span( + name=_span_name, + context=trace.set_span_in_context(parent_otel_span), + start_time=_start_time_ns, + ) + service_logging_span.set_attribute(key="call_type", value=payload.call_type) + service_logging_span.set_attribute( + key="service", value=payload.service.value + ) + service_logging_span.set_status(Status(StatusCode.ERROR)) + service_logging_span.end(end_time=_end_time_ns) + async def async_post_call_failure_hook( self, original_exception: Exception, user_api_key_dict: UserAPIKeyAuth ):