I'm playing around with Stream Gatherers, which were finalized (that is, lost their preview status) in Java 24. As an exercise, I implemented my own version of the `limit` intermediate operation:
```java
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class LimitDemo {

    private static class State {
        int count;
    }

    static <T> Gatherer<T, ?, T> limit(int size) {
        Supplier<State> initializer = State::new;
        Gatherer.Integrator<State, T, T> integrator = (state, element, downstream) -> {
            // Returning false tells the stream that no more elements are wanted.
            if (++state.count > size) {
                return false;
            }
            return downstream.push(element);
        };
        return Gatherer.ofSequential(initializer, integrator);
    }

    public static void main(String[] args) {
        var rng = new Random();
        var result =
                Stream.generate(() -> rng.nextInt(10)) // here
                        .gather(limit(10))
                        .toList();
        System.out.println(result);
    }
}
```
This works very well: a list of 10 random numbers is printed on the screen, as I expected. The infinite stream of random Integers is short-circuited.
However, if I generate the infinite stream in another way, it doesn't work anymore. If I replace the line marked `// here` with `rng.ints(0, 10).boxed()`, then my program terminates with an `OutOfMemoryError`. Apparently, the `IntStream` generated by the `Random` instance doesn't notice that my `Gatherer` short-circuits. Of course, when I use the standard `limit` instead of my custom one, it works in both cases.
What am I missing?
As has been explained in the comments, this is definitely a bug. More information can be found in the corresponding JDK bug report.
The bug was filed in response to a mailing-list thread, which itself was started in response to this question. It was filed by Viktor Klang, the owner of JEP 485: Stream Gatherers.
As the title of the bug explains, the problem is that the `gather` operation forwards the upstream size downstream. This shouldn't happen, because a gatherer has no way to report ahead of time how many elements it is going to push downstream, even if it knows. Luckily, this bug has already been fixed for Java 25. See the corresponding commit.
Forwarding the size information downstream causes the stream operation in your example to try to perform some optimizations. Mainly, it tries to allocate enough space for the elements ahead of time to avoid having to resize internal buffers (the `SpinedBuffer` in the stack trace below). But you have an infinite stream, so the upstream size, as reported by the source `Spliterator`, is `Long.MAX_VALUE` (at least when the `Spliterator` has the `SIZED` characteristic). And you don't have enough memory to allocate the buffers needed to hold that number of elements.
This means the problem isn't that the stream isn't short-circuiting. The stream operation is failing before it even starts processing the first element. It doesn't have a chance to short-circuit.
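If this explanation is right, hiding the size from the `gather` stage should avoid the `OutOfMemoryError` on Java 24. The sketch below tests that idea. It assumes that `filter` always reports an unknown size to the next stage (it drops the `SIZED` flag and passes `-1` on, which is how I understand the JDK's `filter` to behave). The no-op `filter(x -> true)`, the class name `FilterWorkaround`, and the helper `firstTen` are illustrative only and do not come from the bug report.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

public class FilterWorkaround {

    // Same idea as the question's limit gatherer: push at most `size` elements,
    // then return false to signal that no further elements are wanted.
    static <T> Gatherer<T, ?, T> limit(long size) {
        class State { long count; }
        Supplier<State> initializer = State::new;
        Gatherer.Integrator<State, T, T> integrator = (state, element, downstream) -> {
            if (++state.count > size) {
                return false;
            }
            return downstream.push(element);
        };
        return Gatherer.ofSequential(initializer, integrator);
    }

    // Hypothetical helper: the first ten values from an unbounded, SIZED source.
    static List<Integer> firstTen(Random rng) {
        return rng.ints(0, 10)          // source reports SIZED with an estimate of Long.MAX_VALUE
                .boxed()
                .filter(x -> true)      // assumed workaround: filter drops SIZED and reports an
                                        // unknown size (-1) to the gather stage
                .gather(limit(10))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstTen(new Random()));
    }
}
```

Without the `filter`, the same pipeline should fail on Java 24 just like in the question. On Java 25, where the bug is fixed, the extra `filter` should be unnecessary.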
But the gatherer bug is independent of how the stream is created. So why do you only see the `OutOfMemoryError` with `Random::ints(int,int)` and not with `Stream::generate(Supplier)`? This has to do with the source `Spliterator` used by each stream. In the former case, the `Spliterator` reports itself as `SIZED`, whereas in the latter case it does not. This is mentioned in the mailing list:
> However, there's an argument to be made that an unbounded Spliterator (as obtained by Random::ints(…)) should not report the SIZED and SUBSIZED characteristics, as it is indeed not sized. I'll open a separate Issue for that next week.
That separate bug may have been created already, but I didn't see it.
Regardless, I'd indeed argue that an infinite `Spliterator` should not report itself as `SIZED`, based on the documentation of `Spliterator.SIZED`:

> Characteristic value signifying that the value returned from `estimateSize()` prior to traversal or splitting represents a finite size that, in the absence of structural source modification, represents an exact count of the number of elements that would be encountered by a complete traversal.
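The difference between the two sources can be observed directly. The following sketch (the names `SizedCheck` and `describe` are illustrative) prints, for each source `Spliterator`, whether it reports `SIZED`, its `estimateSize()`, and its `getExactSizeIfKnown()`. The latter returns `-1` unless `SIZED` is reported; as far as I can tell, it is the value the pipeline hands to the sinks' `begin` method, which is where the stack trace below starts allocating.

```java
import java.util.Random;
import java.util.Spliterator;
import java.util.stream.Stream;

public class SizedCheck {

    // Summarizes the size-related information a spliterator exposes.
    static String describe(Spliterator<?> spliterator) {
        return "SIZED=" + spliterator.hasCharacteristics(Spliterator.SIZED)
                + ", estimateSize=" + spliterator.estimateSize()
                + ", exactSizeIfKnown=" + spliterator.getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        var rng = new Random();
        // spliterator() on a stream with no intermediate operations returns the source spliterator.
        System.out.println("Random::ints     -> " + describe(rng.ints(0, 10).spliterator()));
        System.out.println("Stream::generate -> " + describe(Stream.generate(rng::nextInt).spliterator()));
    }
}
```

Going by the mailing-list quote above, `Random::ints` should report `SIZED` with an estimate of `Long.MAX_VALUE`, while `Stream::generate` reports the same estimate without `SIZED`, so its exact size is `-1` (unknown).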
You can confirm that the `SIZED` characteristic of the `Spliterator`, combined with the gatherer bug, is what leads to the `OutOfMemoryError` by creating your own version of `Stream::generate(Supplier)` that reports itself as `SIZED`. See the following example.
```java
import java.util.Objects;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String[] args) {
        var rng = new Random();

        System.out.println("Using Stream::generate");
        var list = Stream.generate(rng::nextInt).gather(limit(10L)).toList();
        System.out.println(list);
        System.out.println();

        System.out.println("Using Main::sizedGenerate");
        list = sizedGenerate(rng::nextInt).gather(limit(10L)).toList();
        System.out.println(list);
        System.out.println();
    }

    // Passes through at most `limit` elements, then returns false to stop the stream.
    static <T> Gatherer<T, ?, T> limit(long limit) {
        if (limit < 0) throw new IllegalArgumentException();
        class State { long count; }
        Gatherer.Integrator<State, T, T> integrator =
                Gatherer.Integrator.of(
                        (state, element, downstream) -> {
                            if (++state.count > limit) return false;
                            return downstream.push(element);
                        });
        return Gatherer.ofSequential(State::new, integrator);
    }

    static <T> Stream<T> sizedGenerate(Supplier<? extends T> supplier) {
        return StreamSupport.stream(new GeneratingSpliterator<>(supplier), false);
    }

    // Essentially the same as the Spliterator used by Stream::generate(Supplier), but
    // has IMMUTABLE and SIZED characteristics instead of just IMMUTABLE.
    static class GeneratingSpliterator<T> implements Spliterator<T> {
        private final Supplier<? extends T> supplier;
        private long estimate;

        GeneratingSpliterator(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
            this.estimate = Long.MAX_VALUE; // reported as an exact size because of SIZED
        }

        private GeneratingSpliterator(Supplier<? extends T> supplier, long estimate) {
            this.supplier = supplier;
            this.estimate = estimate;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            action.accept(supplier.get()); // never runs out of elements
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            if (estimate == 0L) return null;
            // Halve this spliterator's estimate and give the same amount to the new one.
            return new GeneratingSpliterator<>(supplier, estimate >>>= 1);
        }

        @Override
        public long estimateSize() {
            return estimate;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE | SIZED;
        }
    }
}
```
Output:

```text
Using Stream::generate
[1181200022, 1865178467, -727320699, 1587704041, -973737573, -868155800, -851856196, 781133979, 1084548654]

Using Main::sizedGenerate
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.stream.SpinedBuffer.ensureCapacity(SpinedBuffer.java:143)
    at java.base/java.util.stream.Nodes$SpinedNodeBuilder.begin(Nodes.java:1281)
    at java.base/java.util.stream.GathererOp$GatherSink.begin(GathererOp.java:153)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:587)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:574)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:636)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:291)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:656)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:662)
    at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:667)
    at com.gitlab.tkslaw.workshop.Main.main(Main.java:23)
```