I'm attempting to open a file and read its lines through a Flux. I'd then like X workers to process the lines concurrently. Those workers may be slow, so I want to prevent the entire file from being read into buffers and using up memory while they catch up. Flux.parallel seems to force push-based processing, and I can't figure out how to slow down the reading of the file.
Below is what I have so far.
Flux.using(() -> Files.lines(path), Flux::fromStream, stream -> {
        System.out.println("Close");
        stream.close();
    })
    .doOnNext(item -> System.out.println("Read " + item))
    .onBackpressureBuffer(3)
    .parallel()
    .runOn(Schedulers.newParallel("z", 2))
    .map(item -> {
        System.out.println("Process " + item);
        try {
            Thread.sleep(500); // simulate a slow worker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item;
    })
    .sequential()
    .publishOn(Schedulers.single())
    .map(item -> {
        System.out.println("Write " + item);
        return item;
    })
    .blockLast();
Looks like the solution is to set the prefetch values on the parallel and runOn methods: parallel(2, 4) bounds how many items are requested from the upstream file read, and a prefetch of 1 on runOn keeps each worker rail from buffering ahead.
Flux.using(() -> Files.lines(path), Flux::fromStream, stream -> {
        System.out.println("Close");
        stream.close();
    })
    .doOnNext(item -> System.out.println("Read " + item))
    .parallel(2, 4)
    .runOn(Schedulers.newParallel("z", 2), 1)
    .map(item -> {
        System.out.println("Process " + item);
        try {
            Thread.sleep(500); // simulate a slow worker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item;
    })
    .sequential()
    .publishOn(Schedulers.single())
    .map(item -> {
        System.out.println("Write " + item);
        return item;
    })
    .blockLast();
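For intuition, the same bounded-readahead idea can be sketched with plain java.util.concurrent, no Reactor required: a small bounded queue makes the "reader" block once the slow workers fall behind, which is exactly what the prefetch values achieve in the reactive pipeline. This is an illustrative sketch, not the Reactor mechanism itself; the queue capacity and worker count are arbitrary.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class BoundedPipeline {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue of capacity 3: put() blocks once 3 lines are
        // waiting, so the whole "file" is never buffered in memory at once.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        List<String> processed = new CopyOnWriteArrayList<>();
        final String POISON = "__EOF__";

        Runnable worker = () -> {
            try {
                for (String line; !(line = queue.take()).equals(POISON); ) {
                    Thread.sleep(50); // simulate a slow worker
                    processed.add(line);
                }
                queue.put(POISON); // hand the sentinel to the other worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread w1 = new Thread(worker), w2 = new Thread(worker);
        w1.start();
        w2.start();

        // The "reader": blocks on put() whenever the workers lag behind.
        for (int i = 1; i <= 10; i++) {
            queue.put("line-" + i);
        }
        queue.put(POISON);
        w1.join();
        w2.join();
        System.out.println("processed " + processed.size() + " lines");
    }
}
```

The prefetch arguments in the Reactor version play the role of the queue capacity here: they cap how far the file read can run ahead of the workers.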