I have some large-ish text files that I want to process by grouping their lines.
I tried to use the new Stream API, like this:
    return FileUtils.readLines(...)
        .parallelStream()
        .map(...)
        .collect(groupingBy(pair -> pair[0]));
The problem is that, AFAIK, this generates a Map.
Is there any way to have high-level code like the one above that generates, for example, a Stream of Entries?
I'm looking for something like Python's itertools.groupby. My files are already sorted (by pair[0]); I just want to load the groups one by one.
I already have an iterative solution. I'm just wondering if there's a more declarative way to do that. Using Guava or another 3rd party library wouldn't be a big problem.
The task you want to achieve is quite different from what grouping does. groupingBy does not rely on the order of the Stream’s elements but on the Map’s algorithm applied to the classifier Function’s result.
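To illustrate the point about order, here is a minimal sketch (the class name GroupingByDemo and the sample data are made up for illustration). The two "a" lines are not adjacent in the input, yet groupingBy puts them into the same list, because only the map lookup on the key decides where an item goes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingByDemo {
    // Groups lines by their first character (a stand-in for pair[0]).
    static Map<String, List<String>> group(List<String> lines) {
        return lines.stream()
                .collect(Collectors.groupingBy(line -> line.substring(0, 1)));
    }

    public static void main(String[] args) {
        // "a1" and "a2" are separated by "b1" in the input ...
        Map<String, List<String>> groups = group(Arrays.asList("a1", "b1", "a2"));
        // ... but they still end up in one list under the key "a".
        System.out.println(groups.get("a"));
        System.out.println(groups.get("b"));
    }
}
```

So the whole input has to be consumed before any group is known to be complete, which is why the result is a Map rather than something that can be loaded group by group.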
What you want is to fold adjacent items having a common property value into one List item. It is not even necessary to have the Stream sorted by that property as long as you can guarantee that all items having the same property value are clustered.
Maybe it is possible to formulate this task as a reduction, but to me the resulting structure looks too complicated.
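For what it's worth, a reduction formulation could look roughly like the following sketch. adjacentBy is a hypothetical helper (not part of the JDK) built with Collector.of; the combiner has to merge the last run of the left part with the first run of the right part when their keys match. Note that it materializes the complete list of groups, so it does not deliver the groups one by one:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

class AdjacentGrouping {
    // Hypothetical helper: collects an ordered stream into a list of
    // (key, run of adjacent items with that key) entries.
    static <T, G> Collector<T, ?, List<Map.Entry<G, List<T>>>> adjacentBy(
            Function<? super T, ? extends G> classifier) {
        Supplier<List<Map.Entry<G, List<T>>>> supplier = ArrayList::new;
        BiConsumer<List<Map.Entry<G, List<T>>>, T> accumulator = (acc, item) -> {
            G key = classifier.apply(item);
            if (!acc.isEmpty() && Objects.equals(acc.get(acc.size() - 1).getKey(), key)) {
                // Same key as the last run: extend that run.
                acc.get(acc.size() - 1).getValue().add(item);
            } else {
                // Key changed (or first item): start a new run.
                List<T> run = new ArrayList<>();
                run.add(item);
                acc.add(new AbstractMap.SimpleImmutableEntry<>(key, run));
            }
        };
        BinaryOperator<List<Map.Entry<G, List<T>>>> combiner = (left, right) -> {
            if (!left.isEmpty() && !right.isEmpty()) {
                Map.Entry<G, List<T>> last = left.get(left.size() - 1);
                Map.Entry<G, List<T>> first = right.get(0);
                if (Objects.equals(last.getKey(), first.getKey())) {
                    // A run was cut in two by a split: glue the halves together.
                    last.getValue().addAll(first.getValue());
                    left.addAll(right.subList(1, right.size()));
                    return left;
                }
            }
            left.addAll(right);
            return left;
        };
        return Collector.of(supplier, accumulator, combiner);
    }

    public static void main(String[] args) {
        Function<String, String> firstChar = s -> s.substring(0, 1);
        List<String> lines = Arrays.asList("a1", "a2", "b1", "a3");
        // prints [a=[a1, a2], b=[b1], a=[a3]]
        System.out.println(lines.stream().collect(adjacentBy(firstChar)));
    }
}
```

The sample input deliberately repeats the key "a" in a non-adjacent position to show that only neighbouring items are folded together.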
So, unless direct support for this feature gets added to the Streams, an iterator-based approach looks most pragmatic to me:
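    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.Spliterator;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> {
        // Wraps s into a lazy, sequential Stream of (group key, adjacent items) entries.
        static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
                Stream<? extends T> s, Function<? super T, ? extends G> f) {
            return StreamSupport.stream(new Folding<>(s.spliterator(), f), false);
        }
        private final Spliterator<? extends T> source;
        private final Function<? super T, ? extends G> pf;
        private final Consumer<T> c=this::addItem;
        private List<T> pending, result;     // group being built / group ready to be emitted
        private G pendingGroup, resultGroup; // the keys of these two groups
        Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) {
            source=s;
            pf=f;
        }
        // Adds item to the pending group; on a key change the finished group becomes the result.
        private void addItem(T item) {
            G group=pf.apply(item);
            if(pending==null) pending=new ArrayList<>();
            else if(!pending.isEmpty()) {
                if(!Objects.equals(group, pendingGroup)) {
                    if(pending.size()==1)
                        // reuse the (now empty) pending list for the next group
                        result=Collections.singletonList(pending.remove(0));
                    else {
                        result=pending;
                        pending=new ArrayList<>();
                    }
                    resultGroup=pendingGroup;
                }
            }
            pendingGroup=group;
            pending.add(item);
        }
        @Override
        public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) {
            // Pull source items until a group is complete.
            while(source.tryAdvance(c)) {
                if(result!=null) {
                    action.accept(entry(resultGroup, result));
                    result=null;
                    return true;
                }
            }
            // Source exhausted: emit the last, still pending group (if any).
            if(pending!=null) {
                action.accept(entry(pendingGroup, pending));
                pending=null;
                return true;
            }
            return false;
        }
        private Map.Entry<G,List<T>> entry(G g, List<T> l) {
            return new AbstractMap.SimpleImmutableEntry<>(g, l);
        }
        @Override
        public int characteristics() { return 0; }
        @Override
        public long estimateSize() { return Long.MAX_VALUE; }
        @Override
        public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } // no parallel splitting
    }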
The lazy nature of the resulting folded Stream can be best demonstrated by applying it to an infinite stream:
    // The first group with a key greater than 5 is key 6, i.e. the numbers 96 to 111.
    Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4)
        .filter(e -> e.getKey()>5)
        .findFirst().ifPresent(e -> System.out.println(e.getValue()));
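For comparison, the same folding idea can also be sketched on top of a plain Iterator with one item of lookahead, which some may find easier to follow than a Spliterator. This is an alternative sketch, not the Folding class above: RunIterator and runs are made-up names, the classifier is called twice for the first item of each group, and the first source item is fetched as soon as the stream is created. The StringReader stands in for a real file; with an actual file, Files.lines(path) would be the source.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunIterator<T, G> implements Iterator<Map.Entry<G, List<T>>> {
    private final Iterator<? extends T> source;
    private final Function<? super T, ? extends G> classifier;
    private T lookahead;          // first item of the next group, if any
    private boolean hasLookahead;

    RunIterator(Iterator<? extends T> source, Function<? super T, ? extends G> classifier) {
        this.source = source;
        this.classifier = classifier;
        advance();
    }

    // Fetches the next source item into the lookahead slot.
    private void advance() {
        hasLookahead = source.hasNext();
        lookahead = hasLookahead ? source.next() : null;
    }

    @Override
    public boolean hasNext() { return hasLookahead; }

    @Override
    public Map.Entry<G, List<T>> next() {
        if (!hasLookahead) throw new NoSuchElementException();
        G key = classifier.apply(lookahead);
        List<T> run = new ArrayList<>();
        // Consume items as long as they share the key of the first one.
        do {
            run.add(lookahead);
            advance();
        } while (hasLookahead && Objects.equals(classifier.apply(lookahead), key));
        return new AbstractMap.SimpleImmutableEntry<>(key, run);
    }

    // Hypothetical factory: a sequential Stream of groups backed by the iterator.
    static <T, G> Stream<Map.Entry<G, List<T>>> runs(
            Stream<? extends T> s, Function<? super T, ? extends G> f) {
        Iterator<Map.Entry<G, List<T>>> it = new RunIterator<>(s.iterator(), f);
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        // Stand-in for a small file that is already sorted by its first column.
        BufferedReader in = new BufferedReader(new StringReader("a 1\na 2\nb 3\nc 4\nc 5"));
        RunIterator.runs(in.lines().map(line -> line.split(" ")), pair -> pair[0])
                .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue().size() + " line(s)"));
    }
}
```

Each group is only assembled when the downstream operation asks for it, so at most one group (plus one lookahead item) is held in memory at a time.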