refactor(litellm_logging.py): refactors how slack_alerting generates langfuse trace url

Gets the Langfuse trace URL from the logging object.
Krrish Dholakia 2024-06-21 16:12:25 -07:00
parent 941574a921
commit 2584120012
5 changed files with 207 additions and 41 deletions
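
The slack_alerting changes referenced in the commit title are not part of the excerpt below, which only covers litellm_logging.py. As a rough sketch of how an alerting hook might consume the new helper: the function name, the default host, and the "/trace/<id>" URL shape here are illustrative assumptions, not code from this commit.

# Illustrative sketch only -- not part of this commit.
from typing import Optional


def _build_langfuse_trace_url(
    logging_obj, langfuse_base_url: str = "https://cloud.langfuse.com"
) -> Optional[str]:
    """Return a clickable Langfuse trace URL for a request, if a trace was logged."""
    # _get_trace_id() is the Logging method added in this commit; it reads the
    # trace id that the langfuse callback cached for this litellm_call_id.
    trace_id = logging_obj._get_trace_id(service_name="langfuse")
    if trace_id is None:
        return None  # callback has not run yet, or langfuse logging is disabled
    # URL shape is an assumption; adjust to your Langfuse host / project layout.
    return f"{langfuse_base_url}/trace/{trace_id}"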


@@ -10,7 +10,7 @@ import sys
 import time
 import traceback
 import uuid
-from typing import Any, Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, List, Literal, Optional
 import litellm
 from litellm import (
@@ -19,7 +19,7 @@ from litellm import (
     turn_off_message_logging,
     verbose_logger,
 )
-from litellm.caching import S3Cache
+from litellm.caching import InMemoryCache, S3Cache
 from litellm.integrations.custom_logger import CustomLogger
 from litellm.litellm_core_utils.redact_messages import (
     redact_message_input_output_from_logging,
@@ -111,7 +111,25 @@ additional_details: Optional[Dict[str, str]] = {}
 local_cache: Optional[Dict[str, str]] = {}
 last_fetched_at = None
 last_fetched_at_keys = None
 ####
+class ServiceTraceIDCache:
+    def __init__(self) -> None:
+        self.cache = InMemoryCache()
+
+    def get_cache(self, litellm_call_id: str, service_name: str) -> Optional[str]:
+        key_name = "{}:{}".format(service_name, litellm_call_id)
+        response = self.cache.get_cache(key=key_name)
+        return response
+
+    def set_cache(self, litellm_call_id: str, service_name: str, trace_id: str) -> None:
+        key_name = "{}:{}".format(service_name, litellm_call_id)
+        self.cache.set_cache(key=key_name, value=trace_id)
+        return None
+
+
+in_memory_trace_id_cache = ServiceTraceIDCache()
+
+
 class Logging:
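
(Aside, not part of the diff: the class above namespaces cache keys as "<service>:<litellm_call_id>" so one cache instance can hold trace ids for several logging services. The snippet below is a self-contained illustration of that contract; a plain dict stands in for litellm's InMemoryCache purely to keep it runnable in isolation.)

# Standalone illustration -- a plain dict replaces litellm's InMemoryCache here.
from typing import Dict, Optional


class DictBackedTraceIDCache:
    def __init__(self) -> None:
        self.cache: Dict[str, str] = {}

    def set_cache(self, litellm_call_id: str, service_name: str, trace_id: str) -> None:
        # Keys are namespaced "<service>:<call_id>" so multiple integrations
        # could share one cache instance without collisions.
        self.cache["{}:{}".format(service_name, litellm_call_id)] = trace_id

    def get_cache(self, litellm_call_id: str, service_name: str) -> Optional[str]:
        return self.cache.get("{}:{}".format(service_name, litellm_call_id))


cache = DictBackedTraceIDCache()
cache.set_cache(litellm_call_id="abc-123", service_name="langfuse", trace_id="trace-789")
assert cache.get_cache(litellm_call_id="abc-123", service_name="langfuse") == "trace-789"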
@@ -821,7 +839,7 @@ class Logging:
                             langfuse_secret=self.langfuse_secret,
                             langfuse_host=self.langfuse_host,
                         )
-                    langFuseLogger.log_event(
+                    _response = langFuseLogger.log_event(
                         kwargs=kwargs,
                         response_obj=result,
                         start_time=start_time,
@@ -829,6 +847,14 @@
                         user_id=kwargs.get("user", None),
                         print_verbose=print_verbose,
                     )
+                    if _response is not None and isinstance(_response, dict):
+                        _trace_id = _response.get("trace_id", None)
+                        if _trace_id is not None:
+                            in_memory_trace_id_cache.set_cache(
+                                litellm_call_id=self.litellm_call_id,
+                                service_name="langfuse",
+                                trace_id=_trace_id,
+                            )
                 if callback == "datadog":
                     global dataDogLogger
                     verbose_logger.debug("reaches datadog for success logging!")
@@ -1607,7 +1633,7 @@ class Logging:
                             langfuse_secret=self.langfuse_secret,
                             langfuse_host=self.langfuse_host,
                         )
-                    langFuseLogger.log_event(
+                    _response = langFuseLogger.log_event(
                         start_time=start_time,
                         end_time=end_time,
                         response_obj=None,
@@ -1617,6 +1643,14 @@
                         level="ERROR",
                         kwargs=self.model_call_details,
                     )
+                    if _response is not None and isinstance(_response, dict):
+                        _trace_id = _response.get("trace_id", None)
+                        if _trace_id is not None:
+                            in_memory_trace_id_cache.set_cache(
+                                litellm_call_id=self.litellm_call_id,
+                                service_name="langfuse",
+                                trace_id=_trace_id,
+                            )
                 if callback == "traceloop":
                     traceloopLogger.log_event(
                         start_time=start_time,
@@ -1721,6 +1755,24 @@
                     )
                 )
+
+    def _get_trace_id(self, service_name: Literal["langfuse"]) -> Optional[str]:
+        """
+        For the given service (e.g. langfuse), return the trace_id actually logged.
+
+        Used for constructing the url in slack alerting.
+
+        Returns:
+            - str: The logged trace id
+            - None: If trace id not yet emitted.
+        """
+        trace_id: Optional[str] = None
+        if service_name == "langfuse":
+            trace_id = in_memory_trace_id_cache.get_cache(
+                litellm_call_id=self.litellm_call_id, service_name=service_name
+            )
+
+        return trace_id
+
+
 def set_callbacks(callback_list, function_id=None):
     """