javaproject-reactorfluxreactorautocloseable

How to run onClose operation after creating Flux.fromIterable?


Suppose we need to create a Flux based on contents of a Closeable resource. For clarity say there is a BufferedReader to be converted to Flux<String>.

BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));

Let us assume that iteratorOfLines produces a finite set of items.

I'm looking for a way to close BufferedReader when either the Flux has consumed all data from it or the remaining data is not needed for some reason (i.e. subscription is aborted).

There's a constructor reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose), but:

  1. seems that it is not reachable (even transitively) from public API of reactor
  2. I doubt that it can help, because it does not cover the case when Flux stops before getting the last item in the iterable.

What is the proper way of cleaning/closing resources after Flux.fromIterable publishes the last item?

Probably, there is a better way than fromIterable to do similar things, so all options are welcome.


Solution

  • For an equivalent to try with resources you can use using

        Flux.using(
                //Set up resource
                () -> createReader("my_resource_path"),
                //Create flux from resource
                reader -> Flux.fromIterable(iteratorOfLines(reader)),
                //Perform action (cleanup/close) 
                //when resource completes/errors/cancelled
                reader -> {
                    try{
                        reader.close();
                    }catch(IOException e){
                        throw Exceptions.propagate(e);
                    }
                }
        );