spring-webfluxproject-reactor

How to call blocking IO calls on Mono


I have utility methods look like this.

    public static WebClient.ResponseSpec retrieve(final String baseUrl, final Duration responseTimeout) {
        // ...
    }

    public static <T> Flux<T> retrieveBodyToFlux(final String baseUrl, final Duration responseTimeout,
                                                 final Class<T> elementClass) {
        return retrieve(baseUrl, responseTimeout)
                .bodyToFlux(elementClass);
    }

    public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
                                      final Path destination, final OpenOption... openOptions) {
        return DataBufferUtils.write(
                retrieveBodyToFlux(baseUrl, responseTimeout, DataBuffer.class),
                destination,
                openOptions
        );
    }

Now I want to add another method for client to consume a temporary file.

    // Let me download the file, and you can just cononsume the file!
    public static Mono<Void> download(final String baseUrl, final Duration responseTimeout,
                                      final Consumer<? super Path> consumer) {

        // Create a temp file
        // download the URL to the file
        // accept the file to the consumer; possibly blocking IO operations
        // don't forget to delete the file!
    }

How can I do the job without any blocking call warning?

I tried (, and it seems to work), but I'm wondering what am I doing. Is it optimal? Do I have other ways?

        return Mono.usingWhen(
                        Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
                        p -> download(baseUrl, responseTimeout, p, StandardOpenOption.WRITE)
                                .then(Mono.just(p))
                                .doOnNext(consumer).subscribeOn(Schedulers.boundedElastic()),
                        p -> Mono.fromCallable(() -> {
                            Files.delete(p);
                            return null;
                        }).publishOn(Schedulers.boundedElastic()).then()
                )
                .then();

Solution

  • 
    Mono.usingWhen(
            // Resource acquisition
            Mono.fromCallable(() -> Files.createTempFile(null, null)).subscribeOn(Schedulers.boundedElastic()),
            
            // Resource usage
            tempFile -> download(baseUrl, responseTimeout, tempFile, StandardOpenOption.WRITE)
                        .then(Mono.fromRunnable(() -> consumer.accept(tempFile)).subscribeOn(Schedulers.boundedElastic())),
            
            // Resource cleanup
            tempFile -> Mono.fromRunnable(() -> {
                try {
                    Files.delete(tempFile);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).subscribeOn(Schedulers.boundedElastic())
    ).then();