javamultithreadingasynchronouscompletable-futurecompletion-stage

How to force CompletableFuture.thenApply() to run on the same thread that ran the previous stage?


Here's a short code version of the problem I'm facing:

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
                /*
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {}
                */
                //System.out.println("supplyAsync: " + Thread.currentThread().getName());
                return 1;
            })
            .thenApply(i -> {
                System.out.println("apply: " + Thread.currentThread().getName());
                return i + 1;
            })
            .thenAccept((i) -> {
                System.out.println("accept: " + Thread.currentThread().getName());
                System.out.println("result: " + i);
            }).join();
}

This is the output that I get:

apply: main
accept: main
result: 2

I'm surprised to see main there! I expected something like this which happens when I uncomment the Thread.sleep() call or even as much as uncomment the single sysout statement there:

supplyAsync: ForkJoinPool.commonPool-worker-1
apply: ForkJoinPool.commonPool-worker-1
accept: ForkJoinPool.commonPool-worker-1
result: 2

I understand thenApplyAsync() will make sure it won't run on the main thread, but I want to avoid passing the data returned by the supplier from the thread that ran supplyAsync to the thread that's going to run thenApply and the other subsequent thens in the chain.


Solution

  • The method thenApply evaluates the function in the caller’s thread because the future has been completed already. Of course, when you insert a sleep into the supplier, the future has not been completed by the time, thenApply is called. Even a print statement might slow down the supplier enough to have the main thread invoke thenApply and thenAccept first. But this is not reliable behavior, you may get different results when running the code repeatedly.

    Not only does the future not remember which thread completed it, there is no way to tell an arbitrary thread to execute a particular code. The thread might be busy with something else, being entirely uncooperative, or even have terminated in the meanwhile.

    Just consider

    ExecutorService s = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync: " + Thread.currentThread().getName());
        return 1;
    }, s);
    s.shutdown();
    s.awaitTermination(1, TimeUnit.DAYS);
    cf.thenApply(i -> {
        System.out.println("apply: " + Thread.currentThread().getName());
        return i + 1;
    })
    .thenAccept((i) -> {
        System.out.println("accept: " + Thread.currentThread().getName());
        System.out.println("result: " + i);
    }).join();
    

    How could we expect the functions passed to thenApply and thenAccept to be executed in the already terminated pool’s worker thread?

    We could also write

    CompletableFuture<Integer> cf = new CompletableFuture<>();
    
    Thread t = new Thread(() -> {
        System.out.println("completing: " + Thread.currentThread().getName());
        cf.complete(1);
    });
    t.start();
    t.join();
    
    System.out.println("completer: " + t.getName() + " " + t.getState());
    cf.thenApply(i -> {
        System.out.println("apply: " + Thread.currentThread().getName());
        return i + 1;
    })
    .thenAccept((i) -> {
        System.out.println("accept: " + Thread.currentThread().getName());
        System.out.println("result: " + i);
    }).join();
    

    which will print something alike

    completing: Thread-0
    completer: Thread-0 TERMINATED
    apply: main
    accept: main
    result: 2
    

    Obviously, we can’t insist on this thread processing the subsequent stages.

    But even when the thread is a still alive worker thread of a pool, it doesn’t know that it has completed a future nor has it a notion of “processing subsequent stages”. Following the Executor abstraction, it just has received an arbitrary Runnable from the queue and after processing it, it proceeds with its main loop, fetching the next Runnable from the queue.

    So once the first future has been completed, the only way to tell it to do the work of completing other futures, is by enqueuing the tasks. This is what happens when using thenApplyAsync specifying the same pool or performing all actions with the …Async methods without an executor, i.e. using the default pool.

    When you use a single threaded executor for all …Async methods, you can be sure that all actions are executed by the same thread, but they will still get through the pool’s queue. Since even then, it’s the main thread actually enqueuing the dependent actions in case of an already completed future, a thread safe queue and hence, synchronization overhead, is unavoidable.

    But note that even if you manage to create the chain of dependent actions first, before a single worker thread processes them all sequentially, this overhead is still there. Each future’s completion is done by storing the new state in a thread safe way, making the result potentially visible to all other threads, and atomically checking whether a concurrent completion (e.g. a cancelation) has happened in the meanwhile. Then, the dependent action(s) chained by other threads will be fetched, of course, in a thread safe way, before they are executed.

    All these actions with synchronization semantics make it unlikely that there are benefits of processing the data by the same thread when having a chain of dependent CompletableFutures.

    The only way to have an actual local processing potentially with performance benefits is by using

    CompletableFuture.runAsync(() -> {
        System.out.println("supplyAsync: " + Thread.currentThread().getName());
        int i = 1;
    
        System.out.println("apply: " + Thread.currentThread().getName());
        i = i + 1;
    
        System.out.println("accept: " + Thread.currentThread().getName());
        System.out.println("result: " + i);
    }).join();
    

    Or, in other words, if you don’t want detached processing, don’t create detached processing stages in the first place.