I'm exploring Java Stream Gatherers, a preview feature introduced in Java 22, and I'm using Java 23 with preview features enabled for this purpose. My primary goal here is to better understand how the Gatherer API works, rather than just solving the "distinct by product code" problem.
In my code, I expect to gather three distinct Offer objects based on their productCode, but I consistently get only 2. Debugging reveals that the gatherer always omits the last item in the stream. When I increase the number of offers to 4, the result size becomes 3, but it still skips the last item.
I’m not sure if I'm misunderstanding how the Gatherer API is supposed to work, or if there might be a bug in the JDK implementation of this preview feature.
Here's the relevant code:
package com.mycompany.app;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;

public class AppTest {

    @Test
    void exploreStreamGatherer() {
        Offer grandChildOffer = new Offer("GP1", List.of());
        Offer grandChildOffer2 = new Offer("GP1", List.of());
        Offer grandChildOffer3 = new Offer("GP2", List.of());
        Offer grandChildOffer4 = new Offer("GP3", List.of());
        Offer childOffer1 = new Offer("CP1", List.of(grandChildOffer, grandChildOffer4));
        Offer childOffer2 = new Offer("CP2", List.of(grandChildOffer2, grandChildOffer3));
        Offer offer = new Offer("P1", List.of(childOffer1, childOffer2));
        List<Offer> distinctGrandChildOffers = getAllDistinctGrandChildOffersByProductCode(offer);
        assertEquals(3, distinctGrandChildOffers.size()); // Fails, returns 2
    }

    static List<Offer> getAllDistinctGrandChildOffersByProductCode(Offer offer) {
        return offer.childOffers().stream()
                .flatMap(childOffer -> childOffer.childOffers().stream())
                .gather(distinctByProductCode())
                .toList();
    }

    public static Gatherer<Offer, List<Offer>, Offer> distinctByProductCode() {
        return Gatherer.ofSequential(
                ArrayList::new,
                (state, element, downstream) -> {
                    if (hasProductWithSameProductCode(state, element)) {
                        return false;
                    }
                    state.add(element);
                    return true;
                },
                (state, downstream) -> {
                    if (!state.isEmpty()) {
                        state.forEach(downstream::push);
                    }
                }
        );
    }

    private static boolean hasProductWithSameProductCode(List<Offer> state, Offer element) {
        return state.stream().anyMatch(offer -> offer.productCode().equals(element.productCode()));
    }
}

record Offer(String productCode, List<Offer> childOffers) {
}
For full context, the project source code is available here: GitHub Repository.
Main Focus:
The goal is to explore and understand how the Gatherer API works in Java 22/23, not just to solve the "distinct by product code" logic.
When I expect 3 items, the gatherer returns only 2. If I increase the number of offers to 4, I get 3 items, but the last one is still omitted.
Questions:
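The false returned by:
    if (hasProductWithSameProductCode(state, element)) {
        return false;
    }
stops any more items being processed from the stream, so you are skipping everything after the first duplicate. In your test the grandchild offers arrive in the order GP1, GP3, GP1, GP2: the second GP1 triggers the return false, so GP2 is never integrated, which makes it look as if the last item is being dropped. You should be returning true to continue integrating the remaining stream elements.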
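To see the effect of the integrator's return value in isolation, here is a minimal sketch that uses plain product-code strings instead of Offer objects. The class name IntegratorReturnSketch, the helper distinct and its onDuplicate flag are made up for this illustration, and it assumes JDK 24+ (or JDK 22/23 with --enable-preview):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;

class IntegratorReturnSketch {

    // Hypothetical helper: buffers distinct strings and emits them in the finisher.
    // onDuplicate is the value the integrator returns when it meets a duplicate.
    static Gatherer<String, List<String>, String> distinct(boolean onDuplicate) {
        return Gatherer.ofSequential(
                ArrayList::new,
                (state, element, downstream) -> {
                    if (state.contains(element)) {
                        // false: stop integrating; true: skip this element and carry on
                        return onDuplicate;
                    }
                    state.add(element);
                    return true;
                },
                (state, downstream) -> state.forEach(downstream::push));
    }

    public static void main(String[] args) {
        // Same arrival order as the grandchild offers in the question's test
        List<String> codes = List.of("GP1", "GP3", "GP1", "GP2");
        System.out.println(codes.stream().gather(distinct(false)).toList()); // [GP1, GP3]
        System.out.println(codes.stream().gather(distinct(true)).toList());  // [GP1, GP3, GP2]
    }
}
```

Note that the finisher still runs after the integrator returns false, which is why the two buffered elements are emitted rather than nothing at all.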
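Since you want to process all the stream elements, you can wrap the integrator with Gatherer.Integrator.ofGreedy. A Greedy integrator declares that it never short-circuits on its own, which allows the gatherer to do a bit of optimization: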
    public static Gatherer<Offer, List<Offer>, Offer> distinctByProductCode() {
        return Gatherer.ofSequential(
                ArrayList::new,
                Gatherer.Integrator.<List<Offer>, Offer, Offer>ofGreedy((state, element, downstream) -> {
                    if (!hasProductWithSameProductCode(state, element)) {
                        state.add(element);
                    }
                    return true;
                }),
                (state, downstream) -> {
                    if (!state.isEmpty()) {
                        state.forEach(downstream::push);
                    }
                }
        );
    }
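As a possible variation (not something the question's code does), the gatherer could push each first-seen element downstream straight from the integrator and keep only the seen keys as state, so no finisher is needed. This is a sketch under assumptions: the names DistinctBySketch, distinctBy and keyOf are invented here, the Offer record is copied from the question, and it assumes JDK 24+ (or JDK 22/23 with --enable-preview):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;

class DistinctBySketch {

    // Same shape as the Offer record in the question
    record Offer(String productCode, List<Offer> childOffers) {}

    // Hypothetical helper: emits the first element seen for each key, in encounter order.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyOf) {
        return Gatherer.ofSequential(
                () -> new HashSet<K>(),
                Gatherer.Integrator.<Set<K>, T, T>ofGreedy((seen, element, downstream) -> {
                    if (seen.add(keyOf.apply(element))) {
                        // First time this key appears: forward the element and relay
                        // whether downstream still wants more
                        return downstream.push(element);
                    }
                    return true; // duplicate key: skip it but keep integrating
                }));
    }

    public static void main(String[] args) {
        List<Offer> offers = List.of(
                new Offer("GP1", List.of()),
                new Offer("GP3", List.of()),
                new Offer("GP1", List.of()),
                new Offer("GP2", List.of()));
        Gatherer<Offer, ?, Offer> byCode = distinctBy(Offer::productCode);
        System.out.println(offers.stream().gather(byCode).map(Offer::productCode).toList());
        // [GP1, GP3, GP2]
    }
}
```

Returning the result of downstream.push is what lets a later short-circuiting operation such as limit stop the stream early, while a HashSet lookup avoids rescanning the whole state list for every element.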