diff --git a/litellm/integrations/langfuse.py b/litellm/integrations/langfuse.py index e62dccdc47..82de333660 100644 --- a/litellm/integrations/langfuse.py +++ b/litellm/integrations/langfuse.py @@ -55,8 +55,21 @@ class LangFuseLogger: else: self.upstream_langfuse = None + # def log_error(kwargs, response_obj, start_time, end_time): + # generation = trace.generation( + # level ="ERROR" # can be any of DEBUG, DEFAULT, WARNING or ERROR + # status_message='error' # can be any string (e.g. stringified stack trace or error body) + # ) def log_event( - self, kwargs, response_obj, start_time, end_time, user_id, print_verbose + self, + kwargs, + response_obj, + start_time, + end_time, + user_id, + print_verbose, + level="DEFAULT", + status_message=None, ): # Method definition @@ -84,37 +97,49 @@ class LangFuseLogger: pass # end of processing langfuse ######################## - if kwargs.get("call_type", None) == "embedding" or isinstance( - response_obj, litellm.EmbeddingResponse + if ( + level == "ERROR" + and status_message is not None + and isinstance(status_message, str) + ): + input = prompt + output = status_message + elif response_obj is not None and ( + kwargs.get("call_type", None) == "embedding" + or isinstance(response_obj, litellm.EmbeddingResponse) ): input = prompt output = response_obj["data"] - else: + elif response_obj is not None: input = prompt output = response_obj["choices"][0]["message"].json() - print_verbose(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}") - self._log_langfuse_v2( - user_id, - metadata, - output, - start_time, - end_time, - kwargs, - optional_params, - input, - response_obj, - print_verbose, - ) if self._is_langfuse_v2() else self._log_langfuse_v1( - user_id, - metadata, - output, - start_time, - end_time, - kwargs, - optional_params, - input, - response_obj, - ) + print(f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}") + if self._is_langfuse_v2(): + self._log_langfuse_v2( + user_id, + metadata, + output, + start_time, + end_time, + kwargs, + optional_params, + input, + response_obj, + level, + print_verbose, + ) + elif response_obj is not None: + self._log_langfuse_v1( + user_id, + metadata, + output, + start_time, + end_time, + kwargs, + optional_params, + input, + response_obj, + ) self.Langfuse.flush() print_verbose( @@ -123,15 +148,15 @@ class LangFuseLogger: verbose_logger.info(f"Langfuse Layer Logging - logging success") except: traceback.print_exc() - print_verbose(f"Langfuse Layer Error - {traceback.format_exc()}") + print(f"Langfuse Layer Error - {traceback.format_exc()}") pass async def _async_log_event( self, kwargs, response_obj, start_time, end_time, user_id, print_verbose ): - self.log_event( - kwargs, response_obj, start_time, end_time, user_id, print_verbose - ) + """ + TODO: support async calls when langfuse is truly async + """ def _is_langfuse_v2(self): import langfuse @@ -193,57 +218,78 @@ class LangFuseLogger: optional_params, input, response_obj, + level, print_verbose, ): import langfuse - tags = [] - supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3") - supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3") + try: + tags = [] + supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3") + supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3") - print_verbose(f"Langfuse Layer Logging - logging to langfuse v2 ") + print_verbose(f"Langfuse Layer Logging - logging to langfuse v2 ") - generation_name = metadata.get("generation_name", None) - if generation_name is None: - # just log `litellm-{call_type}` as the generation name - generation_name = f"litellm-{kwargs.get('call_type', 'completion')}" + generation_name = metadata.get("generation_name", None) + if generation_name is None: + # just log `litellm-{call_type}` as the generation name + generation_name = f"litellm-{kwargs.get('call_type', 'completion')}" - trace_params = { - "name": generation_name, - "input": input, - "output": output, - "user_id": metadata.get("trace_user_id", user_id), - "id": metadata.get("trace_id", None), - "session_id": metadata.get("session_id", None), - } - cost = kwargs["response_cost"] - print_verbose(f"trace: {cost}") - if supports_tags: - for key, value in metadata.items(): - tags.append(f"{key}:{value}") - if "cache_hit" in kwargs: - tags.append(f"cache_hit:{kwargs['cache_hit']}") - trace_params.update({"tags": tags}) + trace_params = { + "name": generation_name, + "input": input, + "user_id": metadata.get("trace_user_id", user_id), + "id": metadata.get("trace_id", None), + "session_id": metadata.get("session_id", None), + } - trace = self.Langfuse.trace(**trace_params) + if level == "ERROR": + trace_params["status_message"] = output + else: + trace_params["output"] = output - # get generation_id - generation_id = None - if response_obj.get("id", None) is not None: - generation_id = litellm.utils.get_logging_id(start_time, response_obj) - trace.generation( - name=generation_name, - id=metadata.get("generation_id", generation_id), - startTime=start_time, - endTime=end_time, - model=kwargs["model"], - modelParameters=optional_params, - input=input, - output=output, - usage={ - "prompt_tokens": response_obj["usage"]["prompt_tokens"], - "completion_tokens": response_obj["usage"]["completion_tokens"], - "total_cost": cost if supports_costs else None, - }, - metadata=metadata, - ) + cost = kwargs.get("response_cost", None) + print_verbose(f"trace: {cost}") + if supports_tags: + for key, value in metadata.items(): + tags.append(f"{key}:{value}") + if "cache_hit" in kwargs: + tags.append(f"cache_hit:{kwargs['cache_hit']}") + trace_params.update({"tags": tags}) + + trace = self.Langfuse.trace(**trace_params) + + if level == "ERROR": + trace.generation( + level="ERROR", # can be any of DEBUG, DEFAULT, WARNING or ERROR + status_message=output, # can be any string (e.g. stringified stack trace or error body) + ) + print(f"SUCCESSFULLY LOGGED ERROR") + else: + # get generation_id + generation_id = None + if ( + response_obj is not None + and response_obj.get("id", None) is not None + ): + generation_id = litellm.utils.get_logging_id( + start_time, response_obj + ) + trace.generation( + name=generation_name, + id=metadata.get("generation_id", generation_id), + startTime=start_time, + endTime=end_time, + model=kwargs["model"], + modelParameters=optional_params, + input=input, + output=output, + usage={ + "prompt_tokens": response_obj["usage"]["prompt_tokens"], + "completion_tokens": response_obj["usage"]["completion_tokens"], + "total_cost": cost if supports_costs else None, + }, + metadata=metadata, + ) + except Exception as e: + print(f"Langfuse Layer Error - {traceback.format_exc()}") diff --git a/litellm/utils.py b/litellm/utils.py index e56ba879f8..1e83a319f4 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -1636,34 +1636,6 @@ class Logging: end_time=end_time, print_verbose=print_verbose, ) - if callback == "langfuse": - global langFuseLogger - print_verbose("reaches Async langfuse for logging!") - kwargs = {} - for k, v in self.model_call_details.items(): - if ( - k != "original_response" - ): # copy.deepcopy raises errors as this could be a coroutine - kwargs[k] = v - # this only logs streaming once, complete_streaming_response exists i.e when stream ends - if self.stream: - if "complete_streaming_response" not in kwargs: - return - else: - print_verbose( - "reaches Async langfuse for streaming logging!" - ) - result = kwargs["complete_streaming_response"] - if langFuseLogger is None: - langFuseLogger = LangFuseLogger() - await langFuseLogger._async_log_event( - kwargs=kwargs, - response_obj=result, - start_time=start_time, - end_time=end_time, - user_id=kwargs.get("user", None), - print_verbose=print_verbose, - ) except: print_verbose( f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}" @@ -1788,9 +1760,37 @@ class Logging: response_obj=result, kwargs=self.model_call_details, ) + elif callback == "langfuse": + global langFuseLogger + verbose_logger.debug("reaches langfuse for logging!") + kwargs = {} + for k, v in self.model_call_details.items(): + if ( + k != "original_response" + ): # copy.deepcopy raises errors as this could be a coroutine + kwargs[k] = v + # this only logs streaming once, complete_streaming_response exists i.e when stream ends + if langFuseLogger is None or ( + self.langfuse_public_key != langFuseLogger.public_key + and self.langfuse_secret != langFuseLogger.secret_key + ): + langFuseLogger = LangFuseLogger( + langfuse_public_key=self.langfuse_public_key, + langfuse_secret=self.langfuse_secret, + ) + langFuseLogger.log_event( + start_time=start_time, + end_time=end_time, + response_obj=None, + user_id=kwargs.get("user", None), + print_verbose=print_verbose, + status_message=str(exception), + level="ERROR", + kwargs=self.model_call_details, + ) except Exception as e: print_verbose( - f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {traceback.format_exc()}" + f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}" ) print_verbose( f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"