I'm working on a greenfield reactive project that involves a lot of file-handling IO. Is it sufficient to write the IO code in an imperative, blocking manner, wrap it in a Mono, and publish it on the boundedElastic scheduler? Will the boundedElastic pool size limit the number of concurrent operations?
If this is not the correct method, can you show an example how to write bytes to a file using Reactor?
Is it sufficient to write the IO code in an imperative, blocking manner, wrap it in a Mono, and publish it on the boundedElastic scheduler?
This comes down to opinion on some level - but no, it's certainly not ideal for a reactive greenfield project IMHO. boundedElastic() schedulers are great for interfacing with blocking IO when you must, but they're not a good replacement when a true non-blocking solution exists. (Sometimes this is a bit of a moot point with file handling, since it depends on whether the underlying system can do it asynchronously - but these days it usually can.)
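(For reference, the pattern described in the question - wrapping the blocking call and shifting it onto boundedElastic() - would look roughly like the sketch below, using java.nio.file.Files and Schedulers.boundedElastic(). The method name and the use of Files.write are placeholders to illustrate the shape of it, not a recommendation.)

    static Mono<Void> writeBlocking(Path path, String content) {
        // Defer the blocking write until subscription...
        return Mono.<Void>fromCallable(() -> {
                    Files.write(path, content.getBytes(StandardCharsets.UTF_8));
                    return null; // a null from the Callable completes the Mono empty
                })
                // ...and run it on boundedElastic() so it doesn't tie up non-blocking threads
                .subscribeOn(Schedulers.boundedElastic());
    }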
In your case, I'd look at wrapping AsynchronousFileChannel in a reactive publisher. You'll need to use create() or push() for this and then make explicit calls to the sink, but exactly how you do this depends on your use case. As a "simplest case" for file writing, you could feasibly do something like:
static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
    return Mono.create(sink -> {
        // Encode the content (platform default charset) into a buffer ready for writing
        byte[] bytes = content.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();
        // Kick off the async write at position 0, completing or erroring the sink when it finishes
        channel.write(buffer, 0, null, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, Object attachment) {
                sink.success();
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                sink.error(exc);
            }
        });
    });
}
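You'd then need to manage the channel's lifecycle around that Mono. One way (a minimal sketch - the path, open options and content are placeholder assumptions, and Mono.using() is just one option for tying the channel's lifetime to the subscription):

    Path path = Paths.get("example.txt"); // placeholder path
    Mono<Void> write = Mono.using(
            // open the channel lazily, once per subscription
            () -> AsynchronousFileChannel.open(path,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE),
            channel -> writeToFile(channel, "hello, reactor"),
            // close the channel when the Mono completes, errors or is cancelled
            channel -> {
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });

Whatever subscribes to write (e.g. as part of a larger reactive chain) then triggers the open / write / close sequence.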
A more thorough / comprehensive example of bridging the two APIs can be found here - there are almost certainly others around too.