diff --git a/litellm/llms/sagemaker/common_utils.py b/litellm/llms/sagemaker/common_utils.py index 9884f420c3..48cddf8064 100644 --- a/litellm/llms/sagemaker/common_utils.py +++ b/litellm/llms/sagemaker/common_utils.py @@ -128,21 +128,25 @@ class AWSEventStreamDecoder: async for chunk in iterator: event_stream_buffer.add_data(chunk) for event in event_stream_buffer: - message = self._parse_message_from_event(event) - if message: - verbose_logger.debug("sagemaker parsed chunk bytes %s", message) - # remove data: prefix and "\n\n" at the end - message = ( - litellm.CustomStreamWrapper._strip_sse_data_from_chunk(message) - or "" - ) - message = message.replace("\n\n", "") + try: + message = self._parse_message_from_event(event) + if message: + verbose_logger.debug( + "sagemaker parsed chunk bytes %s", message + ) + # remove data: prefix and "\n\n" at the end + message = ( + litellm.CustomStreamWrapper._strip_sse_data_from_chunk( + message + ) + or "" + ) + message = message.replace("\n\n", "") - # Accumulate JSON data - accumulated_json += message + # Accumulate JSON data + accumulated_json += message - # Try to parse the accumulated JSON - try: + # Try to parse the accumulated JSON _data = json.loads(accumulated_json) if self.is_messages_api: yield self._chunk_parser_messages_api(chunk_data=_data) @@ -150,9 +154,19 @@ class AWSEventStreamDecoder: yield self._chunk_parser(chunk_data=_data) # Reset accumulated_json after successful parsing accumulated_json = "" - except json.JSONDecodeError: - # If it's not valid JSON yet, continue to the next event - continue + except json.JSONDecodeError: + # If it's not valid JSON yet, continue to the next event + continue + except UnicodeDecodeError as e: + verbose_logger.warning( + f"UnicodeDecodeError: {e}. Attempting to combine with next event." + ) + continue + except Exception as e: + verbose_logger.error( + f"Error parsing message: {e}. Attempting to combine with next event." + ) + continue # Handle any remaining data after the iterator is exhausted if accumulated_json: @@ -168,6 +182,8 @@ class AWSEventStreamDecoder: f"Warning: Unparseable JSON data remained: {accumulated_json}" ) yield None + except Exception as e: + verbose_logger.error(f"Final error parsing accumulated JSON: {e}") def _parse_message_from_event(self, event) -> Optional[str]: response_dict = event.to_response_dict()