spring asynchronous spring-aop

Spring AOP: How to rethrow an exception in an async method with a void return type


I have the following application (the same application with Gradle + Spring Boot is here https://www.dropbox.com/s/vizr5joyhixmdca/demo.zip?dl=0):

Writer.java contains some code that runs asynchronously with the help of the @Async annotation. One method returns void and the other returns Future. Both variants are allowed as per the documentation.

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;

@Component
@Async("customExecutor")
public class Writer {

    public void write() {
        System.out.println("Writing something");
        throw new RuntimeException("Writer exception");
    }

    public Future<Void> writeFuture() {
        System.out.println("Writing something with future");
        throw new RuntimeException("Writer exception with future");
    }
}

ErrorHandlingThreadPoolExecutor.java is a custom executor. The only difference from ThreadPoolExecutor is its error handling. The afterExecute implementation is exactly the same as the one suggested in the method's javadoc. The idea here is to print "[ERROR] " + ex when an exception occurs.

import org.springframework.stereotype.Component;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component("customExecutor")
public class ErrorHandlingThreadPoolExecutor extends ThreadPoolExecutor {

    public ErrorHandlingThreadPoolExecutor() {
        super(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            handleError(t);
        }
    }

    private void handleError(Throwable ex) {
        System.out.println("[ERROR] " + ex);
    }
}

Config.java enables async processing and scheduling. It also invokes writer.write on a schedule.

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
@EnableAsync
public class Config {

    private final Writer writer;

    public Config(Writer writer) {
        this.writer = writer;
    }

    @Scheduled(fixedRate = 1000)
    public void writeBySchedule() {
        writer.write();
//        writer.writeFuture();
    }
}

When I run this application, I see the following output:

Writing something
2020-07-14 21:16:33.791 ERROR 19860 --- [pool-1-thread-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.example.demo.Writer.write()

java.lang.RuntimeException: Writer exception
    at com.example.demo.Writer.write(Writer.java:14) ~[main/:na]
    at com.example.demo.Writer$$FastClassBySpringCGLIB$$cd00988d.invoke(<generated>) ~[main/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_242]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_242]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_242]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
...

At the same time, if I comment out writer.write() and uncomment writer.writeFuture(), I get the following:

Writing something with future
[ERROR] java.lang.RuntimeException: Writer exception with future
...

The latter is what I'm trying to achieve with ErrorHandlingThreadPoolExecutor. However, I'd like to keep my methods returning void. I found the reason why my exception doesn't reach the custom ErrorHandlingThreadPoolExecutor.handleError() method here: https://github.com/spring-projects/spring-framework/blob/master/spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java#L308. This method is executed before my custom one, and there seems to be no way to rethrow an exception for void methods. I'm aware of the AsyncConfigurerSupport class, which allows customizing exception handling, but the exception still won't escape from AsyncExecutionAspectSupport.handleError().
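To illustrate the difference between the two cases without Spring, here is a minimal plain-Java sketch. It assumes, based on the linked source, that for void methods the exception is caught inside the submitted task and handed to an uncaught-exception handler, while for Future-returning methods it is rethrown out of the task. The class names SwallowedExceptionDemo and RecordingExecutor and the two Callables are hypothetical stand-ins, not Spring code.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: imitates the two code paths, it does not use Spring.
class SwallowedExceptionDemo {

    // Same inspection logic as afterExecute in the question, but it records
    // the messages it sees instead of printing them.
    static class RecordingExecutor extends ThreadPoolExecutor {
        final List<String> seen = new CopyOnWriteArrayList<>();

        RecordingExecutor() {
            super(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    ((Future<?>) r).get();
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                seen.add(t.getMessage());
            }
        }
    }

    static List<String> run() {
        RecordingExecutor executor = new RecordingExecutor();

        // Assumed "void" path: the wrapper catches the exception itself,
        // so the task completes normally from the executor's point of view.
        Callable<Object> swallowing = () -> {
            try {
                throw new RuntimeException("void-style");
            } catch (RuntimeException ex) {
                // stand-in for passing ex to an uncaught-exception handler
                return null;
            }
        };

        // Assumed "Future" path: the exception leaves the Callable and is
        // stored in the FutureTask, where afterExecute can find it.
        Callable<Object> rethrowing = () -> {
            throw new RuntimeException("future-style");
        };

        executor.submit(swallowing);
        executor.submit(rethrowing);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return executor.seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [future-style]
    }
}
```

Only the rethrown exception shows up in afterExecute, which matches the behaviour observed with write() versus writeFuture().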

To sum up, is there any way to propagate exceptions from asynchronously executed methods to ErrorHandlingThreadPoolExecutor.handleError() if they declare void as the return type? For now it seems I could use executors directly without @Async, but is it possible with @Async? If not, what would be the least invasive fix (least code to change and maintain)? I have quite a lot of async methods returning void.

UPDATE: Based on the accepted answer, I came up with the following aspect:

import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Map;

@Component
@Aspect
public class ErrorHandlingAspect implements ApplicationListener<ContextRefreshedEvent> {

    public static final String DEFAULT_EXECUTOR_BEAN_NAME = "defaultExecutor";

    private Map<String, ErrorHandlingThreadPoolExecutor> errorHandlingExecutors;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // initializing here because not all beans come if initialized in constructor
        this.errorHandlingExecutors = event.getApplicationContext()
                .getBeansOfType(ErrorHandlingThreadPoolExecutor.class);
    }

    @Pointcut(
            // where @Async is on class level
            "@within(org.springframework.scheduling.annotation.Async)"
                    // where @Async is on method level
                    + " || @annotation(org.springframework.scheduling.annotation.Async)")
    public void asyncMethods() {
    }

    @Around("asyncMethods()")
    public Object runWithErrorHandling(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        Async annotation = method.getAnnotation(Async.class);
        if (annotation == null) {
            annotation = method.getDeclaringClass().getAnnotation(Async.class);
        }
        if (annotation == null) {
            // shouldn't happen because of pointcut configuration, just for safety
            return joinPoint.proceed();
        }

        String asyncExecutorName = annotation.value();
        if (StringUtils.isEmpty(asyncExecutorName)) {
            asyncExecutorName = DEFAULT_EXECUTOR_BEAN_NAME;
        }

        ErrorHandlingThreadPoolExecutor asyncExecutor = errorHandlingExecutors.get(asyncExecutorName);
        if (asyncExecutor == null) {
            // can happen if the declared executor isn't extending ErrorHandlingThreadPoolExecutor
            // or if @Async uses the default executor which is either not registered as a bean at all
            // or not named DEFAULT_EXECUTOR_BEAN_NAME
            return joinPoint.proceed();
        }

        try {
            return joinPoint.proceed();
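        } catch (Throwable throwable) {
            // handleError must be accessible from this class for the call to compile
            // (it is declared private in the executor shown earlier)
            asyncExecutor.handleError(throwable);
            return null;
        }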
    }
}

Pros:

  1. Allows handling errors in asynchronously executed code without dealing with threads.
  2. Can have different error handling based on the executor.
  3. Can wrap methods returning both void and Future<>.

Cons:

  1. Cannot handle errors in the calling thread (only in the called one).
  2. Requires registering the default executor as a bean and giving it a specific name.
  3. Works only with @Async annotations and not with async code passed directly to the executor with submit().

Solution

  • If you use an aspect like this, you can remove the error handling block from your executor, or just use a normal executor and delete the whole (not really functioning) error handling executor. I did, and it works:

    package com.example.demo;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.stereotype.Component;
    
    @Component
    @Aspect
    public class ErrorHandlingAspect {
      // If necessary, narrow down the pointcut more here
      @Around("@within(org.springframework.scheduling.annotation.Async)")
      public Object advice(ProceedingJoinPoint joinPoint) {
        try {
          return joinPoint.proceed();
        }
        catch (Throwable throwable) {
          handleError(throwable);
          // Can also return empty future here for non-void methods
          return null;
        }
      }
    
      private void handleError(Throwable ex) {
        System.out.println("[ERROR] " + ex);
      }
    }
    

    When I delete ErrorHandlingThreadPoolExecutor, change the annotation on Writer to just @Async, and change Config.writeBySchedule like this:

    @Scheduled(fixedRate = 1000)
    public void writeBySchedule() {
      writer.write();
      writer.writeFuture();
    }
    

    the console log looks like this:

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v2.1.8.RELEASE)
    
    2020-07-15 07:41:02.314  INFO 18672 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication on Xander-Ultrabook with PID 18672 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
    (...)
    2020-07-15 07:41:06.839  INFO 18672 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
    Writing something
    Writing something with future
    [ERROR] java.lang.RuntimeException: Writer exception
    [ERROR] java.lang.RuntimeException: Writer exception with future
    Writing something
    [ERROR] java.lang.RuntimeException: Writer exception
    Writing something with future
    [ERROR] java.lang.RuntimeException: Writer exception with future
    Writing something
    Writing something with future
    [ERROR] java.lang.RuntimeException: Writer exception
    [ERROR] java.lang.RuntimeException: Writer exception with future
    (...)
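
The comment in the aspect above about returning an empty future for non-void methods could be sketched roughly as follows. This is only an illustration in plain Java: FallbackResults, fallbackFor and SampleWriter are hypothetical names, and whether the caller actually observes a difference depends on how the surrounding async proxy treats the returned value, which is not verified here.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for the Writer class from the question.
interface SampleWriter {
    void write();

    Future<Void> writeFuture();
}

// Hypothetical helper an aspect could call in its catch block.
class FallbackResults {

    // Returns an already completed, empty future if the method's declared
    // return type can hold a CompletableFuture; otherwise returns null.
    static Object fallbackFor(Method method) {
        Class<?> returnType = method.getReturnType();
        boolean isFutureType = Future.class.isAssignableFrom(returnType);
        boolean acceptsCompletableFuture = returnType.isAssignableFrom(CompletableFuture.class);
        if (isFutureType && acceptsCompletableFuture) {
            return CompletableFuture.completedFuture(null);
        }
        return null;
    }

    // Convenience overload that looks up a no-argument public method by name.
    static Object fallbackFor(Class<?> type, String methodName) {
        try {
            return fallbackFor(type.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fallbackFor(SampleWriter.class, "write") == null);
        System.out.println(fallbackFor(SampleWriter.class, "writeFuture") instanceof Future);
    }
}
```

In an aspect, the Method could be obtained from the join point's MethodSignature, as the aspect in the question's update already does, and `return null;` in the catch block would become `return FallbackResults.fallbackFor(method);`.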