javajava-streamjava.util.concurrent

Can thread-unsafe combiner be used in Stream.collect method in java 8?


For a parallel Stream, the collect method has a 3-arg variant: (source: oracle documentation )

/**
Parameters:
supplier - a function that creates a new result container. For a parallel execution, this function may be called multiple times and must return a fresh value each time.
accumulator - an associative, non-interfering, stateless function for incorporating an additional element into a result
combiner - an associative, non-interfering, stateless function for combining two values, which must be compatible with the accumulator function
Returns:
the result of the reduction
*/
<R> R collect(Supplier<R> supplier,
              BiConsumer<R,? super T> accumulator,
              BiConsumer<R,R> combiner)

I've seen a lot of examples in which thread-unsafe combiners are used, both in blogs online or books. For example, this post gives the example where a thread-unsafe StringBuilder is used in the combiner function:

List<String> vowels = List.of("a", "e", "i", "o", "u");

// sequential stream - nothing to combine
StringBuilder result = vowels.stream().collect(StringBuilder::new, (x, y) -> x.append(y),
        (a, b) -> a.append(",").append(b));
System.out.println(result.toString());

// parallel stream - combiner is combining partial results
StringBuilder result1 = vowels.parallelStream().collect(StringBuilder::new, (x, y) -> x.append(y),
        (a, b) -> a.append(",").append(b));
System.out.println(result1.toString());

I know that the BiConsumer<R,? super T> accumulator can be thread-unsafe, since the identity is created for each thread so that threads does not need to synchronize because they don't even share states.

However, the problem is, why thread-unsafe combiners are used? In my understanding these combiners are run parallel in order to combine the results of the accumulator. If the combiner is not run in parallel, then the whole process becomes serial, not taking advantage of concurrency. For example if the whole process is adding identity 0 with an array of 1 to N. For a parallel stream, operations of (0+1), (0+2), .... (0+N) are executed in parallel, but if the combiner is serial, the final combing step 1+2+ .... N is no faster than the serial stream where this process is directly executed by the accumulator function.


Solution

  • This answer in a similar question says that if a collector is non-concurrent, "the framework will perform the necessary steps to use it in a thread-safe but still efficient manner".

    That is, each thread creates its own identity value using supplier, uses accumulator inside its own thread, and uses combiner to combine thread values after they have finished their own work.

    Although the answer specifically answers a question asking about a user-defined Collector object passed to one-arg collect method, I think my question of 3-arg collector follows the same logic (it is just a variant). And the 3-arg collector method's parameters should be regarded non-concurrent as a whole by library designers/implementors because it is unwise to assume that user must provide thread-safe arguments.

    So it does not matter if combiner is thread-safe or not, because the combiner only accepts results of parallel executions and combines them afterwards and locally.

    In my example, the combiner 's arguments are two StringBuilder but they're just local variables, not shared, and have nothing to do with concurrency:

    (a, b) -> a.append(",").append(b)