java project-reactor flux

How do you force a Flux to pull or implement backpressure limits?


I'm trying to open a file and read its lines through a Flux. I would then like X workers to process the lines concurrently. The workers may be slow, so I want to prevent the ENTIRE file from being read into buffers and using up memory while the lines wait to be processed. Flux.parallel seems to force push-style processing, and I can't figure out how to slow down the reading of the file.

Below is what I have so far.

Flux.using(() -> Files.lines(path), Flux::fromStream, stream -> {
  System.out.println("Close");
  stream.close();
})
.doOnNext(item -> System.out.println("Read " + item))
.onBackpressureBuffer(3)
.parallel()
.runOn(Schedulers.newParallel("z", 2))
.map(item -> {
  System.out.println("Process " + item);
  try { Thread.sleep(500); } catch (InterruptedException e) {}
  return item;
})
.sequential()
.publishOn(Schedulers.single())
.map(item -> {
  System.out.println("Write " + item);
  return item;
})
.blockLast();

Solution

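  • The solution is to set explicit prefetch values on the parallel and runOn methods. Both default to Queues.SMALL_BUFFER_SIZE (256 unless overridden), which lets the file reader run far ahead of the slow workers. Below, parallel(2, 4) creates 2 rails and prefetches at most 4 items from the source, and runOn(scheduler, 1) makes each rail request 1 item at a time.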

    Flux.using(() -> Files.lines(path), Flux::fromStream, stream -> {
      System.out.println("Close");
      stream.close();
    })
    .doOnNext(item -> System.out.println("Read " + item))
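    // 2 rails; prefetch at most 4 items from the file at a time
    .parallel(2, 4)
    // each rail requests 1 item at a time from the parallel stage
    .runOn(Schedulers.newParallel("z", 2), 1)
    .map(item -> {
      System.out.println("Process " + item);
      try {
        Thread.sleep(500); // simulate a slow worker
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt flag set
      }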
      return item;
    })
    .sequential()
    .publishOn(Schedulers.single())
    .map(item -> {
      System.out.println("Write " + item);
      return item;
    })
    .blockLast();
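
The prefetch arguments work because each stage only asks its upstream for a bounded number of items through Reactive Streams request(n). To show that demand mechanism in isolation, here is a rough sketch that uses only the JDK's java.util.concurrent.Flow API rather than Reactor. The class BoundedDemandSketch, the SlowSubscriber type, and the maxReadAhead helper are hypothetical names made up for this illustration, and SubmissionPublisher with a small buffer stands in for the file reader; it is an analogy for what prefetch does, not how Reactor implements it.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: JDK Flow API only, no Reactor dependency.
class BoundedDemandSketch {

    /** Slow consumer that requests exactly one item at a time (similar in spirit to prefetch = 1). */
    static final class SlowSubscriber implements Flow.Subscriber<String> {
        final AtomicInteger processed = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        private final long delayMillis;
        private Flow.Subscription subscription;

        SlowSubscriber(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(String line) {
            try {
                Thread.sleep(delayMillis); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            processed.incrementAndGet();
            subscription.request(1); // ask for the next item only after finishing this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes all lines and returns the largest observed gap between submitted and processed items. */
    static int maxReadAhead(List<String> lines, int bufferCapacity, long delayMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SlowSubscriber subscriber = new SlowSubscriber(delayMillis);
        int submitted = 0;
        int maxGap = 0;
        try {
            try (SubmissionPublisher<String> publisher =
                         new SubmissionPublisher<>(executor, bufferCapacity)) {
                publisher.subscribe(subscriber);
                for (String line : lines) {
                    publisher.submit(line); // blocks while the subscriber's buffer is full
                    submitted++;
                    maxGap = Math.max(maxGap, submitted - subscriber.processed.get());
                }
            } // close() signals onComplete once buffered items are delivered
            try {
                subscriber.done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return maxGap;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> lines = IntStream.rangeClosed(1, 40)
                .mapToObj(i -> "line " + i)
                .collect(Collectors.toList());
        System.out.println("Max read-ahead: " + maxReadAhead(lines, 4, 20));
    }
}
```

With a buffer capacity of 4, the producer should stay only a handful of items ahead of the consumer regardless of how many lines there are, which is the same effect the small prefetch values have on the file reader in the Reactor pipeline above.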