Tags: java, java-8, java-stream, spliterator

Can you rebalance an unbalanced Spliterator of unknown size?


I want to use a Stream to parallelize processing of a heterogeneous set of remotely stored JSON files; the number of files is not known upfront. The files can vary widely in size, from 1 JSON record per file up to 100,000 records in others. A JSON record in this case means a self-contained JSON object represented as one line in the file.

I really want to use Streams for this, so I implemented this Spliterator:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        // Lazily open the next file when no reader is currently active.
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            // End of the current file: close it and recurse to start on the next path.
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        // The paths iterator is shared by every spliterator split off from this
        // one, so access to it must be synchronized.
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        // Prefer handing an entire file to the new spliterator; it shares the
        // same paths iterator as its parent.
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
        };
        }
        else {
            // No whole files left: split off a small batch of records from the
            // current file instead.
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // buffer up to MAX_BUFFER records
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}
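
For context, this is roughly how the spliterator gets consumed (a minimal sketch; MyJsonSpliterator, MyMetadata, MyRecord, listRemotePaths and process are hypothetical placeholders, not part of my actual code):

import java.util.Iterator;
import java.util.stream.StreamSupport;

// MyJsonSpliterator is a hypothetical concrete subclass implementing
// openInputStream and parse for some record type MyRecord.
Iterator<String> paths = listRemotePaths();
JsonStreamSpliterator<MyMetadata, MyRecord> spliterator = new MyJsonSpliterator(paths);

// Passing 'true' requests a parallel stream; this is what makes the
// framework call trySplit in the first place.
StreamSupport.stream(spliterator, true)
             .forEach(record -> process(record));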

The problem I'm having is that while the Stream parallelizes beautifully at first, eventually the largest file is left processing in a single thread. I believe the proximate cause is well documented: the spliterator is "unbalanced".

More concretely, it appears that the trySplit method is not called after a certain point in the lifecycle of Stream.forEach, so the extra logic that distributes small batches at the end of trySplit is rarely executed.

Notice how all the spliterators returned from trySplit share the same paths iterator. I thought this was a really clever way to balance the work across all spliterators, but it hasn't been enough to achieve full parallelism.

I would like the parallel processing to proceed first across files, and then, when only a few large files are left spliterating, I want to parallelize across chunks of the remaining files. That was the intent of the else block at the end of trySplit.

Is there an easy / simple / canonical way around this problem?


Solution

  • After much experimentation, I was still not able to get any added parallelism by playing with the size estimates. Basically, any value other than Long.MAX_VALUE will tend to cause the spliterator to terminate too early (and without any splitting), while on the other hand a Long.MAX_VALUE estimate will cause trySplit to be called relentlessly until it returns null.
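
    To see what the framework is doing, one can wrap a spliterator so that it counts trySplit calls (a hypothetical probe for illustration only; SplitCountingSpliterator is not part of the solution below):

    import java.util.Spliterator;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    // Hypothetical probe: delegates to a real spliterator while counting
    // how many times the stream framework calls trySplit.
    public final class SplitCountingSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> delegate;
        private final AtomicInteger splitCalls;

        public SplitCountingSpliterator(Spliterator<T> delegate, AtomicInteger splitCalls) {
            this.delegate = delegate;
            this.splitCalls = splitCalls;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return delegate.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            splitCalls.incrementAndGet();
            Spliterator<T> prefix = delegate.trySplit();
            // Wrap the split-off prefix too, so its splits are also counted.
            return prefix == null ? null : new SplitCountingSpliterator<>(prefix, splitCalls);
        }

        @Override
        public long estimateSize() {
            return delegate.estimateSize();
        }

        @Override
        public int characteristics() {
            return delegate.characteristics();
        }
    }

    Running the same parallel pipeline over this wrapper with different size estimates makes both behaviours visible in the counter.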

    The solution I found is to internally share resources among the spliterators and let them rebalance amongst themselves.

    Working code:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.Spliterator;
    import java.util.Spliterators.AbstractSpliterator;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.function.Consumer;
    import java.util.function.Function;

    import com.amazonaws.services.s3.model.S3ObjectSummary;

    public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {
    
        public final static class AwsS3LineInput<LINE> {
            final public S3ObjectSummary s3ObjectSummary;
            final public LINE lineItem;
            public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
                this.s3ObjectSummary = s3ObjectSummary;
                this.lineItem = lineItem;
            }
        }
    
        private final class InputStreamHandler {
            final S3ObjectSummary file;
            final InputStream inputStream;
            InputStreamHandler(S3ObjectSummary file, InputStream is) {
                this.file = file;
                this.inputStream = is;
            }
        }
    
        private final Iterator<S3ObjectSummary> incomingFiles;
    
        private final Function<S3ObjectSummary, InputStream> fileOpener;
    
        private final Function<InputStream, LINE> lineReader;
    
        private final Deque<S3ObjectSummary> unopenedFiles;
    
        private final Deque<InputStreamHandler> openedFiles;
    
        private final Deque<AwsS3LineInput<LINE>> sharedBuffer;
    
        private final int maxBuffer;
    
        private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
                Function<InputStream, LINE> lineReader,
                Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
                int maxBuffer) {
            super(Long.MAX_VALUE, 0);
            this.incomingFiles = incomingFiles;
            this.fileOpener = fileOpener;
            this.lineReader = lineReader;
            this.unopenedFiles = unopenedFiles;
            this.openedFiles = openedFiles;
            this.sharedBuffer = sharedBuffer;
            this.maxBuffer = maxBuffer;
        }
    
        public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
            this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
            // First, try to serve a record out of the shared buffer.
            AwsS3LineInput<LINE> lineInput;
            synchronized(sharedBuffer) {
                lineInput = sharedBuffer.poll();
            }
            if(lineInput != null) {
                action.accept(lineInput);
                return true;
            }
            // Buffer empty: pick up a partially-read file, or open a new one.
            InputStreamHandler handle = openedFiles.poll();
            if(handle == null) {
                S3ObjectSummary unopenedFile = unopenedFiles.poll();
                if(unopenedFile == null) {
                    return false;
                }
                handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
            }
            // Refill the shared buffer with up to maxBuffer lines from this file.
            for(int i=0; i < maxBuffer; ++i) {
                LINE line = lineReader.apply(handle.inputStream);
                if(line != null) {
                    synchronized(sharedBuffer) {
                        sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                    }
                }
                else {
                    // End of this file: close the exhausted stream, then recurse
                    // to serve from the buffer just filled (or move on).
                    try {
                        handle.inputStream.close();
                    } catch (IOException e) {
                        // The stream is already exhausted; ignore close failures.
                    }
                    return tryAdvance(action);
                }
            }
            // Hand the still-open file back so any spliterator can continue it.
            openedFiles.addFirst(handle);
            return tryAdvance(action);
        }
    
        @Override
        public Spliterator<AwsS3LineInput<LINE>> trySplit() {
            synchronized(incomingFiles) {
                if (incomingFiles.hasNext()) {
                    // Claim one file into the shared pool and return a sibling
                    // spliterator that shares all queues and the buffer, so the
                    // spliterators rebalance among themselves.
                    unopenedFiles.add(incomingFiles.next());
                    return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
                } else {
                    return null;
                }
            }
        }
    }
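
    For completeness, here is roughly how the spliterator can be wired into a parallel stream (a sketch only: it assumes the AWS SDK v1 on the classpath, reads single-byte characters, lists just the first page of keys, and the handle method is a hypothetical consumer):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.UncheckedIOException;
    import java.util.Iterator;
    import java.util.function.Function;
    import java.util.stream.StreamSupport;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.S3ObjectSummary;

    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    // Simplification: listObjects returns only the first page (up to 1000 keys).
    Iterator<S3ObjectSummary> files = s3.listObjects("my-bucket").getObjectSummaries().iterator();

    Function<S3ObjectSummary, InputStream> opener =
        summary -> s3.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();

    // Naive line reader: one byte per character; returns null at end of stream,
    // which is the contract tryAdvance relies on above.
    Function<InputStream, String> lineReader = in -> {
        try {
            StringBuilder sb = new StringBuilder();
            int c;
            while ((c = in.read()) != -1 && c != '\n') {
                sb.append((char) c);
            }
            return (c == -1 && sb.length() == 0) ? null : sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };

    StreamSupport.stream(new AwsS3LineSpliterator<>(files, opener, lineReader, 100), true)
                 .forEach(input -> handle(input.s3ObjectSummary, input.lineItem)); // handle is hypothetical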