fix(langfuse.py): support logging failed llm api calls to langfuse

This commit is contained in:
Krrish Dholakia 2024-02-05 16:16:15 -08:00
parent 77fe71ee08
commit a1bbb16ab2
2 changed files with 151 additions and 105 deletions

View file

@@ -55,8 +55,21 @@ class LangFuseLogger:
else: else:
self.upstream_langfuse = None self.upstream_langfuse = None
# def log_error(kwargs, response_obj, start_time, end_time):
# generation = trace.generation(
# level ="ERROR" # can be any of DEBUG, DEFAULT, WARNING or ERROR
# status_message='error' # can be any string (e.g. stringified stack trace or error body)
# )
def log_event( def log_event(
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose self,
kwargs,
response_obj,
start_time,
end_time,
user_id,
print_verbose,
level="DEFAULT",
status_message=None,
): ):
# Method definition # Method definition
@@ -84,37 +97,49 @@ class LangFuseLogger:
pass pass
# end of processing langfuse ######################## # end of processing langfuse ########################
if kwargs.get("call_type", None) == "embedding" or isinstance( if (
response_obj, litellm.EmbeddingResponse level == "ERROR"
and status_message is not None
and isinstance(status_message, str)
):
input = prompt
output = status_message
elif response_obj is not None and (
kwargs.get("call_type", None) == "embedding"
or isinstance(response_obj, litellm.EmbeddingResponse)
): ):
input = prompt input = prompt
output = response_obj["data"] output = response_obj["data"]
else: elif response_obj is not None:
input = prompt input = prompt
output = response_obj["choices"][0]["message"].json() output = response_obj["choices"][0]["message"].json()
print_verbose(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}") print(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}")
self._log_langfuse_v2( if self._is_langfuse_v2():
user_id, self._log_langfuse_v2(
metadata, user_id,
output, metadata,
start_time, output,
end_time, start_time,
kwargs, end_time,
optional_params, kwargs,
input, optional_params,
response_obj, input,
print_verbose, response_obj,
) if self._is_langfuse_v2() else self._log_langfuse_v1( level,
user_id, print_verbose,
metadata, )
output, elif response_obj is not None:
start_time, self._log_langfuse_v1(
end_time, user_id,
kwargs, metadata,
optional_params, output,
input, start_time,
response_obj, end_time,
) kwargs,
optional_params,
input,
response_obj,
)
self.Langfuse.flush() self.Langfuse.flush()
print_verbose( print_verbose(
@@ -123,15 +148,15 @@ class LangFuseLogger:
verbose_logger.info(f"Langfuse Layer Logging - logging success") verbose_logger.info(f"Langfuse Layer Logging - logging success")
except: except:
traceback.print_exc() traceback.print_exc()
print_verbose(f"Langfuse Layer Error - {traceback.format_exc()}") print(f"Langfuse Layer Error - {traceback.format_exc()}")
pass pass
async def _async_log_event( async def _async_log_event(
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
): ):
self.log_event( """
kwargs, response_obj, start_time, end_time, user_id, print_verbose TODO: support async calls when langfuse is truly async
) """
def _is_langfuse_v2(self): def _is_langfuse_v2(self):
import langfuse import langfuse
@@ -193,57 +218,78 @@ class LangFuseLogger:
optional_params, optional_params,
input, input,
response_obj, response_obj,
level,
print_verbose, print_verbose,
): ):
import langfuse import langfuse
tags = [] try:
supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3") tags = []
supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3") supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3")
supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3")
print_verbose(f"Langfuse Layer Logging - logging to langfuse v2 ") print_verbose(f"Langfuse Layer Logging - logging to langfuse v2 ")
generation_name = metadata.get("generation_name", None) generation_name = metadata.get("generation_name", None)
if generation_name is None: if generation_name is None:
# just log `litellm-{call_type}` as the generation name # just log `litellm-{call_type}` as the generation name
generation_name = f"litellm-{kwargs.get('call_type', 'completion')}" generation_name = f"litellm-{kwargs.get('call_type', 'completion')}"
trace_params = { trace_params = {
"name": generation_name, "name": generation_name,
"input": input, "input": input,
"output": output, "user_id": metadata.get("trace_user_id", user_id),
"user_id": metadata.get("trace_user_id", user_id), "id": metadata.get("trace_id", None),
"id": metadata.get("trace_id", None), "session_id": metadata.get("session_id", None),
"session_id": metadata.get("session_id", None), }
}
cost = kwargs["response_cost"]
print_verbose(f"trace: {cost}")
if supports_tags:
for key, value in metadata.items():
tags.append(f"{key}:{value}")
if "cache_hit" in kwargs:
tags.append(f"cache_hit:{kwargs['cache_hit']}")
trace_params.update({"tags": tags})
trace = self.Langfuse.trace(**trace_params) if level == "ERROR":
trace_params["status_message"] = output
else:
trace_params["output"] = output
# get generation_id cost = kwargs.get("response_cost", None)
generation_id = None print_verbose(f"trace: {cost}")
if response_obj.get("id", None) is not None: if supports_tags:
generation_id = litellm.utils.get_logging_id(start_time, response_obj) for key, value in metadata.items():
trace.generation( tags.append(f"{key}:{value}")
name=generation_name, if "cache_hit" in kwargs:
id=metadata.get("generation_id", generation_id), tags.append(f"cache_hit:{kwargs['cache_hit']}")
startTime=start_time, trace_params.update({"tags": tags})
endTime=end_time,
model=kwargs["model"], trace = self.Langfuse.trace(**trace_params)
modelParameters=optional_params,
input=input, if level == "ERROR":
output=output, trace.generation(
usage={ level="ERROR", # can be any of DEBUG, DEFAULT, WARNING or ERROR
"prompt_tokens": response_obj["usage"]["prompt_tokens"], status_message=output, # can be any string (e.g. stringified stack trace or error body)
"completion_tokens": response_obj["usage"]["completion_tokens"], )
"total_cost": cost if supports_costs else None, print(f"SUCCESSFULLY LOGGED ERROR")
}, else:
metadata=metadata, # get generation_id
) generation_id = None
if (
response_obj is not None
and response_obj.get("id", None) is not None
):
generation_id = litellm.utils.get_logging_id(
start_time, response_obj
)
trace.generation(
name=generation_name,
id=metadata.get("generation_id", generation_id),
startTime=start_time,
endTime=end_time,
model=kwargs["model"],
modelParameters=optional_params,
input=input,
output=output,
usage={
"prompt_tokens": response_obj["usage"]["prompt_tokens"],
"completion_tokens": response_obj["usage"]["completion_tokens"],
"total_cost": cost if supports_costs else None,
},
metadata=metadata,
)
except Exception as e:
print(f"Langfuse Layer Error - {traceback.format_exc()}")

View file

@@ -1636,34 +1636,6 @@ class Logging:
end_time=end_time, end_time=end_time,
print_verbose=print_verbose, print_verbose=print_verbose,
) )
if callback == "langfuse":
global langFuseLogger
print_verbose("reaches Async langfuse for logging!")
kwargs = {}
for k, v in self.model_call_details.items():
if (
k != "original_response"
): # copy.deepcopy raises errors as this could be a coroutine
kwargs[k] = v
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if self.stream:
if "complete_streaming_response" not in kwargs:
return
else:
print_verbose(
"reaches Async langfuse for streaming logging!"
)
result = kwargs["complete_streaming_response"]
if langFuseLogger is None:
langFuseLogger = LangFuseLogger()
await langFuseLogger._async_log_event(
kwargs=kwargs,
response_obj=result,
start_time=start_time,
end_time=end_time,
user_id=kwargs.get("user", None),
print_verbose=print_verbose,
)
except: except:
print_verbose( print_verbose(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}" f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
@@ -1788,9 +1760,37 @@ class Logging:
response_obj=result, response_obj=result,
kwargs=self.model_call_details, kwargs=self.model_call_details,
) )
elif callback == "langfuse":
global langFuseLogger
verbose_logger.debug("reaches langfuse for logging!")
kwargs = {}
for k, v in self.model_call_details.items():
if (
k != "original_response"
): # copy.deepcopy raises errors as this could be a coroutine
kwargs[k] = v
# this only logs streaming once, complete_streaming_response exists i.e when stream ends
if langFuseLogger is None or (
self.langfuse_public_key != langFuseLogger.public_key
and self.langfuse_secret != langFuseLogger.secret_key
):
langFuseLogger = LangFuseLogger(
langfuse_public_key=self.langfuse_public_key,
langfuse_secret=self.langfuse_secret,
)
langFuseLogger.log_event(
start_time=start_time,
end_time=end_time,
response_obj=None,
user_id=kwargs.get("user", None),
print_verbose=print_verbose,
status_message=str(exception),
level="ERROR",
kwargs=self.model_call_details,
)
except Exception as e: except Exception as e:
print_verbose( print_verbose(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {traceback.format_exc()}" f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}"
) )
print_verbose( print_verbose(
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"