In the spring-batch-integration
documentation https://docs.spring.io/spring-batch/reference/spring-batch-integration/sub-elements.html#asynchronous-processors for AsyncItemProcessor
, the example shows creating the AsyncItemProcessor
and setting the usual ItemProcessor
as a delegate, i.e. :
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
However, if the delegate is also a StepExecutionListener
, the delegate's methods beforeStep()
and afterStep()
methods will not be called.
How can one have the AsyncItemProcessor
's delegate's StepExecutionListener
methods called?
thanks for any pointers / help
When I ran into the same problem, I took the approach of composing a StepExecutionListeningItemProcessor
as the base interface
of my ItemProcessor
.
public interface StepExecutionListeningItemProcessor<I, O> extends ItemProcessor<I, O>,
StepExecutionListener {
}
I then composed a StepExecutionListeningAsyncItemProcessor
:
public class StepExecutionListeningAsyncItemProcessor<I, O> extends AsyncItemProcessor<I, O>,
implements StepExecutionListener {
private final StepExecutionListeningItemProcessor<I, O> ourDelegate;
public StepExecutionListeningAsyncItemProcessor(StepExecutionListeningItemProcessor<I, O> delegate,
TaskExecutor taskExecutor) {
ourDelegate = delegate;
super.setDelegate(delegate);
super.setTaskExecutor(taskExecutor);
}
@Override
public void beforeStep(StepExecution stepExecution) {
ourDelegate.beforeStep(stepExecution);
}
@Override
public void afterStep(StepExecution stepExecution) {
ourDelegate.afterStep(stepExecution);
}
}
I had to make my base ItemProcessor
so that it extends StepExecutionListeningItemProcessor
.
Since the signature for StepBuilder.processor()
doesn't accept an AsyncItemProcessor
, need to cast using 'raw' types. (N.B. That's not because of this approach but instead due to the type incompatibilities of AsyncItemProcessor
and ItemProcessor
.)
I used a simple method for this:
private static ItemProcessor<XXX, YYY> asyncXxxToYyyProcessor(StepExecutionListeningItemProcessor<XXX, YYY> itemProcessor,
TaskExecutor taskExecutor) {
// N.B. Cast as raw type because they are not generics compatible
return (ItemProcessor) new StepExecutionListeningAsyncItemProcessor<>(itemProcessor, taskExecutor);
}
This method can be called as the argument to the StepBuilder.processor()
method, passing as the arguments to this method the delegate itemProcessor
.