java java-stream java-24

Why doesn't my Gatherer short-circuit the Stream if the source is an IntStream?


I'm playing around with Stream Gatherers, which were finalized (that is, they left preview status) in Java 24. As an exercise, I implemented my own version of the limit intermediate operation:

public class LimitDemo {

    private static class State {
        int count;
    }

    static <T> Gatherer<T, ?, T> limit(int size) {
        Supplier<State> initializer = State::new;
        Gatherer.Integrator<State, T, T> integrator = (state, element, downstream) -> {
            if (++state.count > size) {
                return false;
            }
            return downstream.push(element);
        };
        return Gatherer.ofSequential(initializer, integrator);
    }

    public static void main(String[] args) {
        var rng = new Random();
        var result = 
            Stream.generate(() -> rng.nextInt(10)) // here
            .gather(limit(10))
            .toList();
        System.out.println(result);
    }
}

This works very well: a list of 10 random numbers is printed on the screen, as I expected. The infinite stream of random Integers is short-circuited.

However, if I generate the infinite stream in another way, it doesn't work anymore. If I replace the line marked // here with

rng.ints(0, 10).boxed()

then my program terminates with an OutOfMemoryError. Apparently, the IntStream generated by the Random instance doesn't notice that my Gatherer short-circuits. Of course, when I use the standard limit instead of my custom one, it works in both cases.

What am I missing?


Solution

  • As has been explained in the comments, this is definitely a bug. More information can be found at:

    The bug was created in response to a mailing list thread, which itself was started in response to this question. It was created by Viktor Klang, who is the owner of JEP 485: Stream Gatherers.

    As the title of the bug explains, the problem is that the gather operation forwards the upstream size to downstream. This shouldn't happen because a gatherer has no way to report ahead-of-time how many elements it's going to push downstream, if it even knows. Luckily, this bug has already been fixed for Java 25. See this commit.
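
    To illustrate why a gatherer cannot report its output size up front, here is a minimal sketch of a gatherer whose output count depends on the element values rather than on the number of input elements. The class name OutputSizeDemo and the method repeatByValue are hypothetical names made up for this illustration, and the code assumes a JDK where Gatherers are available without preview flags (Java 24 or later).

    ```java
    import java.util.List;
    import java.util.stream.Gatherer;
    import java.util.stream.Stream;

    public class OutputSizeDemo {

      // Hypothetical gatherer for illustration: pushes each element n times,
      // where n is the element's own value (so 0 is dropped, 3 appears three times).
      static Gatherer<Integer, ?, Integer> repeatByValue() {
        Gatherer.Integrator<Void, Integer, Integer> integrator =
            (state, element, downstream) -> {
              for (int i = 0; i < element; i++) {
                // Stop as soon as downstream signals that it wants no more elements.
                if (!downstream.push(element)) return false;
              }
              return true;
            };
        return Gatherer.ofSequential(integrator);
      }

      public static void main(String[] args) {
        List<Integer> input = List.of(0, 1, 2, 3);
        List<Integer> output = input.stream().gather(repeatByValue()).toList();
        System.out.println("input size:  " + input.size());
        System.out.println("output size: " + output.size());
        System.out.println(output);
      }
    }
    ```

    Four elements go in and six come out, so the upstream size says nothing reliable about what reaches the downstream stage.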

    Forwarding the size information downstream causes the stream operation in your example to try to perform some optimizations. Mainly, it tries to allocate enough space for the elements ahead-of-time to avoid having to resize some internal buffers. But you have an infinite stream and so the upstream size, as reported by the source Spliterator, is Long.MAX_VALUE (at least when the Spliterator has the SIZED characteristic). And you don't have enough memory to allocate the buffers needed to hold that number of elements.

    This means the problem isn't that the stream isn't short-circuiting. The stream operation is failing before it even starts processing the first element. It doesn't have a chance to short-circuit.

    But the gatherer bug is independent of how the stream is created. So why do you only see the OutOfMemoryError with Random::ints(int,int) and not Stream::generate(Supplier)? This has to do with the source Spliterator being used by each stream. In the former's case, the Spliterator reports itself as SIZED, whereas in the latter's case it does not. This is mentioned in the mailing list:

    However, there's an argument to be made that an unbounded Spliterator (as obtained by Random::ints(…)) should not report the SIZED and SUBSIZED characteristics, as it is indeed not sized. I'll open a separate Issue for that next week.

    That separate bug may have been created already, but I didn't see it. Regardless, I'd indeed argue an infinite Spliterator should not report itself as SIZED based on the documentation (emphasis mine):

    Characteristic value signifying that the value returned from estimateSize() prior to traversal or splitting represents a finite size that, in the absence of structural source modification, represents an exact count of the number of elements that would be encountered by a complete traversal.
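
    The difference between the two sources can be checked directly by asking each stream for its Spliterator. The following is a small sketch, not part of the bug report; the class name SpliteratorCheck and the helper isSized are made up for this illustration. Based on the behaviour described above, Stream::generate is expected to report no SIZED characteristic, while Random::ints(int,int) is expected to report SIZED with an estimated size of Long.MAX_VALUE.

    ```java
    import java.util.Random;
    import java.util.Spliterator;
    import java.util.stream.Stream;

    public class SpliteratorCheck {

      // Returns true if the given spliterator reports the SIZED characteristic.
      static boolean isSized(Spliterator<?> spliterator) {
        return spliterator.hasCharacteristics(Spliterator.SIZED);
      }

      public static void main(String[] args) {
        Random rng = new Random();

        // Source spliterators of the two infinite streams from the question.
        Spliterator<Integer> generated = Stream.generate(() -> rng.nextInt(10)).spliterator();
        Spliterator.OfInt ints = rng.ints(0, 10).spliterator();

        System.out.println("Stream.generate reports SIZED: " + isSized(generated));
        System.out.println("Random.ints reports SIZED:     " + isSized(ints));
        System.out.println("Random.ints estimateSize is Long.MAX_VALUE: "
            + (ints.estimateSize() == Long.MAX_VALUE));
      }
    }
    ```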

    You can see that the Spliterator having the SIZED characteristic combined with the gatherer bug is what leads to the OutOfMemoryError by creating your own version of Stream::generate(Supplier). See the following example.

    import java.util.Objects;
    import java.util.Random;
    import java.util.Spliterator;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import java.util.stream.Gatherer;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;
    
    public class Main {
    
      public static void main(String[] args) {
        var rng = new Random();
    
        System.out.println("Using Stream::generate");
        var list = Stream.generate(rng::nextInt).gather(limit(10L)).toList();
        System.out.println(list);
        System.out.println();
    
        System.out.println("Using Main::sizedGenerate");
        list = sizedGenerate(rng::nextInt).gather(limit(10L)).toList();
        System.out.println(list);
        System.out.println();
      }
    
      static <T> Gatherer<T, ?, T> limit(long limit) {
        if (limit < 0) throw new IllegalArgumentException();
    
        class State { long count; }
        Gatherer.Integrator<State, T, T> integrator =
            Gatherer.Integrator.of(
                (state, element, downstream) -> {
                  if (++state.count >= limit) return false;
                  return downstream.push(element);
                });
        return Gatherer.ofSequential(State::new, integrator);
      }
    
      static <T> Stream<T> sizedGenerate(Supplier<? extends T> supplier) {
        return StreamSupport.stream(new GeneratingSpliterator<>(supplier), false);
      }
    
      // Essentially the same as the Spliterator used by Stream::generate(Supplier), but
      // has IMMUTABLE and SIZED characteristics instead of just IMMUTABLE.
      static class GeneratingSpliterator<T> implements Spliterator<T> {
    
        private final Supplier<? extends T> supplier;
        private long estimate;
    
        GeneratingSpliterator(Supplier<? extends T> supplier) {
          this.supplier = Objects.requireNonNull(supplier);
          this.estimate = Long.MAX_VALUE;
        }
    
        private GeneratingSpliterator(Supplier<? extends T> supplier, long estimate) {
          this.supplier = supplier;
          this.estimate = estimate;
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
          action.accept(supplier.get());
          return true;
        }
    
        @Override
        public Spliterator<T> trySplit() {
          if (estimate == 0L) return null;
          return new GeneratingSpliterator<>(supplier, estimate >>>= 1);
        }
    
        @Override
        public long estimateSize() {
          return estimate;
        }
    
        @Override
        public int characteristics() {
          return IMMUTABLE | SIZED;
        }
      }
    }
    

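    Output (the first list has 9 elements rather than 10 because the check ++state.count >= limit in limit returns false on the 10th element instead of pushing it; changing >= to > yields exactly 10):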

    Using Stream::generate
    [1181200022, 1865178467, -727320699, 1587704041, -973737573, -868155800, -851856196, 781133979, 1084548654]
    
    Using Main::sizedGenerate
    Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
            at java.base/java.util.stream.SpinedBuffer.ensureCapacity(SpinedBuffer.java:143)
            at java.base/java.util.stream.Nodes$SpinedNodeBuilder.begin(Nodes.java:1281)
            at java.base/java.util.stream.GathererOp$GatherSink.begin(GathererOp.java:153)
            at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:587)
            at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:574)
            at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560)
            at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:636)
            at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:291)
            at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:656)
            at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:662)
            at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:667)
            at com.gitlab.tkslaw.workshop.Main.main(Main.java:23)