(Bug Fix - Langfuse) - fix for when model response has choices=[] (#8339)

* refactor _get_langfuse_input_output_content

* test_langfuse_logging_completion_with_malformed_llm_response

* fix _get_langfuse_input_output_content

* fixes for langfuse linting

* unit testing for get chat/text content for langfuse

* fix _should_raise_content_policy_error
Ishaan Jaff 2025-02-06 18:02:26 -08:00 committed by GitHub
parent 92ba180adc
commit 39d616f22d
7 changed files with 352 additions and 119 deletions
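
In short: a response whose choices list is empty (for example, a malformed or heavily filtered LLM response) made the old Langfuse callback index choices[0] directly and raise IndexError, so the whole log event was dropped. Below is a minimal sketch of the guarded accessor this commit introduces, assuming LangFuseLogger is importable from litellm.integrations.langfuse.langfuse; the assert mirrors the new unit tests at the bottom of this diff.

from litellm.types.utils import ModelResponse
from litellm.integrations.langfuse.langfuse import LangFuseLogger

# A malformed/filtered response: no choices at all.
malformed = ModelResponse(choices=[])

# Old behavior (illustrative only): indexing an empty choices list raises
# IndexError, which aborted the Langfuse logging call entirely.
#   output = malformed["choices"][0]["message"].json()

# New behavior: the helper checks choices before indexing and returns None.
assert LangFuseLogger._get_chat_content_for_langfuse(malformed) is None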


@@ -3,7 +3,8 @@
 import copy
 import os
 import traceback
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
 
 from packaging.version import Version
@@ -13,9 +14,16 @@ from litellm.litellm_core_utils.redact_messages import redact_user_api_key_info
 from litellm.llms.custom_httpx.http_handler import _get_httpx_client
 from litellm.secret_managers.main import str_to_bool
 from litellm.types.integrations.langfuse import *
+from litellm.types.llms.openai import HttpxBinaryResponseContent
 from litellm.types.utils import (
+    EmbeddingResponse,
+    ImageResponse,
+    ModelResponse,
+    RerankResponse,
     StandardLoggingPayload,
     StandardLoggingPromptManagementMetadata,
+    TextCompletionResponse,
+    TranscriptionResponse,
 )
 
 if TYPE_CHECKING:
@@ -150,19 +158,29 @@ class LangFuseLogger:
         return metadata
 
-    def _old_log_event(  # noqa: PLR0915
+    def log_event_on_langfuse(
         self,
-        kwargs,
-        response_obj,
-        start_time,
-        end_time,
-        user_id,
-        print_verbose,
-        level="DEFAULT",
-        status_message=None,
+        kwargs: dict,
+        response_obj: Union[
+            None,
+            dict,
+            EmbeddingResponse,
+            ModelResponse,
+            TextCompletionResponse,
+            ImageResponse,
+            TranscriptionResponse,
+            RerankResponse,
+            HttpxBinaryResponseContent,
+        ],
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        user_id: Optional[str] = None,
+        level: str = "DEFAULT",
+        status_message: Optional[str] = None,
     ) -> dict:
-        # Method definition
+        """
+        Logs a success or error event on Langfuse
+        """
         try:
             verbose_logger.debug(
                 f"Langfuse Logging - Enters logging function for model {kwargs}"
@@ -198,66 +216,13 @@ class LangFuseLogger:
                 # if casting value to str fails don't block logging
                 pass
 
-            # end of processing langfuse ########################
-            if (
-                level == "ERROR"
-                and status_message is not None
-                and isinstance(status_message, str)
-            ):
-                input = prompt
-                output = status_message
-            elif response_obj is not None and (
-                kwargs.get("call_type", None) == "embedding"
-                or isinstance(response_obj, litellm.EmbeddingResponse)
-            ):
-                input = prompt
-                output = None
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.ModelResponse
-            ):
-                input = prompt
-                output = response_obj["choices"][0]["message"].json()
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.HttpxBinaryResponseContent
-            ):
-                input = prompt
-                output = "speech-output"
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.TextCompletionResponse
-            ):
-                input = prompt
-                output = response_obj.choices[0].text
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.ImageResponse
-            ):
-                input = prompt
-                output = response_obj["data"]
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.TranscriptionResponse
-            ):
-                input = prompt
-                output = response_obj["text"]
-            elif response_obj is not None and isinstance(
-                response_obj, litellm.RerankResponse
-            ):
-                input = prompt
-                output = response_obj.results
-            elif (
-                kwargs.get("call_type") is not None
-                and kwargs.get("call_type") == "_arealtime"
-                and response_obj is not None
-                and isinstance(response_obj, list)
-            ):
-                input = kwargs.get("input")
-                output = response_obj
-            elif (
-                kwargs.get("call_type") is not None
-                and kwargs.get("call_type") == "pass_through_endpoint"
-                and response_obj is not None
-                and isinstance(response_obj, dict)
-            ):
-                input = prompt
-                output = response_obj.get("response", "")
+            input, output = self._get_langfuse_input_output_content(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                prompt=prompt,
+                level=level,
+                status_message=status_message,
+            )
             verbose_logger.debug(
                 f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}"
             )
@@ -265,31 +230,30 @@
             generation_id = None
             if self._is_langfuse_v2():
                 trace_id, generation_id = self._log_langfuse_v2(
-                    user_id,
-                    metadata,
-                    litellm_params,
-                    output,
-                    start_time,
-                    end_time,
-                    kwargs,
-                    optional_params,
-                    input,
-                    response_obj,
-                    level,
-                    print_verbose,
-                    litellm_call_id,
+                    user_id=user_id,
+                    metadata=metadata,
+                    litellm_params=litellm_params,
+                    output=output,
+                    start_time=start_time,
+                    end_time=end_time,
+                    kwargs=kwargs,
+                    optional_params=optional_params,
+                    input=input,
+                    response_obj=response_obj,
+                    level=level,
+                    litellm_call_id=litellm_call_id,
                 )
             elif response_obj is not None:
                 self._log_langfuse_v1(
-                    user_id,
-                    metadata,
-                    output,
-                    start_time,
-                    end_time,
-                    kwargs,
-                    optional_params,
-                    input,
-                    response_obj,
+                    user_id=user_id,
+                    metadata=metadata,
+                    output=output,
+                    start_time=start_time,
+                    end_time=end_time,
+                    kwargs=kwargs,
+                    optional_params=optional_params,
+                    input=input,
+                    response_obj=response_obj,
                 )
 
             verbose_logger.debug(
                 f"Langfuse Layer Logging - final response object: {response_obj}"
@@ -303,11 +267,108 @@
             )
             return {"trace_id": None, "generation_id": None}
 
+    def _get_langfuse_input_output_content(
+        self,
+        kwargs: dict,
+        response_obj: Union[
+            None,
+            dict,
+            EmbeddingResponse,
+            ModelResponse,
+            TextCompletionResponse,
+            ImageResponse,
+            TranscriptionResponse,
+            RerankResponse,
+            HttpxBinaryResponseContent,
+        ],
+        prompt: dict,
+        level: str,
+        status_message: Optional[str],
+    ) -> Tuple[Optional[dict], Optional[Union[str, dict, list]]]:
+        """
+        Get the input and output content for Langfuse logging
+
+        Args:
+            kwargs: The keyword arguments passed to the function
+            response_obj: The response object returned by the function
+            prompt: The prompt used to generate the response
+            level: The level of the log message
+            status_message: The status message of the log message
+
+        Returns:
+            input: The input content for Langfuse logging
+            output: The output content for Langfuse logging
+        """
+        input = None
+        output: Optional[Union[str, dict, List[Any]]] = None
+        if (
+            level == "ERROR"
+            and status_message is not None
+            and isinstance(status_message, str)
+        ):
+            input = prompt
+            output = status_message
+        elif response_obj is not None and (
+            kwargs.get("call_type", None) == "embedding"
+            or isinstance(response_obj, litellm.EmbeddingResponse)
+        ):
+            input = prompt
+            output = None
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.ModelResponse
+        ):
+            input = prompt
+            output = self._get_chat_content_for_langfuse(response_obj)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.HttpxBinaryResponseContent
+        ):
+            input = prompt
+            output = "speech-output"
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.TextCompletionResponse
+        ):
+            input = prompt
+            output = self._get_text_completion_content_for_langfuse(response_obj)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.ImageResponse
+        ):
+            input = prompt
+            output = response_obj.get("data", None)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.TranscriptionResponse
+        ):
+            input = prompt
+            output = response_obj.get("text", None)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.RerankResponse
+        ):
+            input = prompt
+            output = response_obj.results
+        elif (
+            kwargs.get("call_type") is not None
+            and kwargs.get("call_type") == "_arealtime"
+            and response_obj is not None
+            and isinstance(response_obj, list)
+        ):
+            input = kwargs.get("input")
+            output = response_obj
+        elif (
+            kwargs.get("call_type") is not None
+            and kwargs.get("call_type") == "pass_through_endpoint"
+            and response_obj is not None
+            and isinstance(response_obj, dict)
+        ):
+            input = prompt
+            output = response_obj.get("response", "")
+        return input, output
+
     async def _async_log_event(
-        self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
+        self, kwargs, response_obj, start_time, end_time, user_id
     ):
         """
-        TODO: support async calls when langfuse is truly async
+        Langfuse SDK uses a background thread to log events
+
+        This approach does not impact latency and runs in the background
         """
 
     def _is_langfuse_v2(self):
@@ -361,19 +422,18 @@
     def _log_langfuse_v2(  # noqa: PLR0915
         self,
-        user_id,
-        metadata,
-        litellm_params,
-        output,
-        start_time,
-        end_time,
-        kwargs,
-        optional_params,
-        input,
-        response_obj,
-        level,
-        print_verbose,
-        litellm_call_id,
+        user_id: Optional[str],
+        metadata: dict,
+        litellm_params: dict,
+        output: Optional[Union[str, dict, list]],
+        start_time: Optional[datetime],
+        end_time: Optional[datetime],
+        kwargs: dict,
+        optional_params: dict,
+        input: Optional[dict],
+        response_obj,
+        level: str,
+        litellm_call_id: Optional[str],
     ) -> tuple:
         verbose_logger.debug("Langfuse Layer Logging - logging to langfuse v2")
@@ -657,6 +717,31 @@
             verbose_logger.error(f"Langfuse Layer Error - {traceback.format_exc()}")
             return None, None
 
+    @staticmethod
+    def _get_chat_content_for_langfuse(
+        response_obj: ModelResponse,
+    ):
+        """
+        Get the chat content for Langfuse logging
+        """
+        if response_obj.choices and len(response_obj.choices) > 0:
+            output = response_obj["choices"][0]["message"].json()
+            return output
+        else:
+            return None
+
+    @staticmethod
+    def _get_text_completion_content_for_langfuse(
+        response_obj: TextCompletionResponse,
+    ):
+        """
+        Get the text completion content for Langfuse logging
+        """
+        if response_obj.choices and len(response_obj.choices) > 0:
+            return response_obj.choices[0].text
+        else:
+            return None
+
     @staticmethod
     def _get_langfuse_tags(
         standard_logging_object: Optional[StandardLoggingPayload],


@@ -247,13 +247,12 @@ class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogge
             standard_callback_dynamic_params=standard_callback_dynamic_params,
             in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
         )
-        langfuse_logger_to_use._old_log_event(
+        langfuse_logger_to_use.log_event_on_langfuse(
             kwargs=kwargs,
             response_obj=response_obj,
             start_time=start_time,
             end_time=end_time,
             user_id=kwargs.get("user", None),
-            print_verbose=None,
         )
 
     async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
@@ -271,12 +270,11 @@ class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogge
         )
         if standard_logging_object is None:
             return
-        langfuse_logger_to_use._old_log_event(
+        langfuse_logger_to_use.log_event_on_langfuse(
            start_time=start_time,
            end_time=end_time,
            response_obj=None,
            user_id=kwargs.get("user", None),
-            print_verbose=None,
            status_message=standard_logging_object["error_str"],
            level="ERROR",
            kwargs=kwargs,


@@ -1247,13 +1247,12 @@ class Logging(LiteLLMLoggingBaseClass):
                         in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
                     )
                     if langfuse_logger_to_use is not None:
-                        _response = langfuse_logger_to_use._old_log_event(
+                        _response = langfuse_logger_to_use.log_event_on_langfuse(
                             kwargs=kwargs,
                             response_obj=result,
                             start_time=start_time,
                             end_time=end_time,
                             user_id=kwargs.get("user", None),
-                            print_verbose=print_verbose,
                         )
                         if _response is not None and isinstance(_response, dict):
                             _trace_id = _response.get("trace_id", None)
@@ -1957,12 +1956,11 @@ class Logging(LiteLLMLoggingBaseClass):
                     standard_callback_dynamic_params=self.standard_callback_dynamic_params,
                     in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
                 )
-                _response = langfuse_logger_to_use._old_log_event(
+                _response = langfuse_logger_to_use.log_event_on_langfuse(
                     start_time=start_time,
                     end_time=end_time,
                     response_obj=None,
                     user_id=kwargs.get("user", None),
-                    print_verbose=print_verbose,
                     status_message=str(exception),
                     level="ERROR",
                     kwargs=self.model_call_details,


@@ -3699,8 +3699,9 @@
         Else, original response is returned.
         """
-        if response.choices[0].finish_reason != "content_filter":
-            return False
+        if response.choices and len(response.choices) > 0:
+            if response.choices[0].finish_reason != "content_filter":
+                return False
 
         content_policy_fallbacks = kwargs.get(
             "content_policy_fallbacks", self.content_policy_fallbacks


@@ -0,0 +1,75 @@
{
  "batch": [
    {
      "id": "1f1d7517-4602-4c59-a322-7fc0306f1b7a",
      "type": "trace-create",
      "body": {
        "id": "litellm-test-dbadfdfc-f4e7-4f05-8992-984c37359166",
        "timestamp": "2025-02-07T00:23:27.669634Z",
        "name": "litellm-acompletion",
        "input": {
          "messages": [
            {
              "role": "user",
              "content": "Hello!"
            }
          ]
        },
        "tags": []
      },
      "timestamp": "2025-02-07T00:23:27.669809Z"
    },
    {
      "id": "fbe610b6-f500-4c7d-8e34-d40a0e8c487b",
      "type": "generation-create",
      "body": {
        "traceId": "litellm-test-dbadfdfc-f4e7-4f05-8992-984c37359166",
        "name": "litellm-acompletion",
        "startTime": "2025-02-06T16:23:27.220129-08:00",
        "metadata": {
          "hidden_params": {
            "model_id": null,
            "cache_key": null,
            "api_base": "https://api.openai.com",
            "response_cost": 3.5e-05,
            "additional_headers": {},
            "litellm_overhead_time_ms": null
          },
          "litellm_response_cost": 3.5e-05,
          "cache_hit": false,
          "requester_metadata": {}
        },
        "input": {
          "messages": [
            {
              "role": "user",
              "content": "Hello!"
            }
          ]
        },
        "level": "DEFAULT",
        "id": "time-16-23-27-220129_chatcmpl-565360d7-965f-4533-9c09-db789af77a7d",
        "endTime": "2025-02-06T16:23:27.644253-08:00",
        "completionStartTime": "2025-02-06T16:23:27.644253-08:00",
        "model": "gpt-3.5-turbo",
        "modelParameters": {
          "extra_body": "{}"
        },
        "usage": {
          "input": 10,
          "output": 10,
          "unit": "TOKENS",
          "totalCost": 3.5e-05
        }
      },
      "timestamp": "2025-02-07T00:23:27.670175Z"
    }
  ],
  "metadata": {
    "batch_size": 2,
    "sdk_integration": "litellm",
    "sdk_name": "python",
    "sdk_version": "2.44.1",
    "public_key": "pk-lf-e02aaea3-8668-4c9f-8c69-771a4ea1f5c9"
  }
}


@@ -352,3 +352,32 @@ class TestLangfuseLogging:
             response_json_file,
             setup["trace_id"],
         )
+
+    @pytest.mark.asyncio
+    async def test_langfuse_logging_completion_with_malformed_llm_response(
+        self, mock_setup
+    ):
+        """Test Langfuse logging for chat completion with malformed LLM response"""
+        setup = await mock_setup  # Await the fixture
+        litellm._turn_on_debug()
+        with patch("httpx.Client.post", setup["mock_post"]):
+            mock_response = litellm.ModelResponse(
+                choices=[],
+                usage=litellm.Usage(
+                    prompt_tokens=10,
+                    completion_tokens=10,
+                    total_tokens=20,
+                ),
+                model="gpt-3.5-turbo",
+                object="chat.completion",
+                created=1723081200,
+            ).model_dump()
+            await litellm.acompletion(
+                model="gpt-3.5-turbo",
+                messages=[{"role": "user", "content": "Hello!"}],
+                mock_response=mock_response,
+                metadata={"trace_id": setup["trace_id"]},
+            )
+            await self._verify_langfuse_call(
+                setup["mock_post"], "completion_with_no_choices.json", setup["trace_id"]
+            )


@@ -21,6 +21,11 @@ from litellm.types.utils import (
     StandardLoggingMetadata,
     StandardLoggingHiddenParams,
     StandardCallbackDynamicParams,
+    ModelResponse,
+    Choices,
+    Message,
+    TextCompletionResponse,
+    TextChoices,
 )
@@ -294,7 +299,6 @@ def test_get_langfuse_tags():
     assert result == []
 
 
-
 @patch.dict(os.environ, {}, clear=True)  # Start with empty environment
 def test_get_langfuse_flush_interval():
     """
@@ -316,6 +320,7 @@ def test_get_langfuse_flush_interval():
     )
     assert result == 120
 
+
 def test_langfuse_e2e_sync(monkeypatch):
     from litellm import completion
     import litellm
@@ -343,3 +348,45 @@ def test_langfuse_e2e_sync(monkeypatch):
         assert langfuse_mock.called
+
+
+def test_get_chat_content_for_langfuse():
+    """
+    Test that _get_chat_content_for_langfuse correctly extracts content from chat completion responses
+    """
+    # Test with valid response
+    mock_response = ModelResponse(
+        choices=[Choices(message=Message(role="assistant", content="Hello world"))]
+    )
+    result = LangFuseLogger._get_chat_content_for_langfuse(mock_response)
+    assert result == {
+        "content": "Hello world",
+        "role": "assistant",
+        "tool_calls": None,
+        "function_call": None,
+    }
+
+    # Test with empty choices
+    mock_response = ModelResponse(choices=[])
+    result = LangFuseLogger._get_chat_content_for_langfuse(mock_response)
+    assert result is None
+
+
+def test_get_text_completion_content_for_langfuse():
+    """
+    Test that _get_text_completion_content_for_langfuse correctly extracts content from text completion responses
+    """
+    # Test with valid response
+    mock_response = TextCompletionResponse(choices=[TextChoices(text="Hello world")])
+    result = LangFuseLogger._get_text_completion_content_for_langfuse(mock_response)
+    assert result == "Hello world"
+
+    # Test with empty choices
+    mock_response = TextCompletionResponse(choices=[])
+    result = LangFuseLogger._get_text_completion_content_for_langfuse(mock_response)
+    assert result is None
+
+    # Test with no choices field
+    mock_response = TextCompletionResponse()
+    result = LangFuseLogger._get_text_completion_content_for_langfuse(mock_response)
+    assert result is None