litellm-mirror/tests/litellm/llms/sagemaker/test_sagemaker_common_utils.py
2025-03-31 16:05:10 -07:00

97 lines
3.3 KiB
Python

import json
import os
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
sys.path.insert(0, os.path.abspath("../../../../.."))
from litellm.llms.sagemaker.common_utils import AWSEventStreamDecoder
@pytest.mark.asyncio
async def test_aiter_bytes_unicode_decode_error():
"""
Test that AWSEventStreamDecoder.aiter_bytes() does not raise an error when encountering invalid UTF-8 bytes. (UnicodeDecodeError)
Ensures stream processing continues despite the error.
Relevant issue: https://github.com/BerriAI/litellm/issues/9165
"""
# Create an instance of AWSEventStreamDecoder
decoder = AWSEventStreamDecoder(model="test-model")
# Create a mock event that will trigger a UnicodeDecodeError
mock_event = MagicMock()
mock_event.to_response_dict.return_value = {
"status_code": 200,
"headers": {},
"body": b"\xff\xfe", # Invalid UTF-8 bytes
}
# Create a mock EventStreamBuffer that yields our mock event
mock_buffer = MagicMock()
mock_buffer.__iter__.return_value = [mock_event]
# Mock the EventStreamBuffer class
with patch("botocore.eventstream.EventStreamBuffer", return_value=mock_buffer):
# Create an async generator that yields some test bytes
async def mock_iterator():
yield b""
# Process the stream
chunks = []
async for chunk in decoder.aiter_bytes(mock_iterator()):
if chunk is not None:
print("chunk=", chunk)
chunks.append(chunk)
# Verify that processing continued despite the error
# The chunks list should be empty since we only sent invalid data
assert len(chunks) == 0
@pytest.mark.asyncio
async def test_aiter_bytes_valid_chunk_followed_by_unicode_error():
"""
Test that valid chunks are processed correctly even when followed by Unicode decode errors.
This ensures errors don't corrupt or prevent processing of valid data that came before.
Relevant issue: https://github.com/BerriAI/litellm/issues/9165
"""
decoder = AWSEventStreamDecoder(model="test-model")
# Create two mock events - first valid, then invalid
mock_valid_event = MagicMock()
mock_valid_event.to_response_dict.return_value = {
"status_code": 200,
"headers": {},
"body": json.dumps({"token": {"text": "hello"}}).encode(), # Valid data first
}
mock_invalid_event = MagicMock()
mock_invalid_event.to_response_dict.return_value = {
"status_code": 200,
"headers": {},
"body": b"\xff\xfe", # Invalid UTF-8 bytes second
}
# Create a mock EventStreamBuffer that yields valid event first, then invalid
mock_buffer = MagicMock()
mock_buffer.__iter__.return_value = [mock_valid_event, mock_invalid_event]
with patch("botocore.eventstream.EventStreamBuffer", return_value=mock_buffer):
async def mock_iterator():
yield b"test_bytes"
chunks = []
async for chunk in decoder.aiter_bytes(mock_iterator()):
if chunk is not None:
chunks.append(chunk)
# Verify we got our valid chunk despite the subsequent error
assert len(chunks) == 1
assert chunks[0]["text"] == "hello" # Verify the content of the valid chunk