Suppose we need to create a Flux
based on contents of a Closeable
resource.
For clarity say there is a BufferedReader
to be converted to Flux<String>
.
BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));
Let us assume that iteratorOfLines
produces a finite set of items.
I'm looking for a way to close BufferedReader
when either the Flux
has consumed all data from it or the remaining data is not needed for some reason (i.e. subscription is aborted).
There's a constructor reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose)
, but:
What is the proper way of cleaning/closing resources after Flux.fromIterable
publishes the last item?
Probably, there is a better way than fromIterable
to do similar things, so all options are welcome.
For an equivalent to try with resources you can use using
Flux.using(
//Set up resource
() -> createReader("my_resource_path"),
//Create flux from resource
reader -> Flux.fromIterable(iteratorOfLines(reader)),
//Perform action (cleanup/close)
//when resource completes/errors/cancelled
reader -> {
try{
reader.close();
}catch(IOException e){
throw Exceptions.propagate(e);
}
}
);