java, functional-programming, java-8, java-stream

Partition a Java 8 Stream


How can I implement a "partition" operation on a Java 8 Stream? By partition I mean dividing a stream into sub-streams of a given size. It should behave like Guava's Iterators.partition() method, except that the partitions should ideally be lazily evaluated Streams rather than Lists.

partition

public static <T> UnmodifiableIterator<List<T>> partition(Iterator<T> iterator,
                                                          int size)

Divides an iterator into unmodifiable sublists of the given size (the final list may be smaller). For example, partitioning an iterator containing [a, b, c, d, e] with a partition size of 3 yields [[a, b, c], [d, e]] -- an outer iterator containing two inner lists of three and two elements, all in the original order.


Solution

  • It is impossible to partition an arbitrary source stream into fixed-size batches, because doing so breaks parallel processing. When a stream is processed in parallel, you may not know how many elements the first sub-task holds after a split, so you cannot create the partitions for the next sub-task until the first one is fully processed.
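
    As a rough illustration of the sizing problem (a sketch only; the class, field, and method names are made up for this example): a List-backed spliterator reports an exact size, while the same data behind a filter does not, so nothing downstream can know in advance how many elements a given part of the stream will yield.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.Spliterator;

    final class SizedDemo {
        // Hypothetical sample data for the illustration.
        static final List<Integer> SOURCE = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

        // True if the spliterator can report an exact element count up front.
        static boolean isSized(Spliterator<?> sp) {
            return sp.hasCharacteristics(Spliterator.SIZED);
        }

        // Spliterator of the same data after a filter; its element count is unknown.
        static Spliterator<Integer> filtered() {
            return SOURCE.stream().filter(i -> i % 3 == 0).spliterator();
        }

        public static void main(String[] args) {
            System.out.println(isSized(SOURCE.spliterator()));      // true
            System.out.println(isSized(filtered()));                // false
            System.out.println(filtered().getExactSizeIfKnown());   // -1
        }
    }
    ```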

    However, it is possible to create a stream of partitions from a random-access List. This feature is available, for example, in my StreamEx library:

    List<Type> input = Arrays.asList(...);
    
    Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
    

    Or, if you really want a stream of streams:

    Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
    

    If you don't want to depend on a third-party library, you can implement such an ofSubLists method manually:

    public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        // Index of the last chunk; (size - 1) / length cannot overflow int
        int fullChunks = (size - 1) / length;
        // The last chunk ends at size, so (n + 1) * length is never computed for it
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }
    

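    This implementation looks a bit long, but it handles some corner cases, such as a list size close to Integer.MAX_VALUE: the index of the last chunk is computed as (size - 1) / length, and the last chunk ends at size explicitly, so no intermediate value overflows int.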
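
    To make this concrete, here is a small sketch that repeats the method and contrasts its chunk arithmetic with the usual ceiling-division idiom. The class name and the two chunk-count helpers are hypothetical and exist only for this illustration.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    final class SubListsDemo {
        // Same logic as the ofSubLists method above, repeated so the sketch compiles on its own.
        static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
            if (length <= 0)
                throw new IllegalArgumentException("length = " + length);
            int size = source.size();
            if (size <= 0)
                return Stream.empty();
            int fullChunks = (size - 1) / length;
            return IntStream.range(0, fullChunks + 1).mapToObj(
                n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
        }

        // Common ceiling-division idiom, shown for contrast: size + length - 1 can overflow int.
        static int naiveChunkCount(int size, int length) {
            return (size + length - 1) / length;
        }

        // Arithmetic used by ofSubLists; valid for size >= 1 and length >= 1.
        static int safeChunkCount(int size, int length) {
            return (size - 1) / length + 1;
        }

        public static void main(String[] args) {
            List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
            // Same input as the Guava example in the question.
            System.out.println(ofSubLists(letters, 3).collect(Collectors.toList())); // [[a, b, c], [d, e]]
            System.out.println(naiveChunkCount(Integer.MAX_VALUE, 10)); // negative because of overflow
            System.out.println(safeChunkCount(Integer.MAX_VALUE, 10));  // 214748365
        }
    }
    ```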


    If you want a parallel-friendly solution for an unordered stream (that is, you don't care which stream elements end up together in a single batch), you can use a collector like this (thanks to @sibnick for the inspiration):

    public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                       Collector<List<T>, A, R> downstream) {
        // Mutable state: the batch being filled plus the downstream accumulation
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        // Add an element; hand the batch to the downstream collector once it is full
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
                (acc1, acc2) -> {
                    // Merge the completed batches, then re-add the leftovers of acc2
                    acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                    for (T t : acc2.cur) accumulator.accept(acc1, t);
                    return acc1;
                }, acc -> {
                    // Flush the final, possibly smaller, batch
                    if (!acc.cur.isEmpty())
                        downstream.accumulator().accept(acc.acc, acc.cur);
                    return downstream.finisher().apply(acc.acc);
                }, Collector.Characteristics.UNORDERED);
    }
    

    Usage example:

    List<List<Integer>> list = IntStream.range(0,20)
                                        .boxed().parallel()
                                        .collect(unorderedBatches(3, Collectors.toList()));
    

    Result (one possible output; the grouping depends on how the parallel stream is split):

    [[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
    

    This collector is thread-safe and produces ordered batches for a sequential stream.
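
    The sequential behaviour can be checked with a sketch like the following. It repeats the collector so that it compiles on its own; the class name and the batchesOfThree helper are hypothetical.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiConsumer;
    import java.util.stream.Collector;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    final class BatchOrderDemo {
        // Same collector as above.
        static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
                Collector<List<T>, A, R> downstream) {
            class Acc {
                List<T> cur = new ArrayList<>();
                A acc = downstream.supplier().get();
            }
            BiConsumer<Acc, T> accumulator = (acc, t) -> {
                acc.cur.add(t);
                if (acc.cur.size() == batchSize) {
                    downstream.accumulator().accept(acc.acc, acc.cur);
                    acc.cur = new ArrayList<>();
                }
            };
            return Collector.of(Acc::new, accumulator,
                    (acc1, acc2) -> {
                        acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                        for (T t : acc2.cur) accumulator.accept(acc1, t);
                        return acc1;
                    }, acc -> {
                        if (!acc.cur.isEmpty())
                            downstream.accumulator().accept(acc.acc, acc.cur);
                        return downstream.finisher().apply(acc.acc);
                    }, Collector.Characteristics.UNORDERED);
        }

        // Hypothetical helper: collects 0..19 into batches of three.
        static List<List<Integer>> batchesOfThree(boolean parallel) {
            IntStream range = IntStream.range(0, 20);
            if (parallel) range = range.parallel();
            return range.boxed().collect(unorderedBatches(3, Collectors.toList()));
        }

        public static void main(String[] args) {
            // Sequential: batches follow encounter order.
            System.out.println(batchesOfThree(false));
            // [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11], [12, 13, 14], [15, 16, 17], [18, 19]]

            // Parallel: same elements and batch sizes, but the grouping depends on the splits.
            System.out.println(batchesOfThree(true));
        }
    }
    ```

    In both modes every batch except the last holds exactly batchSize elements, because leftovers from combined sub-tasks are fed back through the accumulator.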

    If you want to apply an intermediate transformation to every batch, you can use the following version:

    public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<T, AA, B> batchCollector,
            Collector<B, A, R> downstream) {
        return unorderedBatches(batchSize, 
                Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
    }
    

    For example, this way you can sum the numbers in every batch on the fly:

    List<Integer> list = IntStream.range(0,20)
            .boxed().parallel()
            .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
                Collectors.toList()));
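
    For a sequential stream the per-batch sums are easy to verify by hand, which the sketch below does. Both collectors are repeated so that it compiles on its own; the class name and the batchSums helper are hypothetical, and the lambda parameter is typed explicitly only to make the inference easier to follow.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiConsumer;
    import java.util.stream.Collector;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    final class BatchSumDemo {
        // Two-argument collector from above.
        static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
                Collector<List<T>, A, R> downstream) {
            class Acc {
                List<T> cur = new ArrayList<>();
                A acc = downstream.supplier().get();
            }
            BiConsumer<Acc, T> accumulator = (acc, t) -> {
                acc.cur.add(t);
                if (acc.cur.size() == batchSize) {
                    downstream.accumulator().accept(acc.acc, acc.cur);
                    acc.cur = new ArrayList<>();
                }
            };
            return Collector.of(Acc::new, accumulator,
                    (acc1, acc2) -> {
                        acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                        for (T t : acc2.cur) accumulator.accept(acc1, t);
                        return acc1;
                    }, acc -> {
                        if (!acc.cur.isEmpty())
                            downstream.accumulator().accept(acc.acc, acc.cur);
                        return downstream.finisher().apply(acc.acc);
                    }, Collector.Characteristics.UNORDERED);
        }

        // Three-argument version from above: reduce each batch before passing it downstream.
        static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
                Collector<T, AA, B> batchCollector,
                Collector<B, A, R> downstream) {
            return unorderedBatches(batchSize,
                    Collectors.mapping((List<T> list) -> list.stream().collect(batchCollector), downstream));
        }

        // Hypothetical helper: sums 0..19 in batches of three.
        static List<Integer> batchSums(boolean parallel) {
            IntStream range = IntStream.range(0, 20);
            if (parallel) range = range.parallel();
            return range.boxed().collect(
                    unorderedBatches(3, Collectors.summingInt(Integer::intValue), Collectors.toList()));
        }

        public static void main(String[] args) {
            System.out.println(batchSums(false)); // [3, 12, 21, 30, 39, 48, 37]
            // Parallel: individual sums depend on the splits, but they always add up to 190.
            System.out.println(batchSums(true));
        }
    }
    ```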