
Canceling a CompletableFuture chain


I have a chain of asynchronous service calls which I would like to cancel. Well, actually, I have two chains of service calls going in parallel, and if one succeeds, I would like to cancel the other.

With Guava's futures I'm accustomed to canceling an entire chain of futures by canceling the last one. It appears I cannot do this with Java 8's CompletableFuture, unless someone has a good idea of how.

Your task, should you choose to accept it, is to tell me if I can keep my pretty syntax AND have the chain cancellation. Otherwise, I'll be writing my own chaining future wrapper - especially after this question.


My own tests and attempts follow.

@Test
public void shouldCancelOtherFutures() {
    // guava
    ListenableFuture<String> as = Futures.immediateFuture("a");
    ListenableFuture<String> bs = Futures.transform(as, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> cs = Futures.transform(bs, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> ds = Futures.transform(cs, Functions.<String>identity());

    ds.cancel(false);
    assertTrue(cs.isDone()); // succeeds

    // jdk 8
    CompletableFuture<String> ac = CompletableFuture.completedFuture("a");
    CompletableFuture<String> bc = ac.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> cc = bc.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> dc = cc.thenApply(Function.identity());

    dc.cancel(false);
    assertTrue(cc.isDone()); // fails
}

(Imagine that each thenCompose() and Futures.transform(x, AsyncFunction) represents an asynchronous service call.)

I can see why Doug Lea's army of grad students did it this way. With branching chains, should everything be canceled?

CompletableFuture<Z> top = new CompletableFuture<>()
    .thenApply(x -> y(x))
    .thenCompose(y -> z(y));

CompletableFuture<?> aBranch = top.thenCompose(z -> aa(z));
CompletableFuture<?> bBranch = top.thenCompose(z -> bb(z));

...
bBranch.cancel(false);
// should aBranch be canceled now?
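
For reference, the JDK's answer to that question can be checked with a minimal sketch. It assumes plain thenApply branches off one shared future; the class and method names are made up for illustration. Canceling one branch completes only that branch, and neither the shared parent nor the sibling is touched.

```java
import java.util.concurrent.CompletableFuture;

class BranchCancelDemo {
    /**
     * Returns {top done?, aBranch done?, bBranch cancelled?, aBranch got its value?}.
     * The first two flags are sampled right after bBranch.cancel(false).
     */
    static boolean[] cancelOneBranch() {
        CompletableFuture<String> top = new CompletableFuture<>();
        CompletableFuture<String> aBranch = top.thenApply(s -> s + "-a");
        CompletableFuture<String> bBranch = top.thenApply(s -> s + "-b");

        bBranch.cancel(false);              // completes bBranch only
        boolean topDone = top.isDone();     // parent is still pending
        boolean aDone = aBranch.isDone();   // sibling is still pending

        top.complete("z");                  // the sibling still receives the value
        boolean aGotValue = "z-a".equals(aBranch.getNow(null));

        return new boolean[] { topDone, aDone, bBranch.isCancelled(), aGotValue };
    }

    public static void main(String[] args) {
        boolean[] r = cancelOneBranch();
        System.out.println("top done: " + r[0] + ", aBranch done: " + r[1]
                + ", bBranch cancelled: " + r[2] + ", aBranch got value: " + r[3]);
    }
}
```

So with CompletableFuture, cancel() acts on a single stage, which sidesteps the branching question at the cost of never stopping upstream work.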

I can get around the problem with a custom wrapper function, but it messes up the pretty syntax.

private <T,U> CompletableFuture<U> transformAsync(CompletableFuture<T> source, Function<? super T,? extends CompletableFuture<U>> transform) {
    CompletableFuture<U> next = source.thenCompose(transform);
    // when next is cancelled (or otherwise done), cancel the upstream future; a no-op if source has already completed
    next.whenComplete((x, err) -> source.cancel(false));
    return next;
}

private <T,U> CompletableFuture<U> transform(CompletableFuture<T> source, Function<T,U> transform) {
    CompletableFuture<U> next = source.thenApply(transform);
    next.whenComplete((x, err) -> source.cancel(false)); // same upstream propagation as above
    return next;
}

// nice syntax I wish worked
CompletableFuture<?> f1 = serviceCall()
        .thenApply(w -> x(w))
        .thenCompose(x -> serviceCall())
        .thenCompose(y -> serviceCall())
        .thenApply(z -> $(z));

// what works, with less readable syntax
CompletableFuture<?> f2 =
        transform(
            transformAsync(
                transformAsync(
                    transform(serviceCall(), w -> x(w)),
                    x -> serviceCall()),
                y -> serviceCall()),
            z -> $(z));

Solution

  • It depends on what your goal is. I assume it is not important that your intermediate CompletableFutures report a completed state, since you usually won’t notice them when the chain is built fluently. The important point is that you don’t want your expensive serviceCall() to be triggered.

    One solution can be:

    CompletableFuture<String> flag=new CompletableFuture<>();
    
    CompletableFuture<String> ac = serviceCall()
      .thenCompose(x -> flag.isCancelled()? flag: serviceCall())
      .thenCompose(x -> flag.isCancelled()? flag: serviceCall());
    ac.whenComplete((v,t)->flag.cancel(false));// don’t chain this call
    

    This uses whenComplete as in your solution, but only on the final CompletableFuture, to propagate the cancellation to a dedicated flag object. Once the final future has been cancelled, the next thenCompose function that runs detects the cancellation and returns the cancelled flag future. The failure then propagates down the chain, so no further compose or apply functions are invoked.

    The drawback is that this cannot be combined with thenApply, as the Function can’t return a cancelled future. Therefore, when the asynchronous service call completes and is chained with a Function, the function will be applied even if a cancellation occurred.
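
Both effects can be observed with a small single-threaded sketch. Everything in it apart from the flag pattern itself is an assumption for illustration: the stub serviceCall() merely records the call and returns a future that the demo completes by hand, and the guard parameter exists only to compare the chain with and without the flag check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class FlagCancelDemo {
    // Hypothetical stand-in for the real service: records each call and
    // returns a future that is completed manually by the demo.
    static final List<CompletableFuture<String>> calls = new ArrayList<>();
    static final AtomicInteger applied = new AtomicInteger();

    static CompletableFuture<String> serviceCall() {
        CompletableFuture<String> f = new CompletableFuture<>();
        calls.add(f);
        return f;
    }

    /** Returns {service calls started, thenApply functions run}. */
    static int[] run(boolean guard) {
        calls.clear();
        applied.set(0);
        CompletableFuture<String> flag = new CompletableFuture<>();

        CompletableFuture<String> ac = serviceCall()
            .thenApply(w -> { applied.incrementAndGet(); return w.toUpperCase(); })
            .thenCompose(x -> (guard && flag.isCancelled()) ? flag : serviceCall())
            .thenCompose(x -> (guard && flag.isCancelled()) ? flag : serviceCall());
        ac.whenComplete((v, t) -> flag.cancel(false)); // not chained, as in the answer

        ac.cancel(false);                // cancel the end of the chain, which cancels flag
        calls.get(0).complete("first");  // the in-flight first call finishes afterwards
        return new int[] { calls.size(), applied.get() };
    }

    public static void main(String[] args) {
        int[] guarded = run(true);
        int[] unguarded = run(false);
        System.out.println("with flag:    calls=" + guarded[0] + ", applied=" + guarded[1]);
        System.out.println("without flag: calls=" + unguarded[0] + ", applied=" + unguarded[1]);
    }
}
```

With the flag check only the first service call is started, while without it the second one is launched as well. In both runs the thenApply function still executes once, which is the drawback in question.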

    An alternative that solves this issue as well is to create a wrapper for your serviceCall() that checks the flag both before launching the call and after it completes:

    CompletableFuture<String> serviceCall(CompletableFuture<String> f) {
        if(f.isCancelled()) return f;
        CompletableFuture<String> serviceCall=serviceCall();
        return serviceCall.thenCompose(x->f.isCancelled()? f: serviceCall);
    }
    

    Then your use case will look like:

    CompletableFuture<String> flag=new CompletableFuture<>();
    
    CompletableFuture<String> ac = serviceCall(flag)
      .thenApply(w->x(w))
      .thenCompose(x -> serviceCall(flag))
      .thenCompose(x -> serviceCall(flag))
      .thenApply(z -> $(z));
    ac.whenComplete((v,t)->flag.cancel(false));
    

    Of course, you have to replace <String> with whatever type parameter your original serviceCall() uses for the CompletableFuture<T>.
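
If the service calls in one chain return different types, one possible variation is a generic wrapper that hands back a freshly cancelled future instead of the flag itself. The following is only a sketch under that assumption: guarded, cancelled, and the stub serviceCall() are hypothetical names, not part of the JDK or of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class GuardedCallDemo {
    // Hypothetical stand-in for the real service, completed by hand in the demo.
    static final List<CompletableFuture<String>> calls = new ArrayList<>();
    static final AtomicInteger applied = new AtomicInteger();

    static CompletableFuture<String> serviceCall() {
        CompletableFuture<String> f = new CompletableFuture<>();
        calls.add(f);
        return f;
    }

    /** A future of any type that is already cancelled. */
    static <T> CompletableFuture<T> cancelled() {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.cancel(false);
        return f;
    }

    /** Checks the flag before launching the call and again after it completes. */
    static <T> CompletableFuture<T> guarded(CompletableFuture<?> flag,
                                            Supplier<CompletableFuture<T>> call) {
        if (flag.isCancelled()) return cancelled();
        CompletableFuture<T> inFlight = call.get();
        return inFlight.thenCompose(
            x -> flag.isCancelled() ? GuardedCallDemo.<T>cancelled() : inFlight);
    }

    /** Returns {service calls started, thenApply functions run}. */
    static int[] run() {
        calls.clear();
        applied.set(0);
        CompletableFuture<Void> flag = new CompletableFuture<>();

        CompletableFuture<Integer> ac = guarded(flag, GuardedCallDemo::serviceCall)
            .thenApply(w -> { applied.incrementAndGet(); return w.length(); })
            .thenCompose(n -> guarded(flag, GuardedCallDemo::serviceCall))
            .thenApply(z -> { applied.incrementAndGet(); return z.length(); });
        ac.whenComplete((v, t) -> flag.cancel(false));

        ac.cancel(false);                // cancels flag through whenComplete
        calls.get(0).complete("first");  // first call finishes after the cancellation
        return new int[] { calls.size(), applied.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("calls=" + r[0] + ", applied=" + r[1]);
    }
}
```

In this run only the first service call is started and neither thenApply function is invoked, because the post-completion check turns the first stage into a failed one before its result reaches the Function.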