Tags: java, java-stream, guava, partition, spliterator

Spliterator generated by Iterables.partition() doesn't behave as expected?


I've noticed that the spliterator produced by Guava's Iterables.partition(collection, partitionSize).spliterator() behaves strangely.

Calling trySplit() on the resulting spliterator doesn't split it, but calling trySplit() on the result of that initial trySplit() finally does.

Furthermore, StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator(), true) does not parallelize the stream, but StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator().trySplit(), true) does parallelize, and the resulting stream still contains all of the partitions.
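
To illustrate, here is a simplified, self-contained version of what I am running (the Integer list is just a stand-in for my real collection):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import java.util.stream.StreamSupport;

    import com.google.common.collect.Iterables;

    public class PartitionDemo {

        public static void main(String[] args) {
            // stand-in for my real 100k-element collection
            List<Integer> collection = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());

            // parallel flag is true, yet every batch ends up on a single thread
            StreamSupport.stream(Iterables.partition(collection, 5000).spliterator(), true)
                    .forEach(batch -> System.out.println(
                            Thread.currentThread().getName() + " -> batch of " + batch.size()));

            // splitting once by hand first: now it runs in parallel and still sees every batch
            StreamSupport.stream(Iterables.partition(collection, 5000).spliterator().trySplit(), true)
                    .forEach(batch -> System.out.println(
                            Thread.currentThread().getName() + " -> batch of " + batch.size()));
        }
    }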

My goal is: given a collection of size 100k, partition it into batches of size 5000 and process those batches in parallel.

Two questions: does the spliterator generated by Iterables.partition() behave correctly? And is my approach a good way to achieve my goal?


Solution

  • The problem here is that the Spliterator comes from an Iterable that does not have a known size. The implementation therefore buffers the elements internally, starting with a buffer of 1024 and increasing that buffer size on each subsequent split. What I mean by that is:

        // 150_000 single-element partitions, exposed only as an Iterable (no size information)
        List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
        Iterable<List<Integer>> it = Iterables.partition(coll, 1);
        Spliterator<List<Integer>> sp = it.spliterator();

        // each trySplit() buffers a batch that grows by another 1024 elements
        Spliterator<List<Integer>> one = sp.trySplit();
        System.out.println(one.getExactSizeIfKnown());

        Spliterator<List<Integer>> two = sp.trySplit();
        System.out.println(two.getExactSizeIfKnown());

        Spliterator<List<Integer>> three = sp.trySplit();
        System.out.println(three.getExactSizeIfKnown());

        Spliterator<List<Integer>> four = sp.trySplit();
        System.out.println(four.getExactSizeIfKnown());
    

    which would print:

    1024
    2048
    3072
    4096
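
    For comparison, here is a quick sketch (not part of the original code above, just an illustration with hypothetical variable names, reusing the same imports plus java.util.ArrayList) of a spliterator that does know its size: an ArrayList reports its exact size up front and splits itself evenly on the first trySplit():

        List<Integer> sizedColl = IntStream.range(0, 150_000).boxed()
                .collect(Collectors.toCollection(ArrayList::new));
        Spliterator<Integer> sized = sizedColl.spliterator();

        System.out.println(sized.getExactSizeIfKnown());      // 150000 - known up front

        Spliterator<Integer> firstHalf = sized.trySplit();
        System.out.println(firstHalf.getExactSizeIfKnown());  // 75000 - an even split
        System.out.println(sized.getExactSizeIfKnown());      // 75000 left in the original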
    

    If you want to process 5000 elements at a time, you need to start from a Spliterator that has a known size. You could put those partitions into an ArrayList first:

        public static void main(String[] args) {

            List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
            Iterable<List<Integer>> it = Iterables.partition(coll, 5000);

            // materialize the partitions into an ArrayList, whose spliterator is SIZED
            List<List<Integer>> list = new ArrayList<>();
            it.forEach(list::add);

            // the parallel stream can now split the sized spliterator evenly
            StreamSupport.stream(list.spliterator(), true)
                    .map(x -> {
                        System.out.println(
                                "Thread : " + Thread.currentThread().getName() +
                                " processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
                        );
                        return x;
                    })
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
        }
    

    On my machine the output shows that each partition is processed by a separate thread:

    Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
    Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
    Thread : main processed elements in the range : 5000 , 9999
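
    As a side note (an equivalent form, not something the solution depends on): since list here is a plain ArrayList, calling list.parallelStream() is a slightly shorter way to get the same SIZED spliterator:

        // ArrayList.spliterator() is SIZED/SUBSIZED, so this splits into even chunks as well
        list.parallelStream()
                .flatMap(List::stream)
                .collect(Collectors.toList());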