diff --git a/litellm/proxy/pass_through_endpoints/streaming_handler.py b/litellm/proxy/pass_through_endpoints/streaming_handler.py index 9ba5adfec..a88ad34d3 100644 --- a/litellm/proxy/pass_through_endpoints/streaming_handler.py +++ b/litellm/proxy/pass_through_endpoints/streaming_handler.py @@ -40,35 +40,39 @@ async def chunk_processor( - Yields chunks from the response - Collect non-empty chunks for post-processing (logging) """ - collected_chunks: List[str] = [] # List to store all chunks try: - async for chunk in response.aiter_lines(): - verbose_proxy_logger.debug(f"Processing chunk: {chunk}") - if not chunk: - continue + if endpoint_type == EndpointType.VERTEX_AI: + async for chunk in response.aiter_bytes(): + yield chunk + else: + collected_chunks: List[str] = [] # List to store all chunks + async for chunk in response.aiter_lines(): + verbose_proxy_logger.debug(f"Processing chunk: {chunk}") + if not chunk: + continue - # Handle SSE format - pass through the raw SSE format - if isinstance(chunk, bytes): - chunk = chunk.decode("utf-8") + # Handle SSE format - pass through the raw SSE format + if isinstance(chunk, bytes): + chunk = chunk.decode("utf-8") - # Store the chunk for post-processing - if chunk.strip(): # Only store non-empty chunks - collected_chunks.append(chunk) - yield f"{chunk}\n" + # Store the chunk for post-processing + if chunk.strip(): # Only store non-empty chunks + collected_chunks.append(chunk) + yield f"{chunk}\n" - # After all chunks are processed, handle post-processing - end_time = datetime.now() + # After all chunks are processed, handle post-processing + end_time = datetime.now() - await _route_streaming_logging_to_handler( - litellm_logging_obj=litellm_logging_obj, - passthrough_success_handler_obj=passthrough_success_handler_obj, - url_route=url_route, - request_body=request_body or {}, - endpoint_type=endpoint_type, - start_time=start_time, - all_chunks=collected_chunks, - end_time=end_time, - ) + await _route_streaming_logging_to_handler( + litellm_logging_obj=litellm_logging_obj, + passthrough_success_handler_obj=passthrough_success_handler_obj, + url_route=url_route, + request_body=request_body or {}, + endpoint_type=endpoint_type, + start_time=start_time, + all_chunks=collected_chunks, + end_time=end_time, + ) except Exception as e: verbose_proxy_logger.error(f"Error in chunk_processor: {str(e)}")