asynchronous, exception, java-8, completion-stage

Java CompletionStage not propagating exception to exceptionally


I am using async programming in one of my projects. In one of the methods whose return type is CompletionStage, I am simulating a thrown exception for testing, but it does not get caught in exceptionally(). Below is some test code I created based on the use case.

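import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class TestMain {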
  
  TestService service = new TestService();
  
  public static void main(String[] args) throws InterruptedException {
    TestMain t = new TestMain();
    System.out.println("STARTED: THREAD " + Thread.currentThread().getName());
    t.execute("test").toCompletableFuture().join();
    System.out.println("DONE: THREAD " + Thread.currentThread().getName());
  }
  
  public CompletionStage<Void> execute(String s) {
    System.out.println("EXECUTE: THREAD " + Thread.currentThread().getName());
    return CompletableFuture.runAsync(() -> {
      System.out.println("EXECUTE-ASYNC STARTED: THREAD " + Thread.currentThread().getName());
      process(s);
    }).exceptionally(ex -> {
      System.out.println("EXECUTE-ASYNC EXCEPTIONAL: THREAD " + Thread.currentThread().getName());
      System.err.println("EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION " + ex.toString());
      if (ex.getCause() != null) {
        System.err.println("EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION CAUSE " + ex.getCause().toString());
      }
      throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);
    });
  }
  
  private CompletionStage<Void> process(String s) {
    System.out.println("EXECUTE-PROCESS: THREAD " + Thread.currentThread().getName());
//    throw new RuntimeException();
    return service.service(s);
  }

}

public class TestService {
  
  public CompletionStage<Void> service(String s) {
    System.out.println("SERVICE: THREAD " + Thread.currentThread().getName());
//    throw new RuntimeException();
    return preProcess(s).thenCompose(v -> {
      System.out.println("SERVICE-PRE PROCESS -> COMPOSE: THREAD " + Thread.currentThread().getName());
      return process(s);
    });
  }
  
  private CompletionStage<Void> preProcess(String s) {
    System.out.println("SERVICE-PRE PROCESS: THREAD " + Thread.currentThread().getName());
//    throw new RuntimeException();
    return CompletableFuture.completedFuture(null);
  }

  private CompletionStage<Void> process(String s) {
    System.out.println("SERVICE-PROCESS: THREAD " + Thread.currentThread().getName());
    throw new RuntimeException();
//    return CompletableFuture.completedFuture(null);
  }
  
}

When I run the code given above, the output shows that the exceptionally() in TestMain.execute() is not called.

Actual Output (when exception thrown from TestService.process()):

STARTED: THREAD main
EXECUTE: THREAD main
EXECUTE-ASYNC STARTED: THREAD ForkJoinPool.commonPool-worker-1
EXECUTE-PROCESS: THREAD ForkJoinPool.commonPool-worker-1
SERVICE: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PRE PROCESS: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PRE PROCESS -> COMPOSE: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PROCESS: THREAD ForkJoinPool.commonPool-worker-1
DONE: THREAD main

However, if I throw the exception at any of the other places marked by the commented-out code, exceptionally() is called. Below is an example of one such instance.

Actual Output (when exception thrown from TestService.preProcess()). I was expecting the output above to be similar to this:

STARTED: THREAD main
EXECUTE: THREAD main
EXECUTE-ASYNC STARTED: THREAD ForkJoinPool.commonPool-worker-1
EXECUTE-PROCESS: THREAD ForkJoinPool.commonPool-worker-1
SERVICE: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PRE PROCESS: THREAD ForkJoinPool.commonPool-worker-1
EXECUTE-ASYNC EXCEPTIONAL: THREAD main
EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION java.util.concurrent.CompletionException: java.lang.RuntimeException
EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION CAUSE java.lang.RuntimeException
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643)
    at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1632)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.RuntimeException
    at in.TestService.preProcess(TestService.java:19)
    at in.TestService.service(TestService.java:11)
    at in.TestMain.process(TestMain.java:34)
    at in.TestMain.lambda$0(TestMain.java:22)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    ... 5 more

Can somebody explain why this is happening? I need the thrown exception to always be caught in the exceptionally() of TestMain.execute(), so that I can implement retry logic there.

Thanks in advance.


Solution

  • You are invoking a method which returns a CompletionStage but ignoring the returned value:

    CompletableFuture.runAsync(() -> {
      System.out.println("EXECUTE-ASYNC STARTED: THREAD "+Thread.currentThread().getName());
      process(s); // return value ignored
    })
    

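    The stage created by CompletableFuture.runAsync(…) is completed exceptionally only if the Runnable itself throws an exception. That is what happens when you place throw new RuntimeException(); in TestMain.process, TestService.service, or TestService.preProcess: the exception propagates synchronously out of the Runnable. In the other scenario, the exception is thrown inside the function passed to thenCompose, so nothing is thrown to the caller. Instead, service returns a failed stage, which the caller ignores.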

    You could change the code to

    CompletableFuture.runAsync(() -> {
      System.out.println("EXECUTE-ASYNC STARTED: THREAD "+Thread.currentThread().getName());
      process(s).toCompletableFuture().join();
    })
    

    to not ignore the returned stage or, preferably, use the pattern you’ve already used at another place:

    CompletableFuture.completedFuture(null).thenComposeAsync(v -> {
      System.out.println("EXECUTE-ASYNC STARTED: THREAD "+Thread.currentThread().getName());
      return process(s);
    })
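
To see the difference in isolation, here is a minimal sketch that reduces the question's code to its essentials. The class name IgnoredStageDemo and the methods ignoring() and composing() are made up for this illustration. service() and process() are simplified stand-ins for the methods in TestService. Each variant reports whether its exceptionally() handler ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class IgnoredStageDemo {

  // Stand-in for TestService.process(): throws synchronously.
  static CompletionStage<Void> process() {
    throw new RuntimeException("boom");
  }

  // Stand-in for TestService.service(): process() is called inside the
  // thenCompose function, so its exception is captured in the returned
  // stage instead of being thrown to the caller of service().
  static CompletionStage<Void> service() {
    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    return done.thenCompose(v -> process());
  }

  // Variant 1: the Runnable drops the stage returned by service(), so the
  // runAsync stage completes normally. Result: false (handler did not run).
  static CompletableFuture<Boolean> ignoring() {
    return CompletableFuture.runAsync(() -> {
          service(); // return value ignored
        })
        .thenApply(v -> false)
        .exceptionally(ex -> true);
  }

  // Variant 2: thenComposeAsync links the stage returned by service() into
  // the chain, so its failure propagates. Result: true (handler ran).
  static CompletableFuture<Boolean> composing() {
    CompletableFuture<Void> start = CompletableFuture.completedFuture(null);
    return start.thenComposeAsync(v -> service())
        .thenApply(v -> false)
        .exceptionally(ex -> true);
  }

  public static void main(String[] args) {
    System.out.println("ignoring:  exceptionally ran = " + ignoring().join());
    System.out.println("composing: exceptionally ran = " + composing().join());
  }
}
```

Unlike the join() variant, the composing variant does not block a pool thread while waiting for the inner stage, which is one reason to prefer it.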