fix sagemaker streaming error

This commit is contained in:
Ishaan Jaff 2025-03-24 21:29:29 -07:00
parent 80f201ff15
commit 12639b7ccf

View file

@ -128,21 +128,25 @@ class AWSEventStreamDecoder:
async for chunk in iterator: async for chunk in iterator:
event_stream_buffer.add_data(chunk) event_stream_buffer.add_data(chunk)
for event in event_stream_buffer: for event in event_stream_buffer:
message = self._parse_message_from_event(event) try:
if message: message = self._parse_message_from_event(event)
verbose_logger.debug("sagemaker parsed chunk bytes %s", message) if message:
# remove data: prefix and "\n\n" at the end verbose_logger.debug(
message = ( "sagemaker parsed chunk bytes %s", message
litellm.CustomStreamWrapper._strip_sse_data_from_chunk(message) )
or "" # remove data: prefix and "\n\n" at the end
) message = (
message = message.replace("\n\n", "") litellm.CustomStreamWrapper._strip_sse_data_from_chunk(
message
)
or ""
)
message = message.replace("\n\n", "")
# Accumulate JSON data # Accumulate JSON data
accumulated_json += message accumulated_json += message
# Try to parse the accumulated JSON # Try to parse the accumulated JSON
try:
_data = json.loads(accumulated_json) _data = json.loads(accumulated_json)
if self.is_messages_api: if self.is_messages_api:
yield self._chunk_parser_messages_api(chunk_data=_data) yield self._chunk_parser_messages_api(chunk_data=_data)
@ -150,9 +154,19 @@ class AWSEventStreamDecoder:
yield self._chunk_parser(chunk_data=_data) yield self._chunk_parser(chunk_data=_data)
# Reset accumulated_json after successful parsing # Reset accumulated_json after successful parsing
accumulated_json = "" accumulated_json = ""
except json.JSONDecodeError: except json.JSONDecodeError:
# If it's not valid JSON yet, continue to the next event # If it's not valid JSON yet, continue to the next event
continue continue
except UnicodeDecodeError as e:
verbose_logger.warning(
f"UnicodeDecodeError: {e}. Attempting to combine with next event."
)
continue
except Exception as e:
verbose_logger.error(
f"Error parsing message: {e}. Attempting to combine with next event."
)
continue
# Handle any remaining data after the iterator is exhausted # Handle any remaining data after the iterator is exhausted
if accumulated_json: if accumulated_json:
@ -168,6 +182,8 @@ class AWSEventStreamDecoder:
f"Warning: Unparseable JSON data remained: {accumulated_json}" f"Warning: Unparseable JSON data remained: {accumulated_json}"
) )
yield None yield None
except Exception as e:
verbose_logger.error(f"Final error parsing accumulated JSON: {e}")
def _parse_message_from_event(self, event) -> Optional[str]: def _parse_message_from_event(self, event) -> Optional[str]:
response_dict = event.to_response_dict() response_dict = event.to_response_dict()