springasynchronousspring-batchbatch-processingspring-batch-tasklet

@BeforeStep not being called in AsyncProcessor


I have been using the synchronous ItemProcessor and Writer but now I moved it to Asynchronous as the code below:

@Bean
public Job importFraudCodeJob(Step computeFormFileToDB) {
    return jobBuilderFactory.get("Import-Entities-Risk-Codes")
            .incrementer(new RunIdIncrementer())
            .listener(notificationExecutionListener)
            .start(computeFormFileToDB)
            .build();
}
@Bean
public Step computeFormFileToDB(ItemReader<EntityRiskCodesDto> entityRiskCodeFileReader) {
    return stepBuilderFactory.get("ImportFraudCodesStep")
            .<EntityFraudCodesDto, Future<EntityFraudCodes>>chunk(chunkSize)
            .reader(entityRiskCodeFileReader)
            .processor(asyncProcessor())
            .writer(asyncWriter())
            .faultTolerant()
            .skipPolicy(customSkipPolicy)
            .listener(customStepListener)
            .listener(chunkCounterListener())
            .taskExecutor(taskExecutor())
            .throttleLimit(6)
            .build();
}

In my ItemPocessor<I,O> i use the @BeforeStep to get the value I've stored in a StepExecutionContext:

@BeforeStep
public  void getKey(StepExecution stepExecution) {
    log.info("Fetching batchNumber");
    ExecutionContext context = stepExecution.getExecutionContext();
    this.sequenceNumber = (Integer) context.get("sequenceNumber");
}

And here the declaration of my AsyncProcessor:

  @Bean
public AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes> asyncProcessor() {
    var asyncItemProcessor = new AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes>();
    asyncItemProcessor.setDelegate(riskCodeItemProcessor());
    asyncItemProcessor.setTaskExecutor(taskExecutor());
    return asyncItemProcessor;
}

The problem is the fact that the method above is not being called. How can I get values from StepExecution and pass them into an Asynchronous ItemProcessor or AsyncItemWiter?


Solution

  • The reason is that since your item processor is a delegate of an async item processor, it is not automatically registered as a listener and this should be done manually. Here is an excerpt from the Intercepting Step Execution section of the docs:

    If the listener is nested inside another component, it needs to be explicitly
    registered (as described previously under "Registering ItemStream with a Step").
    

    So in your use case, you need to register the delegate riskCodeItemProcessor() as a listener in your step and the method annotated with @BeforeStep should be called. Here is a quick example:

    import java.util.Arrays;
    import java.util.concurrent.Future;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.integration.async.AsyncItemProcessor;
    import org.springframework.batch.integration.async.AsyncItemWriter;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJobConfig {
    
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
        }
    
        @Bean
        public ItemProcessor<Integer, Integer> itemProcessor() {
            return new MyItemProcessor();
        }
    
        @Bean
        public AsyncItemProcessor<Integer, Integer> asyncItemProcessor() {
            AsyncItemProcessor<Integer, Integer> asyncItemProcessor = new AsyncItemProcessor<>();
            asyncItemProcessor.setDelegate(itemProcessor());
            asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
            return asyncItemProcessor;
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return items -> {
                for (Integer item : items) {
                    System.out.println(Thread.currentThread().getName() + ": item = " + item);
                }
            };
        }
        
        @Bean
        public AsyncItemWriter<Integer> asyncItemWriter() {
            AsyncItemWriter<Integer> asyncItemWriter = new AsyncItemWriter<>();
            asyncItemWriter.setDelegate(itemWriter());
            return asyncItemWriter;
        }
    
        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            return jobs.get("myJob")
                    .start(steps.get("myStep")
                            .<Integer, Future<Integer>>chunk(5)
                            .reader(itemReader())
                            .processor(asyncItemProcessor())
                            .writer(asyncItemWriter())
                            .listener(itemProcessor())
                            .build())
                    .build();
        }
    
        static class MyItemProcessor implements ItemProcessor<Integer, Integer> {
    
            private StepExecution stepExecution;
            
            @Override
            public Integer process(Integer item) throws Exception {
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + ": processing item " + item 
                        + " as part of step " + stepExecution.getStepName());
                return item + 1;
            }
    
            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
            }
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfig.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
        
    
    }
    

    This prints:

    SimpleAsyncTaskExecutor-1: processing item 0 as part of step myStep
    SimpleAsyncTaskExecutor-2: processing item 1 as part of step myStep
    SimpleAsyncTaskExecutor-3: processing item 2 as part of step myStep
    SimpleAsyncTaskExecutor-4: processing item 3 as part of step myStep
    SimpleAsyncTaskExecutor-5: processing item 4 as part of step myStep
    main: item = 1
    main: item = 2
    main: item = 3
    main: item = 4
    main: item = 5
    SimpleAsyncTaskExecutor-6: processing item 5 as part of step myStep
    SimpleAsyncTaskExecutor-7: processing item 6 as part of step myStep
    SimpleAsyncTaskExecutor-8: processing item 7 as part of step myStep
    SimpleAsyncTaskExecutor-9: processing item 8 as part of step myStep
    SimpleAsyncTaskExecutor-10: processing item 9 as part of step myStep
    main: item = 6
    main: item = 7
    main: item = 8
    main: item = 9
    main: item = 10
    

    That said, it is not recommended to rely on the execution context in a multi-threaded setup as this context is shared between threads.