I have a stream of Integers, and I would like to group the indexes of the elements by each element's value.
For example, {1, 1, 1, 2, 3, 3, 4}
is grouped as Integer to list of indexes mapping:
1 -> 0, 1, 2
2 -> 3
3 -> 4, 5
4 -> 6
I have tried using Stream, but it needs an additional class:
@Test
public void testGrouping() throws Exception {
    // actually it is being read from a disk file
    Stream<Integer> nums = Stream.of(1, 1, 1, 2, 3, 3, 4);
    // list to map by index
    int[] ind = {0}; // capture array, effectively final
    class Pair {
        int left;
        int right;
        public Pair(int left, int right) {
            this.left = left;
            this.right = right;
        }
    }
    Map<Integer, List<Integer>> map = nums.map(e -> new Pair(ind[0]++, e))
        .collect(Collectors.groupingBy(e -> e.right))
        .entrySet().parallelStream()
        .collect(Collectors.toConcurrentMap(
            Map.Entry::getKey,
            e -> e.getValue().parallelStream().map(ee -> ee.left).collect(Collectors.toList())
        ));
}
I have to work with a Stream, since the Stream<Integer> is read from a disk file in my application.
I feel my way of doing it above is pretty sub-optimal. Is there a better or more elegant way to do it?
With a little helper class for collecting:
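class MapAndIndex {
    Map<Integer,List<Integer>> map=new HashMap<>();
    int index; // number of elements this container has seen so far
    // record the current index under the given value, then advance the index
    void add(int value) {
        map.computeIfAbsent(value, x->new ArrayList<>()).add(index++);
    }
    // append the other container's indices, shifted by this container's element count
    void merge(MapAndIndex other) {
        other.map.forEach((value,list) -> {
            List<Integer> l=map.computeIfAbsent(value, x->new ArrayList<>());
            for(int i: list) l.add(i+index);
        } );
        index+=other.index;
    }
}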
the entire operation becomes:
Map<Integer,List<Integer>> map = IntStream.of(1, 1, 1, 2, 3, 3, 4)
    .parallel()
    .collect(MapAndIndex::new, MapAndIndex::add, MapAndIndex::merge).map;
When you need to track the indices which are unknown beforehand, you need mutable state and hence the operation called “mutable reduction”.
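Since the question starts from a Stream<Integer> rather than an IntStream, here is a minimal self-contained sketch of the same mutable reduction using the three-argument Stream.collect. The wrapper class IndexGrouping and the method name groupIndexes are made up for illustration; the container is the MapAndIndex from above. It assumes the stream contains no null elements, because add takes an int and the Integer elements get unboxed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

class IndexGrouping {
    // Same container as above: a map plus a running element count.
    static class MapAndIndex {
        final Map<Integer, List<Integer>> map = new HashMap<>();
        int index;

        void add(int value) {
            map.computeIfAbsent(value, x -> new ArrayList<>()).add(index++);
        }

        void merge(MapAndIndex other) {
            other.map.forEach((value, list) -> {
                List<Integer> l = map.computeIfAbsent(value, x -> new ArrayList<>());
                for (int i : list) l.add(i + index);
            });
            index += other.index;
        }
    }

    // Hypothetical helper: groups element indexes by element value.
    // Each Integer is unboxed when passed to add(int), so nulls would fail.
    static Map<Integer, List<Integer>> groupIndexes(Stream<Integer> nums) {
        return nums.collect(MapAndIndex::new, MapAndIndex::add, MapAndIndex::merge).map;
    }

    public static void main(String[] args) {
        // Stand-in for the stream that is read from a disk file.
        Stream<Integer> nums = Stream.of(1, 1, 1, 2, 3, 3, 4).parallel();
        System.out.println(groupIndexes(nums));
    }
}
```

The method works for sequential and parallel streams alike, as long as the stream is ordered.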
Note that you don’t need a ConcurrentMap here. The Stream implementation will already handle the concurrency. It will create one MapAndIndex container for each involved thread and invoke the merge operation on two containers once both associated threads are done with their work. This will also be done in a way that retains the order, if the Stream has an order, like in this example (otherwise your task of recording indices makes no sense…).
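To see what merge does with the indices, the following sketch simulates by hand what might happen when the example input is split into two chunks. The split point, the class name MergeDemo and the method name mergeHalves are chosen purely for illustration; the actual Stream implementation decides how to split the work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MergeDemo {
    // Same container as above: a map plus a running element count.
    static class MapAndIndex {
        final Map<Integer, List<Integer>> map = new HashMap<>();
        int index;

        void add(int value) {
            map.computeIfAbsent(value, x -> new ArrayList<>()).add(index++);
        }

        void merge(MapAndIndex other) {
            other.map.forEach((value, list) -> {
                List<Integer> l = map.computeIfAbsent(value, x -> new ArrayList<>());
                for (int i : list) l.add(i + index);
            });
            index += other.index;
        }
    }

    // Hypothetical manual split of {1, 1, 1, 2, 3, 3, 4} into two chunks.
    static Map<Integer, List<Integer>> mergeHalves() {
        MapAndIndex left = new MapAndIndex();
        for (int v : new int[] {1, 1, 1, 2, 3}) left.add(v);  // indices 0..4
        MapAndIndex right = new MapAndIndex();
        for (int v : new int[] {3, 4}) right.add(v);          // local indices 0..1

        // left.index is 5 here, so right's local indices 0 and 1 become 5 and 6;
        // the index for value 3 is appended to the list left already holds.
        left.merge(right);
        return left.map;
    }

    public static void main(String[] args) {
        System.out.println(mergeHalves());
    }
}
```

The right-hand container only knows positions relative to its own chunk; adding the left container's element count during the merge turns them into positions in the whole sequence, which is why the merge has to happen in encounter order.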