mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-27 19:54:13 +00:00
fix(langfuse.py): support logging failed llm api calls to langfuse
This commit is contained in:
parent
f48c92e817
commit
f363f0f5ba
2 changed files with 151 additions and 105 deletions
|
@ -55,8 +55,21 @@ class LangFuseLogger:
|
||||||
else:
|
else:
|
||||||
self.upstream_langfuse = None
|
self.upstream_langfuse = None
|
||||||
|
|
||||||
|
# def log_error(kwargs, response_obj, start_time, end_time):
|
||||||
|
# generation = trace.generation(
|
||||||
|
# level ="ERROR" # can be any of DEBUG, DEFAULT, WARNING or ERROR
|
||||||
|
# status_message='error' # can be any string (e.g. stringified stack trace or error body)
|
||||||
|
# )
|
||||||
def log_event(
|
def log_event(
|
||||||
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
|
self,
|
||||||
|
kwargs,
|
||||||
|
response_obj,
|
||||||
|
start_time,
|
||||||
|
end_time,
|
||||||
|
user_id,
|
||||||
|
print_verbose,
|
||||||
|
level="DEFAULT",
|
||||||
|
status_message=None,
|
||||||
):
|
):
|
||||||
# Method definition
|
# Method definition
|
||||||
|
|
||||||
|
@ -84,15 +97,24 @@ class LangFuseLogger:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# end of processing langfuse ########################
|
# end of processing langfuse ########################
|
||||||
if kwargs.get("call_type", None) == "embedding" or isinstance(
|
if (
|
||||||
response_obj, litellm.EmbeddingResponse
|
level == "ERROR"
|
||||||
|
and status_message is not None
|
||||||
|
and isinstance(status_message, str)
|
||||||
|
):
|
||||||
|
input = prompt
|
||||||
|
output = status_message
|
||||||
|
elif response_obj is not None and (
|
||||||
|
kwargs.get("call_type", None) == "embedding"
|
||||||
|
or isinstance(response_obj, litellm.EmbeddingResponse)
|
||||||
):
|
):
|
||||||
input = prompt
|
input = prompt
|
||||||
output = response_obj["data"]
|
output = response_obj["data"]
|
||||||
else:
|
elif response_obj is not None:
|
||||||
input = prompt
|
input = prompt
|
||||||
output = response_obj["choices"][0]["message"].json()
|
output = response_obj["choices"][0]["message"].json()
|
||||||
print_verbose(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}")
|
print(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}")
|
||||||
|
if self._is_langfuse_v2():
|
||||||
self._log_langfuse_v2(
|
self._log_langfuse_v2(
|
||||||
user_id,
|
user_id,
|
||||||
metadata,
|
metadata,
|
||||||
|
@ -103,8 +125,11 @@ class LangFuseLogger:
|
||||||
optional_params,
|
optional_params,
|
||||||
input,
|
input,
|
||||||
response_obj,
|
response_obj,
|
||||||
|
level,
|
||||||
print_verbose,
|
print_verbose,
|
||||||
) if self._is_langfuse_v2() else self._log_langfuse_v1(
|
)
|
||||||
|
elif response_obj is not None:
|
||||||
|
self._log_langfuse_v1(
|
||||||
user_id,
|
user_id,
|
||||||
metadata,
|
metadata,
|
||||||
output,
|
output,
|
||||||
|
@ -123,15 +148,15 @@ class LangFuseLogger:
|
||||||
verbose_logger.info(f"Langfuse Layer Logging - logging success")
|
verbose_logger.info(f"Langfuse Layer Logging - logging success")
|
||||||
except:
|
except:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
print_verbose(f"Langfuse Layer Error - {traceback.format_exc()}")
|
print(f"Langfuse Layer Error - {traceback.format_exc()}")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def _async_log_event(
|
async def _async_log_event(
|
||||||
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
|
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
|
||||||
):
|
):
|
||||||
self.log_event(
|
"""
|
||||||
kwargs, response_obj, start_time, end_time, user_id, print_verbose
|
TODO: support async calls when langfuse is truly async
|
||||||
)
|
"""
|
||||||
|
|
||||||
def _is_langfuse_v2(self):
|
def _is_langfuse_v2(self):
|
||||||
import langfuse
|
import langfuse
|
||||||
|
@ -193,10 +218,12 @@ class LangFuseLogger:
|
||||||
optional_params,
|
optional_params,
|
||||||
input,
|
input,
|
||||||
response_obj,
|
response_obj,
|
||||||
|
level,
|
||||||
print_verbose,
|
print_verbose,
|
||||||
):
|
):
|
||||||
import langfuse
|
import langfuse
|
||||||
|
|
||||||
|
try:
|
||||||
tags = []
|
tags = []
|
||||||
supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3")
|
supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3")
|
||||||
supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3")
|
supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3")
|
||||||
|
@ -211,12 +238,17 @@ class LangFuseLogger:
|
||||||
trace_params = {
|
trace_params = {
|
||||||
"name": generation_name,
|
"name": generation_name,
|
||||||
"input": input,
|
"input": input,
|
||||||
"output": output,
|
|
||||||
"user_id": metadata.get("trace_user_id", user_id),
|
"user_id": metadata.get("trace_user_id", user_id),
|
||||||
"id": metadata.get("trace_id", None),
|
"id": metadata.get("trace_id", None),
|
||||||
"session_id": metadata.get("session_id", None),
|
"session_id": metadata.get("session_id", None),
|
||||||
}
|
}
|
||||||
cost = kwargs["response_cost"]
|
|
||||||
|
if level == "ERROR":
|
||||||
|
trace_params["status_message"] = output
|
||||||
|
else:
|
||||||
|
trace_params["output"] = output
|
||||||
|
|
||||||
|
cost = kwargs.get("response_cost", None)
|
||||||
print_verbose(f"trace: {cost}")
|
print_verbose(f"trace: {cost}")
|
||||||
if supports_tags:
|
if supports_tags:
|
||||||
for key, value in metadata.items():
|
for key, value in metadata.items():
|
||||||
|
@ -227,10 +259,22 @@ class LangFuseLogger:
|
||||||
|
|
||||||
trace = self.Langfuse.trace(**trace_params)
|
trace = self.Langfuse.trace(**trace_params)
|
||||||
|
|
||||||
|
if level == "ERROR":
|
||||||
|
trace.generation(
|
||||||
|
level="ERROR", # can be any of DEBUG, DEFAULT, WARNING or ERROR
|
||||||
|
status_message=output, # can be any string (e.g. stringified stack trace or error body)
|
||||||
|
)
|
||||||
|
print(f"SUCCESSFULLY LOGGED ERROR")
|
||||||
|
else:
|
||||||
# get generation_id
|
# get generation_id
|
||||||
generation_id = None
|
generation_id = None
|
||||||
if response_obj.get("id", None) is not None:
|
if (
|
||||||
generation_id = litellm.utils.get_logging_id(start_time, response_obj)
|
response_obj is not None
|
||||||
|
and response_obj.get("id", None) is not None
|
||||||
|
):
|
||||||
|
generation_id = litellm.utils.get_logging_id(
|
||||||
|
start_time, response_obj
|
||||||
|
)
|
||||||
trace.generation(
|
trace.generation(
|
||||||
name=generation_name,
|
name=generation_name,
|
||||||
id=metadata.get("generation_id", generation_id),
|
id=metadata.get("generation_id", generation_id),
|
||||||
|
@ -247,3 +291,5 @@ class LangFuseLogger:
|
||||||
},
|
},
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
)
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Langfuse Layer Error - {traceback.format_exc()}")
|
||||||
|
|
|
@ -1636,34 +1636,6 @@ class Logging:
|
||||||
end_time=end_time,
|
end_time=end_time,
|
||||||
print_verbose=print_verbose,
|
print_verbose=print_verbose,
|
||||||
)
|
)
|
||||||
if callback == "langfuse":
|
|
||||||
global langFuseLogger
|
|
||||||
print_verbose("reaches Async langfuse for logging!")
|
|
||||||
kwargs = {}
|
|
||||||
for k, v in self.model_call_details.items():
|
|
||||||
if (
|
|
||||||
k != "original_response"
|
|
||||||
): # copy.deepcopy raises errors as this could be a coroutine
|
|
||||||
kwargs[k] = v
|
|
||||||
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
|
|
||||||
if self.stream:
|
|
||||||
if "complete_streaming_response" not in kwargs:
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
print_verbose(
|
|
||||||
"reaches Async langfuse for streaming logging!"
|
|
||||||
)
|
|
||||||
result = kwargs["complete_streaming_response"]
|
|
||||||
if langFuseLogger is None:
|
|
||||||
langFuseLogger = LangFuseLogger()
|
|
||||||
await langFuseLogger._async_log_event(
|
|
||||||
kwargs=kwargs,
|
|
||||||
response_obj=result,
|
|
||||||
start_time=start_time,
|
|
||||||
end_time=end_time,
|
|
||||||
user_id=kwargs.get("user", None),
|
|
||||||
print_verbose=print_verbose,
|
|
||||||
)
|
|
||||||
except:
|
except:
|
||||||
print_verbose(
|
print_verbose(
|
||||||
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
|
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
|
||||||
|
@ -1788,9 +1760,37 @@ class Logging:
|
||||||
response_obj=result,
|
response_obj=result,
|
||||||
kwargs=self.model_call_details,
|
kwargs=self.model_call_details,
|
||||||
)
|
)
|
||||||
|
elif callback == "langfuse":
|
||||||
|
global langFuseLogger
|
||||||
|
verbose_logger.debug("reaches langfuse for logging!")
|
||||||
|
kwargs = {}
|
||||||
|
for k, v in self.model_call_details.items():
|
||||||
|
if (
|
||||||
|
k != "original_response"
|
||||||
|
): # copy.deepcopy raises errors as this could be a coroutine
|
||||||
|
kwargs[k] = v
|
||||||
|
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
|
||||||
|
if langFuseLogger is None or (
|
||||||
|
self.langfuse_public_key != langFuseLogger.public_key
|
||||||
|
and self.langfuse_secret != langFuseLogger.secret_key
|
||||||
|
):
|
||||||
|
langFuseLogger = LangFuseLogger(
|
||||||
|
langfuse_public_key=self.langfuse_public_key,
|
||||||
|
langfuse_secret=self.langfuse_secret,
|
||||||
|
)
|
||||||
|
langFuseLogger.log_event(
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
response_obj=None,
|
||||||
|
user_id=kwargs.get("user", None),
|
||||||
|
print_verbose=print_verbose,
|
||||||
|
status_message=str(exception),
|
||||||
|
level="ERROR",
|
||||||
|
kwargs=self.model_call_details,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print_verbose(
|
print_verbose(
|
||||||
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {traceback.format_exc()}"
|
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}"
|
||||||
)
|
)
|
||||||
print_verbose(
|
print_verbose(
|
||||||
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
|
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue