I'm struggling a bit with how and when completable futures are completed. I have created this test case:
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
            Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
            .map(this::cf)
            .limit(2)
            .parallel()
            .forEach(cf -> {
                try {
                    System.out.println(cf.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}
And the output is:
Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one
The testList method works as expected. The CompletableFutures are only evaluated at the very end, after the limit method has kept only the first two items.
However, the testIterator method behaves unexpectedly. All CompletableFutures are completed, and the limiting is only done afterwards.
If I remove the parallel() call from the stream, it works as expected. However, the processing (the forEach()) should be done in parallel because in my full program it is a long-running method.
Can anyone explain why this is happening?
It looks like this depends on the Java version. I'm on 1.8:
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
Parallelism applies to the whole pipeline, so you cannot really control what will be executed before the limit() is applied in a parallel Stream. The only guarantee is that what comes after the limit() will only be executed on the retained elements.
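To illustrate that guarantee, here is a minimal sketch that moves limit() in front of the map() that creates the futures, so a future should only ever be created for the two retained elements, even with an iterator of unknown size. The class LimitBeforeMap and the helper startedFor are hypothetical names made up for this example, and it assumes the number of elements to keep is known before the futures are created.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.StreamSupport;

class LimitBeforeMap {

    // Hypothetical helper: returns the inputs for which a future was actually created.
    static List<String> startedFor(Iterator<String> iterator) {
        // Thread-safe list, because map() may run on several threads.
        List<String> started = new CopyOnWriteArrayList<>();
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                .limit(2)       // applied before any future is created
                .map(s -> {
                    started.add(s);
                    return CompletableFuture.supplyAsync(() -> s);
                })
                .parallel()     // still switches the whole pipeline to parallel
                .forEach(cf -> System.out.println(cf.join()));
        return started;
    }

    public static void main(String[] args) {
        Iterator<String> iterator = Arrays.asList(
                "iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
        System.out.println("Futures created for: " + startedFor(iterator));
    }
}
```

The order in which the two elements are printed and recorded can vary between runs, but only the first two elements should ever reach map().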
The difference between the two is probably due to implementation details or other Stream characteristics. In fact, you can easily invert the behavior by playing with the SIZED characteristic. It seems that when the Stream has a known size, only 2 elements are processed.
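One way to see which sources report a known size is to ask the stream's spliterator directly. This is only a sketch: the class SizedCheck and the helper isSized are names invented for this example, and it checks the SIZED flag without proving anything about how limit() is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SizedCheck {

    // Hypothetical helper: true if the stream's spliterator reports SIZED.
    // Note that spliterator() is a terminal operation, so the stream is consumed.
    static boolean isSized(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("one", "two", "three", "four", "five");

        // Array-backed source: the size is known.
        System.out.println("Stream.of:              " + isSized(Stream.of("one", "two", "three", "four", "five")));

        // filter() cannot know how many elements pass, so the size is lost.
        System.out.println("Stream.of + filter:     " + isSized(Stream.of("one", "two", "three").filter(a -> true)));

        // Iterator wrapped without a size.
        System.out.println("spliteratorUnknownSize: " + isSized(StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(items.iterator(), Spliterator.ORDERED), false)));

        // Iterator wrapped with an explicit size of 5.
        System.out.println("spliterator with size:  " + isSized(StreamSupport.stream(
                Spliterators.spliterator(items.iterator(), 5, Spliterator.ORDERED), false)));
    }
}
```

I would expect this to print true, false, false, true, which lines up with the cases where only two elements are processed.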
So for example, applying a simple filter() makes the list version lose its known size:
completeFirstTwoElements(
    Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);
outputs for example:
Running list one
Running list five
Running list two
Running list three
list one
list two
And using Spliterators.spliterator() with an explicit size, instead of Spliterators.spliteratorUnknownSize(), "fixes" the behavior:
Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
completeFirstTwoElements(
    // Argument order is (iterator, size, characteristics)
    StreamSupport.stream(Spliterators.spliterator(iterator, 5, Spliterator.ORDERED), false)
);
Output:
Running iterator two
Running iterator one
iterator one
iterator two