diff --git a/litellm/integrations/langfuse/langfuse.py b/litellm/integrations/langfuse/langfuse.py
index 125bf4e686..f990a316c4 100644
--- a/litellm/integrations/langfuse/langfuse.py
+++ b/litellm/integrations/langfuse/langfuse.py
@@ -3,7 +3,8 @@
 import copy
 import os
 import traceback
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

 from packaging.version import Version

@@ -13,9 +14,16 @@ from litellm.litellm_core_utils.redact_messages import redact_user_api_key_info
 from litellm.llms.custom_httpx.http_handler import _get_httpx_client
 from litellm.secret_managers.main import str_to_bool
 from litellm.types.integrations.langfuse import *
+from litellm.types.llms.openai import HttpxBinaryResponseContent
 from litellm.types.utils import (
+    EmbeddingResponse,
+    ImageResponse,
+    ModelResponse,
+    RerankResponse,
     StandardLoggingPayload,
     StandardLoggingPromptManagementMetadata,
+    TextCompletionResponse,
+    TranscriptionResponse,
 )

 if TYPE_CHECKING:
@@ -150,19 +158,29 @@ class LangFuseLogger:

         return metadata

-    def _old_log_event(  # noqa: PLR0915
+    def log_event_on_langfuse(
         self,
-        kwargs,
-        response_obj,
-        start_time,
-        end_time,
-        user_id,
-        print_verbose,
-        level="DEFAULT",
-        status_message=None,
+        kwargs: dict,
+        response_obj: Union[
+            None,
+            dict,
+            EmbeddingResponse,
+            ModelResponse,
+            TextCompletionResponse,
+            ImageResponse,
+            TranscriptionResponse,
+            RerankResponse,
+            HttpxBinaryResponseContent,
+        ],
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        user_id: Optional[str] = None,
+        level: str = "DEFAULT",
+        status_message: Optional[str] = None,
     ) -> dict:
-        # Method definition
-
+        """
+        Logs a success or error event on Langfuse
+        """
         try:
             verbose_logger.debug(
                 f"Langfuse Logging - Enters logging function for model {kwargs}"
             )
@@ -198,66 +216,13 @@ class LangFuseLogger:
                     # if casting value to str fails don't block logging
                     pass

-            # end of processing langfuse ########################
-            if (
-                level == "ERROR"
-                and status_message is not None
-                and isinstance(status_message, str)
-            ):
-                input = prompt
-                output = status_message
-            elif response_obj is not None and (
-                kwargs.get("call_type", None) == "embedding"
-                or isinstance(response_obj, litellm.EmbeddingResponse)
-            ):
-                input = prompt
-                output = None
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.ModelResponse
-            ):
-                input = prompt
-                output = response_obj["choices"][0]["message"].json()
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.HttpxBinaryResponseContent
-            ):
-                input = prompt
-                output = "speech-output"
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.TextCompletionResponse
-            ):
-                input = prompt
-                output = response_obj.choices[0].text
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.ImageResponse
-            ):
-                input = prompt
-                output = response_obj["data"]
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.TranscriptionResponse
-            ):
-                input = prompt
-                output = response_obj["text"]
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.RerankResponse
-            ):
-                input = prompt
-                output = response_obj.results
-            elif (
-                kwargs.get("call_type") is not None
-                and kwargs.get("call_type") == "_arealtime"
-                and response_obj is not None
-                and isinstance(response_obj, list)
-            ):
-                input = kwargs.get("input")
-                output = response_obj
-            elif (
-                kwargs.get("call_type") is not None
-                and kwargs.get("call_type") == "pass_through_endpoint"
-                and response_obj is not None
-                and isinstance(response_obj, dict)
-            ):
-                input = prompt
-                output = response_obj.get("response", "")
+            input, output = self._get_langfuse_input_output_content(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                prompt=prompt,
+                level=level,
+                status_message=status_message,
+            )
             verbose_logger.debug(
                 f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}"
             )
@@ -265,31 +230,30 @@ class LangFuseLogger:
             generation_id = None
             if self._is_langfuse_v2():
                 trace_id, generation_id = self._log_langfuse_v2(
-                    user_id,
-                    metadata,
-                    litellm_params,
-                    output,
-                    start_time,
-                    end_time,
-                    kwargs,
-                    optional_params,
-                    input,
-                    response_obj,
-                    level,
-                    print_verbose,
-                    litellm_call_id,
+                    user_id=user_id,
+                    metadata=metadata,
+                    litellm_params=litellm_params,
+                    output=output,
+                    start_time=start_time,
+                    end_time=end_time,
+                    kwargs=kwargs,
+                    optional_params=optional_params,
+                    input=input,
+                    response_obj=response_obj,
+                    level=level,
+                    litellm_call_id=litellm_call_id,
                 )
             elif response_obj is not None:
                 self._log_langfuse_v1(
-                    user_id,
-                    metadata,
-                    output,
-                    start_time,
-                    end_time,
-                    kwargs,
-                    optional_params,
-                    input,
-                    response_obj,
+                    user_id=user_id,
+                    metadata=metadata,
+                    output=output,
+                    start_time=start_time,
+                    end_time=end_time,
+                    kwargs=kwargs,
+                    optional_params=optional_params,
+                    input=input,
+                    response_obj=response_obj,
                 )
             verbose_logger.debug(
                 f"Langfuse Layer Logging - final response object: {response_obj}"
             )
@@ -303,11 +267,108 @@ class LangFuseLogger:
             )
             return {"trace_id": None, "generation_id": None}

+    def _get_langfuse_input_output_content(
+        self,
+        kwargs: dict,
+        response_obj: Union[
+            None,
+            dict,
+            EmbeddingResponse,
+            ModelResponse,
+            TextCompletionResponse,
+            ImageResponse,
+            TranscriptionResponse,
+            RerankResponse,
+            HttpxBinaryResponseContent,
+        ],
+        prompt: dict,
+        level: str,
+        status_message: Optional[str],
+    ) -> Tuple[Optional[dict], Optional[Union[str, dict, list]]]:
+        """
+        Get the input and output content for Langfuse logging
+
+        Args:
+            kwargs: The keyword arguments passed to the function
+            response_obj: The response object returned by the function
+            prompt: The prompt used to generate the response
+            level: The level of the log message
+            status_message: The status message of the log message
+
+        Returns:
+            input: The input content for Langfuse logging
+            output: The output content for Langfuse logging
+        """
+        input = None
+        output: Optional[Union[str, dict, List[Any]]] = None
+        if (
+            level == "ERROR"
+            and status_message is not None
+            and isinstance(status_message, str)
+        ):
+            input = prompt
+            output = status_message
+        elif response_obj is not None and (
+            kwargs.get("call_type", None) == "embedding"
+            or isinstance(response_obj, litellm.EmbeddingResponse)
+        ):
+            input = prompt
+            output = None
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.ModelResponse
+        ):
+            input = prompt
+            output = self._get_chat_content_for_langfuse(response_obj)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.HttpxBinaryResponseContent
+        ):
+            input = prompt
+            output = "speech-output"
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.TextCompletionResponse
+        ):
+            input = prompt
+            output = self._get_text_completion_content_for_langfuse(response_obj)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.ImageResponse
+        ):
+            input = prompt
+            output = response_obj.get("data", None)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.TranscriptionResponse
+        ):
+            input = prompt
+            output = response_obj.get("text", None)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.RerankResponse
+        ):
+            input = prompt
+            output = response_obj.results
+        elif (
+            kwargs.get("call_type") is not None
+            and kwargs.get("call_type") == "_arealtime"
+            and response_obj is not None
+            and isinstance(response_obj, list)
+        ):
+            input = kwargs.get("input")
+            output = response_obj
+        elif (
+            kwargs.get("call_type") is not None
+            and kwargs.get("call_type") == "pass_through_endpoint"
+            and response_obj is not None
+            and isinstance(response_obj, dict)
+        ):
+            input = prompt
+            output = response_obj.get("response", "")
+        return input, output
+
     async def _async_log_event(
-        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
+        self, kwargs, response_obj, start_time, end_time, user_id
     ):
         """
-        TODO: support async calls when langfuse is truly async
+        Langfuse SDK uses a background thread to log events
+
+        This approach does not impact latency and runs in the background
         """

     def _is_langfuse_v2(self):
@@ -361,19 +422,18 @@ class LangFuseLogger:

     def _log_langfuse_v2(  # noqa: PLR0915
         self,
-        user_id,
-        metadata,
-        litellm_params,
-        output,
-        start_time,
-        end_time,
-        kwargs,
-        optional_params,
-        input,
+        user_id: Optional[str],
+        metadata: dict,
+        litellm_params: dict,
+        output: Optional[Union[str, dict, list]],
+        start_time: Optional[datetime],
+        end_time: Optional[datetime],
+        kwargs: dict,
+        optional_params: dict,
+        input: Optional[dict],
         response_obj,
-        level,
-        print_verbose,
-        litellm_call_id,
+        level: str,
+        litellm_call_id: Optional[str],
     ) -> tuple:
         verbose_logger.debug("Langfuse Layer Logging - logging to langfuse v2")

@@ -657,6 +717,31 @@ class LangFuseLogger:
             verbose_logger.error(f"Langfuse Layer Error - {traceback.format_exc()}")
             return None, None

+    @staticmethod
+    def _get_chat_content_for_langfuse(
+        response_obj: ModelResponse,
+    ):
+        """
+        Get the chat content for Langfuse logging
+        """
+        if response_obj.choices and len(response_obj.choices) > 0:
+            output = response_obj["choices"][0]["message"].json()
+            return output
+        else:
+            return None
+
+    @staticmethod
+    def _get_text_completion_content_for_langfuse(
+        response_obj: TextCompletionResponse,
+    ):
+        """
+        Get the text completion content for Langfuse logging
+        """
+        if response_obj.choices and len(response_obj.choices) > 0:
+            return response_obj.choices[0].text
+        else:
+            return None
+
     @staticmethod
     def _get_langfuse_tags(
         standard_logging_object: Optional[StandardLoggingPayload],
diff --git a/litellm/integrations/langfuse/langfuse_prompt_management.py b/litellm/integrations/langfuse/langfuse_prompt_management.py
index faa4a63491..cc2a6cf80d 100644
--- a/litellm/integrations/langfuse/langfuse_prompt_management.py
+++ b/litellm/integrations/langfuse/langfuse_prompt_management.py
@@ -247,13 +247,12 @@ class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogge
             standard_callback_dynamic_params=standard_callback_dynamic_params,
             in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
         )
-        langfuse_logger_to_use._old_log_event(
+        langfuse_logger_to_use.log_event_on_langfuse(
             kwargs=kwargs,
             response_obj=response_obj,
             start_time=start_time,
             end_time=end_time,
             user_id=kwargs.get("user", None),
-            print_verbose=None,
         )

     async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
@@ -271,12 +270,11 @@ class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogge
         )
         if standard_logging_object is None:
             return
-        langfuse_logger_to_use._old_log_event(
+        langfuse_logger_to_use.log_event_on_langfuse(
             start_time=start_time,
             end_time=end_time,
             response_obj=None,
             user_id=kwargs.get("user", None),
-            print_verbose=None,
             status_message=standard_logging_object["error_str"],
             level="ERROR",
             kwargs=kwargs,
diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py
index 45b63177b9..28182b75ac 100644
--- a/litellm/litellm_core_utils/litellm_logging.py
+++ b/litellm/litellm_core_utils/litellm_logging.py
@@ -1247,13 +1247,12 @@ class Logging(LiteLLMLoggingBaseClass):
                             in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
                         )
                         if langfuse_logger_to_use is not None:
-                            _response = langfuse_logger_to_use._old_log_event(
+                            _response = langfuse_logger_to_use.log_event_on_langfuse(
                                 kwargs=kwargs,
                                 response_obj=result,
                                 start_time=start_time,
                                 end_time=end_time,
                                 user_id=kwargs.get("user", None),
-                                print_verbose=print_verbose,
                             )
                             if _response is not None and isinstance(_response, dict):
                                 _trace_id = _response.get("trace_id", None)
@@ -1957,12 +1956,11 @@ class Logging(LiteLLMLoggingBaseClass):
                         standard_callback_dynamic_params=self.standard_callback_dynamic_params,
                         in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
                     )
-                    _response = langfuse_logger_to_use._old_log_event(
+                    _response = langfuse_logger_to_use.log_event_on_langfuse(
                        start_time=start_time,
                        end_time=end_time,
                        response_obj=None,
                        user_id=kwargs.get("user", None),
-                        print_verbose=print_verbose,
                        status_message=str(exception),
                        level="ERROR",
                        kwargs=self.model_call_details,
diff --git a/litellm/router.py b/litellm/router.py
index faa7cd4a6b..b61c30dd57 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -3699,8 +3699,9 @@ class Router:

         Else, original response is returned.
         """
-        if response.choices[0].finish_reason != "content_filter":
-            return False
+        if response.choices and len(response.choices) > 0:
+            if response.choices[0].finish_reason != "content_filter":
+                return False

         content_policy_fallbacks = kwargs.get(
             "content_policy_fallbacks", self.content_policy_fallbacks
diff --git a/tests/logging_callback_tests/langfuse_expected_request_body/completion_with_no_choices.json b/tests/logging_callback_tests/langfuse_expected_request_body/completion_with_no_choices.json
new file mode 100644
index 0000000000..0683ff9ba9
--- /dev/null
+++ b/tests/logging_callback_tests/langfuse_expected_request_body/completion_with_no_choices.json
@@ -0,0 +1,75 @@
+{
+  "batch": [
+    {
+      "id": "1f1d7517-4602-4c59-a322-7fc0306f1b7a",
+      "type": "trace-create",
+      "body": {
+        "id": "litellm-test-dbadfdfc-f4e7-4f05-8992-984c37359166",
+        "timestamp": "2025-02-07T00:23:27.669634Z",
+        "name": "litellm-acompletion",
+        "input": {
+          "messages": [
+            {
+              "role": "user",
+              "content": "Hello!"
+            }
+          ]
+        },
+        "tags": []
+      },
+      "timestamp": "2025-02-07T00:23:27.669809Z"
+    },
+    {
+      "id": "fbe610b6-f500-4c7d-8e34-d40a0e8c487b",
+      "type": "generation-create",
+      "body": {
+        "traceId": "litellm-test-dbadfdfc-f4e7-4f05-8992-984c37359166",
+        "name": "litellm-acompletion",
+        "startTime": "2025-02-06T16:23:27.220129-08:00",
+        "metadata": {
+          "hidden_params": {
+            "model_id": null,
+            "cache_key": null,
+            "api_base": "https://api.openai.com",
+            "response_cost": 3.5e-05,
+            "additional_headers": {},
+            "litellm_overhead_time_ms": null
+          },
+          "litellm_response_cost": 3.5e-05,
+          "cache_hit": false,
+          "requester_metadata": {}
+        },
+        "input": {
+          "messages": [
+            {
+              "role": "user",
+              "content": "Hello!"
+            }
+          ]
+        },
+        "level": "DEFAULT",
+        "id": "time-16-23-27-220129_chatcmpl-565360d7-965f-4533-9c09-db789af77a7d",
+        "endTime": "2025-02-06T16:23:27.644253-08:00",
+        "completionStartTime": "2025-02-06T16:23:27.644253-08:00",
+        "model": "gpt-3.5-turbo",
+        "modelParameters": {
+          "extra_body": "{}"
+        },
+        "usage": {
+          "input": 10,
+          "output": 10,
+          "unit": "TOKENS",
+          "totalCost": 3.5e-05
+        }
+      },
+      "timestamp": "2025-02-07T00:23:27.670175Z"
+    }
+  ],
+  "metadata": {
+    "batch_size": 2,
+    "sdk_integration": "litellm",
+    "sdk_name": "python",
+    "sdk_version": "2.44.1",
+    "public_key": "pk-lf-e02aaea3-8668-4c9f-8c69-771a4ea1f5c9"
+  }
+}
\ No newline at end of file
diff --git a/tests/logging_callback_tests/test_langfuse_e2e_test.py b/tests/logging_callback_tests/test_langfuse_e2e_test.py
index 79197d6c25..b46d8764dd 100644
--- a/tests/logging_callback_tests/test_langfuse_e2e_test.py
+++ b/tests/logging_callback_tests/test_langfuse_e2e_test.py
@@ -352,3 +352,32 @@ class TestLangfuseLogging:
             response_json_file,
             setup["trace_id"],
         )
+
+    @pytest.mark.asyncio
+    async def test_langfuse_logging_completion_with_malformed_llm_response(
+        self, mock_setup
+    ):
+        """Test Langfuse logging for chat completion with malformed LLM response"""
+        setup = await mock_setup  # Await the fixture
+        litellm._turn_on_debug()
+        with patch("httpx.Client.post", setup["mock_post"]):
+            mock_response = litellm.ModelResponse(
+                choices=[],
+                usage=litellm.Usage(
+                    prompt_tokens=10,
+                    completion_tokens=10,
+                    total_tokens=20,
+                ),
+                model="gpt-3.5-turbo",
+                object="chat.completion",
+                created=1723081200,
+            ).model_dump()
+            await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Hello!"}],
+                mock_response=mock_response,
+                metadata={"trace_id": setup["trace_id"]},
+            )
+            await self._verify_langfuse_call(
+                setup["mock_post"], "completion_with_no_choices.json", setup["trace_id"]
+            )
diff --git a/tests/logging_callback_tests/test_langfuse_unit_tests.py b/tests/logging_callback_tests/test_langfuse_unit_tests.py
index e9c255c1a3..16ed464fff 100644
--- a/tests/logging_callback_tests/test_langfuse_unit_tests.py
+++ b/tests/logging_callback_tests/test_langfuse_unit_tests.py
@@ -21,6 +21,11 @@ from litellm.types.utils import (
     StandardLoggingMetadata,
     StandardLoggingHiddenParams,
     StandardCallbackDynamicParams,
+    ModelResponse,
+    Choices,
+    Message,
+    TextCompletionResponse,
+    TextChoices,
 )


@@ -294,7 +299,6 @@ def test_get_langfuse_tags():

     assert result == []

-
 @patch.dict(os.environ, {}, clear=True)  # Start with empty environment
 def test_get_langfuse_flush_interval():
     """
@@ -316,6 +320,7 @@
     )
     assert result == 120

+
 def test_langfuse_e2e_sync(monkeypatch):
     from litellm import completion
     import litellm

     assert langfuse_mock.called
+
+
+def test_get_chat_content_for_langfuse():
+    """
+    Test that _get_chat_content_for_langfuse correctly extracts content from chat completion responses
+    """
+    # Test with valid response
+    mock_response = ModelResponse(
+        choices=[Choices(message=Message(role="assistant", content="Hello world"))]
+    )
+
+    result = LangFuseLogger._get_chat_content_for_langfuse(mock_response)
+    assert result == {
+        "content": "Hello world",
+        "role": "assistant",
+        "tool_calls": None,
+        "function_call": None,
+    }
+
+    # Test with empty choices
+    mock_response = ModelResponse(choices=[])
+    result = LangFuseLogger._get_chat_content_for_langfuse(mock_response)
+    assert result is None
+
+
+def test_get_text_completion_content_for_langfuse():
+    """
+    Test that _get_text_completion_content_for_langfuse correctly extracts content from text completion responses
+    """
+    # Test with valid response
+    mock_response = TextCompletionResponse(choices=[TextChoices(text="Hello world")])
+    result = LangFuseLogger._get_text_completion_content_for_langfuse(mock_response)
+    assert result == "Hello world"
+
+    # Test with empty choices
+    mock_response = TextCompletionResponse(choices=[])
+    result = LangFuseLogger._get_text_completion_content_for_langfuse(mock_response)
+    assert result is None
+
+    # Test with no choices field
+    mock_response = TextCompletionResponse()
+    result = LangFuseLogger._get_text_completion_content_for_langfuse(mock_response)
+    assert result is None