Tags: java, java-stream, spliterator

Why does HashMap.values().parallelStream() not run in parallel, while wrapping the values in an ArrayList works?


The HashMap has two key-value pairs, but they are not processed in parallel by different threads.


import java.util.stream.Stream;
import java.util.Map;
import java.util.HashMap;

class Ideone
{
    public static void main (String[] args) throws java.lang.Exception
    {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.values().parallelStream()
              .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
              .forEach(System.out::println);
    }
}

Output:

processing 1 in Thread[main,5,main]
1
processing 2 in Thread[main,5,main]
2

URL: https://ideone.com/Hkxkoz

The ValueSpliterator should have tried to split the HashMap's backing array into slots of size 1, which means the two elements should be processed in different threads.

Source: https://www.codota.com/code/java/methods/java8.util.HMSpliterators$ValueSpliterator/%3Cinit%3E

After wrapping the values in an ArrayList, it works as expected.

        // needs: import java.util.ArrayList; the diamond avoids the raw type
        new ArrayList<>(map.values()).parallelStream()
              .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
              .forEach(System.out::println);

Output:

processing 1 in Thread[ForkJoinPool.commonPool-worker-3,5,main]
1
processing 2 in Thread[main,5,main]
2

Solution

  • As explained in this answer, the issue is connected with the fact that the HashMap has a capacity potentially larger than its size and the actual values are distributed over the backing array based on their hash codes.

    The splitting logic is basically the same for all array-based spliterators, whether you stream over an array, an ArrayList, or a HashMap. To get balanced splits on a best-effort basis, each split will halve the (index) range, but in the case of a HashMap, the number of actual elements within the range differs from the range size.
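
To make the difference visible, here is a minimal sketch that splits each source exactly once and compares the estimated size of both halves with the elements they really contain. The class and method names (SplitOnceDemo, splitOnce, describe) are made up for this illustration, and the expected output assumes the two-entry map from the question with HashMap's default capacity of 16.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class SplitOnceDemo {
    /** Splits the collection's spliterator once and describes both halves. */
    static <T> String splitOnce(Collection<T> source) {
        Spliterator<T> rest = source.spliterator();
        Spliterator<T> prefix = rest.trySplit(); // null if the source refuses to split
        return describe(prefix) + " | " + describe(rest);
    }

    private static <T> String describe(Spliterator<T> sp) {
        if (sp == null) return "no split";
        long estimate = sp.estimateSize(); // read before traversal consumes the spliterator
        List<T> elements = StreamSupport.stream(sp, false).collect(Collectors.toList());
        return "estimated " + estimate + ", actual " + elements;
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        // Both entries sit in the lower half of the backing array:
        // estimated 1, actual [1, 2] | estimated 1, actual []
        System.out.println("HashMap.values(): " + splitOnce(map.values()));
        // The list has no empty slots, so halving the index range halves the elements:
        // estimated 1, actual [1] | estimated 1, actual [2]
        System.out.println("ArrayList copy:   " + splitOnce(new ArrayList<>(map.values())));
    }
}
```

Both sources report the same estimates, but only the ArrayList's estimates match what each half actually holds.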

    In principle, every range-based spliterator can split down to single elements. However, the client code, i.e. the Stream API implementation, might not split that far. The decision whether to even attempt a split is driven by the expected number of elements and the number of CPU cores.
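
The following sketch imitates such a client: it keeps splitting only while the estimated size exceeds a threshold. It is not the Stream implementation itself. The threshold parameter is a hypothetical stand-in for whatever target size the real implementation derives from the element count and core count, and the names (EstimateDrivenSplit, leaves) are invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;

class EstimateDrivenSplit {
    /**
     * Splits while the estimated size exceeds the (hypothetical) threshold,
     * then returns the elements of every resulting leaf, left to right.
     */
    static <T> List<List<T>> leaves(Spliterator<T> sp, long threshold) {
        List<List<T>> result = new ArrayList<>();
        collect(sp, threshold, result);
        return result;
    }

    private static <T> void collect(Spliterator<T> sp, long threshold, List<List<T>> out) {
        // Only attempt a split when the estimate says the chunk is still "too big".
        Spliterator<T> prefix = sp.estimateSize() > threshold ? sp.trySplit() : null;
        if (prefix == null) {
            List<T> leaf = new ArrayList<>();
            sp.forEachRemaining(leaf::add); // this leaf would be one unit of work
            out.add(leaf);
        } else {
            collect(prefix, threshold, out);
            collect(sp, threshold, out);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        // With a threshold of 1, splitting stops after the first split.
        System.out.println(leaves(map.values().spliterator(), 1));                  // [[1, 2], []]
        System.out.println(leaves(new ArrayList<>(map.values()).spliterator(), 1)); // [[1], [2]]
    }
}
```

With the HashMap, one unit of work receives both elements and the other receives none, which matches the single-threaded output in the question.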

    Take the following program:

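    // Required imports: java.util.*, java.util.function.Function,
    // java.util.stream.Collectors, java.util.stream.StreamSupport
    public static void main(String[] args) {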
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
    
        for(int depth: new int[] { 1, 2, Integer.MAX_VALUE }) {
            System.out.println("With max depth: "+depth);
            Tree<Spliterator<Map.Entry<String, Integer>>> spTree
                = split(map.entrySet().spliterator(), depth);
            Tree<String> valueTree = spTree.map(sp -> "estimated: "+sp.estimateSize()+" "
                +StreamSupport.stream(sp, false).collect(Collectors.toList()));
            System.out.println(valueTree);
        }
    }
    
    private static <T> Tree<Spliterator<T>> split(Spliterator<T> sp, int depth) {
        Spliterator<T> prefix = depth-- > 0? sp.trySplit(): null;
        return prefix == null?
            new Tree<>(sp): new Tree<>(null, split(prefix, depth), split(sp, depth));
    }
    
    public static class Tree<T> {
        final T value;
        List<Tree<T>> children;
    
        public Tree(T value) {
            this.value = value;
            children = Collections.emptyList();
        }
        public Tree(T value, Tree<T>... ch) {
            this.value = value;
            children = Arrays.asList(ch);
        }
        public <U> Tree<U> map(Function<? super T, ? extends U> f) {
            Tree<U> t = new Tree<>(value == null? null: f.apply(value));
            if(!children.isEmpty()) {
                t.children = new ArrayList<>(children.size());
                for(Tree<T> ch: children) t.children.add(ch.map(f));
            }
            return t;
        }
        public @Override String toString() {
            if(children.isEmpty()) return value == null? "": value.toString();
            final StringBuilder sb = new StringBuilder(100);
            toString(sb, 0, 0);
            return sb.toString();
        }
        public void toString(StringBuilder sb, int preS, int preEnd) {
            final int myHandle = sb.length() - 2;
            sb.append(value == null? "": value).append('\n');
            final int num = children.size() - 1;
            if (num >= 0) {
                if (num != 0) {
                    for (int ix = 0; ix < num; ix++) {
                        int nPreS = sb.length();
                        sb.append(sb, preS, preEnd);
                        sb.append("\u2502 ");
                        int nPreE = sb.length();
                        children.get(ix).toString(sb, nPreS, nPreE);
                    }
                }
                int nPreS = sb.length();
                sb.append(sb, preS, preEnd);
                final int lastItemHandle = sb.length();
                sb.append("  ");
                int nPreE = sb.length();
                children.get(num).toString(sb, nPreS, nPreE);
                sb.setCharAt(lastItemHandle, '\u2514');
            }
            if (myHandle > 0) {
                sb.setCharAt(myHandle, '\u251c');
                sb.setCharAt(myHandle + 1, '\u2500');
            }
        }
    }
    

    you will get:

    With max depth: 1
    
    ├─estimated: 1 [a=1, b=2]
    └─estimated: 1 []
    
    With max depth: 2
    
    ├─
    │ ├─estimated: 0 [a=1, b=2]
    │ └─estimated: 0 []
    └─
      ├─estimated: 0 []
      └─estimated: 0 []
    
    With max depth: 2147483647
    
    ├─
    │ ├─
    │ │ ├─
    │ │ │ ├─estimated: 0 []
    │ │ │ └─estimated: 0 [a=1]
    │ │ └─
    │ │   ├─estimated: 0 [b=2]
    │ │   └─estimated: 0 []
    │ └─
    │   ├─
    │   │ ├─estimated: 0 []
    │   │ └─estimated: 0 []
    │   └─
    │     ├─estimated: 0 []
    │     └─estimated: 0 []
    └─
      ├─
      │ ├─
      │ │ ├─estimated: 0 []
      │ │ └─estimated: 0 []
      │ └─
      │   ├─estimated: 0 []
      │   └─estimated: 0 []
      └─
        ├─
        │ ├─estimated: 0 []
        │ └─estimated: 0 []
        └─
          ├─estimated: 0 []
          └─estimated: 0 []
    

    On ideone

    So, as said, the spliterator can split down to individual elements if we split deep enough. However, the estimated size of two elements does not suggest that it’s worth doing that. Each split halves the estimate, and while you might say that this is wrong for the elements you’re interested in, it’s actually correct for most spliterators here: at the maximum depth, most spliterators represent an empty range, and splitting them turns out to be a waste of resources.
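
As a rough illustration of that waste, the sketch below splits a spliterator as far as it will go and counts how many of the resulting leaves are empty. The helper names (EmptyLeafCount, countLeaves) are hypothetical, and the numbers in the comments assume the two-entry map from the question with the default capacity of 16.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;

class EmptyLeafCount {
    /** Splits as far as the spliterator allows; returns {total leaves, empty leaves}. */
    static <T> int[] countLeaves(Spliterator<T> sp) {
        int[] counts = new int[2];
        visit(sp, counts);
        return counts;
    }

    private static <T> void visit(Spliterator<T> sp, int[] counts) {
        Spliterator<T> prefix = sp.trySplit();
        if (prefix == null) {
            counts[0]++;
            // tryAdvance returns false when the leaf has no element to offer
            if (!sp.tryAdvance(x -> { })) counts[1]++;
        } else {
            visit(prefix, counts);
            visit(sp, counts);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        int[] fromMap = countLeaves(map.values().spliterator());
        int[] fromList = countLeaves(new ArrayList<>(map.values()).spliterator());
        // One leaf per bucket of the backing array, most of them empty.
        System.out.println(fromMap[0] + " leaves, " + fromMap[1] + " empty");   // 16 leaves, 14 empty
        // One leaf per element, none empty.
        System.out.println(fromList[0] + " leaves, " + fromList[1] + " empty"); // 2 leaves, 0 empty
    }
}
```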

    As said in the other answer, the decision is about balancing the work of splitting (or preparation in general) against the expected work to parallelize, which the Stream implementation can’t know in advance. If you know in advance that the per-element workload will be high enough to justify more preparation work, you can use, e.g., new ArrayList<>(map.[keySet|entrySet|values]()).parallelStream() to enforce balanced splits. Usually, the problem will be much smaller for larger maps anyway.
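
A minimal sketch of that workaround, assuming a costly per-element operation: balancedParallelStream and expensive are hypothetical names, and expensive is only a placeholder for real work. The copy costs one extra pass over the elements, so it pays off only when the per-element work dominates.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

class BalancedParallel {
    /** Copies the elements into an ArrayList so that each split halves the real element count. */
    static <T> Stream<T> balancedParallelStream(Collection<T> source) {
        return new ArrayList<>(source).parallelStream();
    }

    /** Hypothetical stand-in for an expensive per-element computation. */
    static long expensive(int value) {
        long acc = value;
        for (int i = 0; i < 1_000_000; i++) {
            acc = (acc * 31 + i) % 1_000_003;
        }
        return acc;
    }

    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);

        long viaCopy = balancedParallelStream(map.values())
                .mapToLong(BalancedParallel::expensive)
                .sum();
        long sequential = map.values().stream()
                .mapToLong(BalancedParallel::expensive)
                .sum();

        // The result is the same either way; only the distribution of work differs.
        System.out.println(viaCopy == sequential); // true
    }
}
```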