Litellm dev 04 22 2025 p1 (#10206)

* fix(openai.py): initial commit adding generic event type for openai responses api streaming

Ensures handling for undocumented event types - e.g. "response.reasoning_summary_part.added"

* fix(transformation.py): handle unknown openai response type

* fix(datadog_llm_observability.py): handle dict[str, Any] -> dict[str, str] conversion

Fixes https://github.com/BerriAI/litellm/issues/9494

* test: add more unit testing

* test: add unit test

* fix(common_utils.py): fix message with content list

* test: update testing
Krish Dholakia 2025-04-22 23:58:43 -07:00 committed by Christian Owusu
parent 47765404a8
commit d4ef43f34c
12 changed files with 165 additions and 10 deletions

View file

@@ -13,10 +13,15 @@ import uuid
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Union

+import httpx
+
 import litellm
 from litellm._logging import verbose_logger
 from litellm.integrations.custom_batch_logger import CustomBatchLogger
 from litellm.integrations.datadog.datadog import DataDogLogger
+from litellm.litellm_core_utils.prompt_templates.common_utils import (
+    handle_any_messages_to_chat_completion_str_messages_conversion,
+)
 from litellm.llms.custom_httpx.http_handler import (
     get_async_httpx_client,
     httpxSpecialProvider,
@@ -106,7 +111,6 @@ class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger):
                 },
             )

-            response.raise_for_status()
             if response.status_code != 202:
                 raise Exception(
                     f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}"
@@ -116,6 +120,10 @@ class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger):
                 f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}"
             )
             self.log_queue.clear()
+        except httpx.HTTPStatusError as e:
+            verbose_logger.exception(
+                f"DataDogLLMObs: Error sending batch - {e.response.text}"
+            )
         except Exception as e:
             verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}")
@@ -133,7 +141,11 @@ class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger):
         metadata = kwargs.get("litellm_params", {}).get("metadata", {})

-        input_meta = InputMeta(messages=messages)  # type: ignore
+        input_meta = InputMeta(
+            messages=handle_any_messages_to_chat_completion_str_messages_conversion(
+                messages
+            )
+        )
         output_meta = OutputMeta(messages=self._get_response_messages(response_obj))

         meta = Meta(

View file

@@ -6,7 +6,7 @@ import io
 import mimetypes
 import re
 from os import PathLike
-from typing import Dict, List, Literal, Mapping, Optional, Union, cast
+from typing import Any, Dict, List, Literal, Mapping, Optional, Union, cast

 from litellm.types.llms.openai import (
     AllMessageValues,
@@ -32,6 +32,35 @@ DEFAULT_ASSISTANT_CONTINUE_MESSAGE = ChatCompletionAssistantMessage(
 )


+def handle_any_messages_to_chat_completion_str_messages_conversion(
+    messages: Any,
+) -> List[Dict[str, str]]:
+    """
+    Handles any messages to chat completion str messages conversion
+
+    Relevant Issue: https://github.com/BerriAI/litellm/issues/9494
+    """
+    import json
+
+    if isinstance(messages, list):
+        try:
+            return cast(
+                List[Dict[str, str]],
+                handle_messages_with_content_list_to_str_conversion(messages),
+            )
+        except Exception:
+            return [{"input": json.dumps(message, default=str)} for message in messages]
+    elif isinstance(messages, dict):
+        try:
+            return [{"input": json.dumps(messages, default=str)}]
+        except Exception:
+            return [{"input": str(messages)}]
+    elif isinstance(messages, str):
+        return [{"input": messages}]
+    else:
+        return [{"input": str(messages)}]
+
+
 def handle_messages_with_content_list_to_str_conversion(
     messages: List[AllMessageValues],
 ) -> List[AllMessageValues]:
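
For illustration only (not part of the diff): a minimal sketch of how the new helper behaves, based on the unit tests added further down. Well-formed chat messages pass through unchanged, while anything non-standard is JSON-serialized under an "input" key so the caller always receives Dict[str, str] entries.

from litellm.litellm_core_utils.prompt_templates.common_utils import (
    handle_any_messages_to_chat_completion_str_messages_conversion,
)

# Standard chat messages are returned as-is (content lists are flattened to strings).
handle_any_messages_to_chat_completion_str_messages_conversion(
    [{"role": "user", "content": "Hello"}]
)
# -> [{"role": "user", "content": "Hello"}]

# Non-standard payloads (nested dicts, ints, etc.) are serialized defensively.
handle_any_messages_to_chat_completion_str_messages_conversion(
    {"role": "user", "content": {"text": "Hello"}}
)
# -> [{"input": '{"role": "user", "content": {"text": "Hello"}}'}]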

View file

@ -187,7 +187,7 @@ class OpenAIResponsesAPIConfig(BaseResponsesAPIConfig):
model_class = event_models.get(cast(ResponsesAPIStreamEvents, event_type)) model_class = event_models.get(cast(ResponsesAPIStreamEvents, event_type))
if not model_class: if not model_class:
raise ValueError(f"Unknown event type: {event_type}") return GenericEvent
return model_class return model_class
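
As a quick illustration (the event payload below is hypothetical): with the fallback above, an undocumented streaming chunk such as "response.reasoning_summary_part.added" is materialized as a GenericEvent instead of raising ValueError, and its unknown fields remain accessible because the model allows extra attributes.

from litellm.types.llms.openai import GenericEvent

# A chunk type not covered by the documented event models; the payload here is made up.
chunk = {"type": "response.reasoning_summary_part.added", "part": {"text": "..."}}

# get_event_model_class() now resolves unknown types to GenericEvent instead of raising,
# so the chunk can still be parsed; extra="allow" keeps the undocumented fields.
event = GenericEvent(**chunk)
assert event.type == "response.reasoning_summary_part.added"
assert event.part == {"text": "..."}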

View file

@@ -33,7 +33,7 @@ model_list:
 litellm_settings:
   num_retries: 0
-  callbacks: ["prometheus"]
+  callbacks: ["datadog_llm_observability"]
   check_provider_endpoint: true

 files_settings:

View file

@@ -1296,7 +1296,7 @@ class ProxyConfig:
             config=config, base_dir=os.path.dirname(os.path.abspath(file_path or ""))
         )
-        verbose_proxy_logger.debug(f"loaded config={json.dumps(config, indent=4)}")
+        # verbose_proxy_logger.debug(f"loaded config={json.dumps(config, indent=4)}")
         return config

     def _process_includes(self, config: dict, base_dir: str) -> dict:

View file

@@ -44,12 +44,12 @@ class BaseResponsesAPIStreamingIterator:
         self.responses_api_provider_config = responses_api_provider_config
         self.completed_response: Optional[ResponsesAPIStreamingResponse] = None
         self.start_time = datetime.now()

         # set request kwargs
         self.litellm_metadata = litellm_metadata
         self.custom_llm_provider = custom_llm_provider

-    def _process_chunk(self, chunk):
+    def _process_chunk(self, chunk) -> Optional[ResponsesAPIStreamingResponse]:
         """Process a single chunk of data from the stream"""
         if not chunk:
             return None

View file

@@ -8,7 +8,9 @@ from typing import Any, Dict, List, Literal, Optional, TypedDict

 class InputMeta(TypedDict):
-    messages: List[Any]
+    messages: List[
+        Dict[str, str]
+    ]  # Relevant Issue: https://github.com/BerriAI/litellm/issues/9494


 class OutputMeta(TypedDict):

View file

@@ -50,7 +50,7 @@ from openai.types.responses.response_create_params import (
     ToolParam,
 )
 from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall
-from pydantic import BaseModel, Discriminator, Field, PrivateAttr
+from pydantic import BaseModel, ConfigDict, Discriminator, Field, PrivateAttr
 from typing_extensions import Annotated, Dict, Required, TypedDict, override

 from litellm.types.llms.base import BaseLiteLLMOpenAIResponseObject
@@ -1013,6 +1013,9 @@ class ResponsesAPIStreamEvents(str, Enum):
     RESPONSE_FAILED = "response.failed"
     RESPONSE_INCOMPLETE = "response.incomplete"

+    # Part added
+    RESPONSE_PART_ADDED = "response.reasoning_summary_part.added"
+
     # Output item events
     OUTPUT_ITEM_ADDED = "response.output_item.added"
     OUTPUT_ITEM_DONE = "response.output_item.done"
@@ -1200,6 +1203,12 @@ class ErrorEvent(BaseLiteLLMOpenAIResponseObject):
     param: Optional[str]


+class GenericEvent(BaseLiteLLMOpenAIResponseObject):
+    type: str
+
+    model_config = ConfigDict(extra="allow", protected_namespaces=())
+
+
 # Union type for all possible streaming responses
 ResponsesAPIStreamingResponse = Annotated[
     Union[
@@ -1226,6 +1235,7 @@ ResponsesAPIStreamingResponse = Annotated[
         WebSearchCallSearchingEvent,
         WebSearchCallCompletedEvent,
         ErrorEvent,
+        GenericEvent,
     ],
     Discriminator("type"),
 ]

View file

@@ -11,6 +11,7 @@ sys.path.insert(

 from litellm.litellm_core_utils.prompt_templates.common_utils import (
     get_format_from_file_id,
+    handle_any_messages_to_chat_completion_str_messages_conversion,
     update_messages_with_model_file_ids,
 )
@@ -64,3 +65,63 @@ def test_update_messages_with_model_file_ids():
             ],
         }
     ]
+
+
+def test_handle_any_messages_to_chat_completion_str_messages_conversion_list():
+    # Test with list of messages
+    messages = [
+        {"role": "user", "content": "Hello"},
+        {"role": "assistant", "content": "Hi there"},
+    ]
+    result = handle_any_messages_to_chat_completion_str_messages_conversion(messages)
+    assert len(result) == 2
+    assert result[0] == messages[0]
+    assert result[1] == messages[1]
+
+
+def test_handle_any_messages_to_chat_completion_str_messages_conversion_list_infinite_loop():
+    # Test that list handling doesn't cause infinite recursion
+    messages = [
+        {"role": "user", "content": "Hello"},
+        {"role": "assistant", "content": "Hi there"},
+    ]
+    # This should complete without stack overflow
+    result = handle_any_messages_to_chat_completion_str_messages_conversion(messages)
+    assert len(result) == 2
+    assert result[0] == messages[0]
+    assert result[1] == messages[1]
+
+
+def test_handle_any_messages_to_chat_completion_str_messages_conversion_dict():
+    # Test with single dictionary message
+    message = {"role": "user", "content": "Hello"}
+    result = handle_any_messages_to_chat_completion_str_messages_conversion(message)
+    assert len(result) == 1
+    assert result[0]["input"] == json.dumps(message)
+
+
+def test_handle_any_messages_to_chat_completion_str_messages_conversion_str():
+    # Test with string message
+    message = "Hello"
+    result = handle_any_messages_to_chat_completion_str_messages_conversion(message)
+    assert len(result) == 1
+    assert result[0]["input"] == message
+
+
+def test_handle_any_messages_to_chat_completion_str_messages_conversion_other():
+    # Test with non-string/dict/list type
+    message = 123
+    result = handle_any_messages_to_chat_completion_str_messages_conversion(message)
+    assert len(result) == 1
+    assert result[0]["input"] == "123"
+
+
+def test_handle_any_messages_to_chat_completion_str_messages_conversion_complex():
+    # Test with complex nested structure
+    message = {
+        "role": "user",
+        "content": {"text": "Hello", "metadata": {"timestamp": "2024-01-01"}},
+    }
+    result = handle_any_messages_to_chat_completion_str_messages_conversion(message)
+    assert len(result) == 1
+    assert result[0]["input"] == json.dumps(message)

View file

@@ -252,3 +252,22 @@ class TestOpenAIResponsesAPIConfig:
         )

         assert result == "https://custom-openai.example.com/v1/responses"
+
+    def test_get_event_model_class_generic_event(self):
+        """Test that get_event_model_class returns the correct event model class"""
+        from litellm.types.llms.openai import GenericEvent
+
+        event_type = "test"
+        result = self.config.get_event_model_class(event_type)
+        assert result == GenericEvent
+
+    def test_transform_streaming_response_generic_event(self):
+        """Test that transform_streaming_response returns the correct event model class"""
+        from litellm.types.llms.openai import GenericEvent
+
+        chunk = {"type": "test", "test": "test"}
+        result = self.config.transform_streaming_response(
+            model=self.model, parsed_chunk=chunk, logging_obj=self.logging_obj
+        )
+        assert isinstance(result, GenericEvent)
+        assert result.type == "test"

View file

@@ -0,0 +1,21 @@
+import asyncio
+import os
+import sys
+from typing import Optional
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+sys.path.insert(0, os.path.abspath("../../.."))
+import json
+
+import litellm
+
+
+def test_generic_event():
+    from litellm.types.llms.openai import GenericEvent
+
+    event = {"type": "test", "test": "test"}
+    event = GenericEvent(**event)
+    assert event.type == "test"
+    assert event.test == "test"

View file

@@ -470,3 +470,4 @@ class TestOpenAIGPT4OAudioTranscription(BaseLLMAudioTranscriptionTest):
     def get_custom_llm_provider(self) -> litellm.LlmProviders:
         return litellm.LlmProviders.OPENAI
+