java, java-stream, completable-future, spliterator

CompletableFuture stream created from iterator is not lazily evaluated


I'm struggling a bit with how and when completable futures are completed. I have created this test case:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

And the output is:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

The testList method works as expected: the CompletableFutures are only evaluated at the very end, after limit() has kept only the first two items.

However, the output of the testIterator method is unexpected: all five CompletableFutures are completed, and the limiting is only done afterwards.

If I remove the parallel() call from the stream, it works as expected. However, the processing (the forEach()) should be done in parallel, because in my full program it is a long-running method.

Can anyone explain why this is happening?

It looks like this depends on the Java version, so for reference, I'm on 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

Solution

  • Parallelism applies to the whole pipeline, so in a parallel Stream you cannot really control what is executed before limit() is applied. The only guarantee is that whatever comes after limit() is executed only on the retained elements.
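
    As a minimal sketch of that first point (the class and method names are made up for illustration), isParallel() shows that the mode is a property of the pipeline as a whole: a parallel() call placed after map() and limit() still makes the pipeline parallel, and the last parallel()/sequential() call is the one that counts:

    ```java
    import java.util.stream.Stream;

    public class PipelineModeDemo {
        // parallel() is called last, after map() and limit(), as in the question.
        static boolean parallelSetLast() {
            return Stream.of("a", "b", "c")
                    .map(String::toUpperCase)
                    .limit(2)
                    .parallel()
                    .isParallel();
        }

        // An early parallel() is overridden by a later sequential().
        static boolean lastCallWins() {
            return Stream.of("a", "b", "c")
                    .parallel()
                    .map(String::toUpperCase)
                    .sequential()
                    .isParallel();
        }

        public static void main(String[] args) {
            System.out.println(parallelSetLast()); // true
            System.out.println(lastCallWins());    // false
        }
    }
    ```

    No terminal operation runs in this sketch; it only inspects the mode that a terminal operation would use.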

    The difference between the two is probably due to implementation details or to other Stream characteristics. In fact, you can easily invert the behavior by playing with the SIZED characteristic: it seems that when the Stream has a known size, only 2 elements are processed.

    So, for example, applying a simple filter() makes the list version lose its known size:

    completeFirstTwoElements(
            Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
    );
    

    outputs for example:

    Running list one
    Running list five
    Running list two
    Running list three
    list one
    list two
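
    To check the characteristic directly instead of inferring it from the output, one can ask each source's spliterator whether it reports SIZED. The following is a minimal sketch covering the four kinds of source that appear in this thread; the class and helper names are made up for illustration:

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Stream;

    public class SizedCharacteristicDemo {
        static final List<String> ITEMS = Arrays.asList("one", "two", "three", "four", "five");

        static boolean isSized(Spliterator<?> s) {
            return s.hasCharacteristics(Spliterator.SIZED);
        }

        // Stream.of(...) is backed by an array, so its size is known.
        static boolean plainStream() {
            return isSized(Stream.of("one", "two", "three", "four", "five").spliterator());
        }

        // filter() cannot know how many elements will pass, so SIZED is dropped.
        static boolean filteredStream() {
            return isSized(Stream.of("one", "two", "three", "four", "five")
                    .filter(a -> true)
                    .spliterator());
        }

        // An iterator wrapped without a size never reports SIZED.
        static boolean unknownSizeIterator() {
            return isSized(Spliterators.spliteratorUnknownSize(ITEMS.iterator(), Spliterator.ORDERED));
        }

        // An iterator wrapped together with its size reports SIZED.
        static boolean sizedIterator() {
            return isSized(Spliterators.spliterator(ITEMS.iterator(), ITEMS.size(), Spliterator.ORDERED));
        }

        public static void main(String[] args) {
            System.out.println("Stream.of:             " + plainStream());         // true
            System.out.println("Stream.of + filter:    " + filteredStream());      // false
            System.out.println("iterator, no size:     " + unknownSizeIterator()); // false
            System.out.println("iterator, known size:  " + sizedIterator());       // true
        }
    }
    ```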
    

    And using the sized Spliterators.spliterator() instead of the unknown-size version "fixes" the behavior. Note that the signature is spliterator(iterator, size, characteristics), so the size (5) comes before the characteristics:

    Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
    
    completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliterator(iterator, 5, Spliterator.ORDERED), false)
    );
    

    Output:

    Running iterator two
    Running iterator one
    iterator one
    iterator two
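
    If the goal is to create only the first two futures while still running the long forEach() work in parallel, one option is to apply limit() on a sequential stream, collect the result, and only then go parallel. This is a sketch under that assumption, not something tested against the full program; the class name, the firstTwo helper, and the counter are hypothetical:

    ```java
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;

    public class LimitThenParallel {
        // Hypothetical helper: creates at most two futures, then waits for them in parallel.
        static List<String> firstTwo(Iterator<String> iterator, AtomicInteger created) {
            // Sequential stage: map() and limit() are evaluated lazily, one element at a time.
            List<CompletableFuture<String>> limited = StreamSupport
                    .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                    .map(s -> {
                        created.incrementAndGet(); // count how many futures are created
                        return CompletableFuture.supplyAsync(() -> s);
                    })
                    .limit(2)
                    .collect(Collectors.toList());

            // Parallel stage: only the two retained futures are processed here.
            return limited.parallelStream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            AtomicInteger created = new AtomicInteger();
            Iterator<String> iterator = Arrays.asList(
                    "iterator one", "iterator two", "iterator three",
                    "iterator four", "iterator five").iterator();
            System.out.println(firstTwo(iterator, created)); // [iterator one, iterator two]
            System.out.println(created.get());               // 2
        }
    }
    ```

    The second stage could equally end in forEach(), as in the question; collect() is used here only so that the result is easy to inspect.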