I have utility methods that look like this.
public static WebClient.ResponseSpec retrieve(final String baseUrl, final Duration responseTimeout) {
// ...
}
public static <T> Flux<T> retrieveBodyToFlux(final String baseUrl, final Duration responseTimeout,
final Class<T> elementClass) {
return retrieve(baseUrl, responseTimeout)
.bodyToFlux(elementClass);
}
public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
final Path destination, final OpenOption... openOptions) {
return DataBufferUtils.write(
retrieveBodyToFlux(baseUrl, responseTimeout, DataBuffer.class),
destination,
openOptions
);
}
Now I want to add another method that lets the caller consume a temporary file.
// Let me download the file, and you can just consume the file!
public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
final Consumer<? super Path> consumer) {
// Create a temp file
// download the URL to the file
// pass the file to the consumer; it may perform blocking IO operations
// don't forget to delete the file!
}
How can I do this without triggering any blocking-call warnings?
I tried the following, and it seems to work, but I'm not sure I understand what I'm doing. Is it optimal? Are there other ways?
return Mono.usingWhen(
Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
p -> download(baseUrl, responseTimeout, p, StandardOpenOption.WRITE)
.then(Mono.just(p))
.doOnNext(consumer).subscribeOn(Schedulers.boundedElastic()),
p -> Mono.fromCallable(() -> {
Files.delete(p);
return null;
}).publishOn(Schedulers.boundedElastic()).then()
)
.then();
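A variant that wraps the consumer call in its own Mono subscribed on boundedElastic, and subscribes the cleanup there as well:
return Mono.usingWhen(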
// Resource acquisition
Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
// Resource usage
tempFile -> download(baseUrl, responseTimeout, tempFile, StandardOpenOption.WRITE)
.then(Mono.fromRunnable(() -> consumer.accept(tempFile)).subscribeOn(Schedulers.boundedElastic())),
// Resource cleanup
tempFile -> Mono.fromRunnable(() -> {
try {
Files.delete(tempFile);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).subscribeOn(Schedulers.boundedElastic())
).then();
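For comparison, the three arguments of usingWhen (acquire, use, clean up) correspond to a plain try/finally in blocking code. The sketch below is only an analogy using the JDK, not Reactor: `TempFileLifecycle`, `withTempFile`, and the `body` parameter are hypothetical names, and writing `body` stands in for the actual download.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

// Hypothetical helper: a blocking analogue of the usingWhen lifecycle.
final class TempFileLifecycle {

    private TempFileLifecycle() {
    }

    // 'body' is an assumption standing in for the bytes fetched from the URL.
    static void withTempFile(final byte[] body, final Consumer<? super Path> consumer)
            throws IOException {
        // Resource acquisition
        final Path tempFile = Files.createTempFile(null, null);
        try {
            // Resource usage: fill the file, then hand it to the caller
            Files.write(tempFile, body, StandardOpenOption.WRITE);
            consumer.accept(tempFile);
        } finally {
            // Resource cleanup: runs whether the usage succeeded or threw
            Files.deleteIfExists(tempFile);
        }
    }

    public static void main(final String[] args) throws IOException {
        withTempFile("hello".getBytes(StandardCharsets.UTF_8), path -> {
            try {
                System.out.println("size: " + Files.size(path));
            } catch (final IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

The reactive version expresses the same contract, with the difference that each of the three steps is a publisher that can be moved to another scheduler instead of blocking the calling thread.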