
Grouping indexes of a Java 8 Stream<Integer> by the Integer values?


I have a stream of Integers, and I would like to group the indexes of the elements by each element's value.

For example, {1, 1, 1, 2, 3, 3, 4} should be grouped into a map from each Integer value to the list of indexes at which it occurs:

1 -> 0, 1, 2
2 -> 3
3 -> 4, 5
4 -> 6

I have tried doing this with the Stream API, but it needed an additional class:

@Test
public void testGrouping() throws Exception {
    // actually it is being read from a disk file
    Stream<Integer> nums = Stream.of(1, 1, 1, 2, 3, 3, 4);  
    // list to map by index
    int[] ind = {0};  // capture array, effectively final
    class Pair {
        int left;
        int right;

        public Pair(int left, int right) {
            this.left = left;
            this.right = right;
        }
    }

    Map<Integer, List<Integer>> map = nums.map(e -> new Pair(ind[0]++, e))
            .collect(Collectors.groupingBy(e -> e.right))
            .entrySet().parallelStream()
            .collect(Collectors.toConcurrentMap(
                    Map.Entry::getKey,
                    e -> e.getValue().parallelStream().map(ee -> ee.left).collect(Collectors.toList())
            ));
}

I have to work with a Stream, since in my application the Stream<Integer> is read from a disk file.

I feel that my approach above is pretty sub-optimal. Is there a better or more elegant way to do it?


Solution

  • With a little helper class for collecting:

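    class MapAndIndex {
        Map<Integer,List<Integer>> map=new HashMap<>();
        int index; // number of elements this container has seen so far

        // record the next position under the given value
        void add(int value) {
            map.computeIfAbsent(value, x->new ArrayList<>()).add(index++);
        }
        // append the other container's results; its indices are relative to
        // its own start, so shift them by the number of elements seen here
        void merge(MapAndIndex other) {
            other.map.forEach((value,list) -> {
                List<Integer> l=map.computeIfAbsent(value, x->new ArrayList<>());
                for(int i: list) l.add(i+index);
            } );
            index+=other.index;
        }
    }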
    

    the entire operation becomes:

    Map<Integer,List<Integer>> map = IntStream.of(1, 1, 1, 2, 3, 3, 4)
        .parallel()
        .collect(MapAndIndex::new, MapAndIndex::add, MapAndIndex::merge).map;
    

    When you need to track indices that are not known beforehand, you need mutable state, and hence the operation known as “mutable reduction”.

    Note that you don’t need a ConcurrentMap here. The Stream implementation already handles the concurrency: it creates one MapAndIndex container for each involved thread and invokes the merge operation on two containers once both associated threads are done with their work. This is also done in a way that retains the order, if the Stream has an order, as in this example (otherwise your task of recording indices makes no sense…).
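
Since the question starts from a Stream<Integer> rather than an IntStream, here is a minimal, self-contained sketch of how the same container could be applied to it. The wrapper class IndexGrouping and the method groupIndexes are hypothetical names introduced only for this illustration, and converting with mapToInt is just one possible way to reuse the int-based add method. The end of main also performs by hand the kind of merge that the Stream implementation would perform on two partial results.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical wrapper class, used only to make the sketch runnable.
class IndexGrouping {

    // Same container as in the answer: value -> positions, plus a running count.
    static class MapAndIndex {
        final Map<Integer, List<Integer>> map = new HashMap<>();
        int index;

        void add(int value) {
            map.computeIfAbsent(value, x -> new ArrayList<>()).add(index++);
        }

        void merge(MapAndIndex other) {
            other.map.forEach((value, list) -> {
                List<Integer> l = map.computeIfAbsent(value, x -> new ArrayList<>());
                // positions in 'other' are relative to its own start; shift them
                for (int i : list) l.add(i + index);
            });
            index += other.index;
        }
    }

    // Hypothetical helper: unbox to an IntStream, then do the mutable reduction.
    static Map<Integer, List<Integer>> groupIndexes(Stream<Integer> nums) {
        return nums.mapToInt(Integer::intValue)
                   .collect(MapAndIndex::new, MapAndIndex::add, MapAndIndex::merge)
                   .map;
    }

    public static void main(String[] args) {
        // Sequential and parallel runs should produce equal maps for an ordered source.
        Map<Integer, List<Integer>> seq = groupIndexes(Stream.of(1, 1, 1, 2, 3, 3, 4));
        Map<Integer, List<Integer>> par = groupIndexes(Stream.of(1, 1, 1, 2, 3, 3, 4).parallel());
        System.out.println(seq);
        System.out.println(seq.equals(par));

        // Merging two partial results by hand: 'right' holds 3 -> [0, 1] and 4 -> [2]
        // locally; 'left' has seen 4 elements, so these become 3 -> [4, 5] and 4 -> [6].
        MapAndIndex left = new MapAndIndex();
        MapAndIndex right = new MapAndIndex();
        for (int v : new int[] {1, 1, 1, 2}) left.add(v);
        for (int v : new int[] {3, 3, 4}) right.add(v);
        left.merge(right);
        System.out.println(left.map.equals(seq));
    }
}
```

The manual merge at the end is only meant to show why the offset by index keeps the positions global; how the source is actually split across threads is up to the Stream implementation.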