springspring-batchspring-batch-integration

spring-batch: AsyncItemProcessor: how can one have JobListener methods called in delegates?


In the spring-batch-integration documentation https://docs.spring.io/spring-batch/reference/spring-batch-integration/sub-elements.html#asynchronous-processors for AsyncItemProcessor, the example shows creating the AsyncItemProcessor and setting the usual ItemProcessor as a delegate, i.e. :

@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

However, if the delegate is also a StepExecutionListener, the delegate's methods beforeStep() and afterStep() methods will not be called.

How can one have the AsyncItemProcessor's delegate's StepExecutionListener methods called?

thanks for any pointers / help


Solution

  • When I ran into the same problem, I took the approach of composing a StepExecutionListeningItemProcessor as the base interface of my ItemProcessor.

    public interface StepExecutionListeningItemProcessor<I, O> extends ItemProcessor<I, O>,
                                                                       StepExecutionListener {
    }
    

    I then composed a StepExecutionListeningAsyncItemProcessor:

    public class StepExecutionListeningAsyncItemProcessor<I, O> extends AsyncItemProcessor<I, O>,
                                                                implements StepExecutionListener {
    
        private final StepExecutionListeningItemProcessor<I, O> ourDelegate;
    
        public StepExecutionListeningAsyncItemProcessor(StepExecutionListeningItemProcessor<I, O> delegate,
                                                        TaskExecutor taskExecutor) {
            ourDelegate = delegate;
            super.setDelegate(delegate);
            super.setTaskExecutor(taskExecutor);
        }
    
        @Override
        public void beforeStep(StepExecution stepExecution) {
            ourDelegate.beforeStep(stepExecution);
        }
    
        @Override
        public void afterStep(StepExecution stepExecution) {
            ourDelegate.afterStep(stepExecution);
        }
    }
    

    I had to make my base ItemProcessor so that it extends StepExecutionListeningItemProcessor.

    Since the signature for StepBuilder.processor() doesn't accept an AsyncItemProcessor, need to cast using 'raw' types. (N.B. That's not because of this approach but instead due to the type incompatibilities of AsyncItemProcessor and ItemProcessor.)

    I used a simple method for this:

    private static ItemProcessor<XXX, YYY> asyncXxxToYyyProcessor(StepExecutionListeningItemProcessor<XXX, YYY> itemProcessor, 
                           TaskExecutor taskExecutor) {
        // N.B. Cast as raw type because they are not generics compatible
        return (ItemProcessor) new StepExecutionListeningAsyncItemProcessor<>(itemProcessor, taskExecutor);
    }
    

    This method can be called as the argument to the StepBuilder.processor() method, passing as the arguments to this method the delegate itemProcessor.