springspring-bootspring-batchbatch-processingspring-framework-beans

Null Pointer Exception on passing the value between steps in Spring Batch


I am learning Spring Batch and I am doing a sample program in which I need to pass values from one step to another.

the scenario: I have a person table in which I am pulling the person details from it, save a couple of columns to a DTO (in the ItemWriter for Step 1) and pass the values from the DTO to a different table on the where clause to pull the related values from it (in the ItemReader for Step 2). In the end I will be generating a CSV with all these values.

Here is my code:

@Bean
    public Job job() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        return jobBuilderFactory.get("readDBJob").incrementer(new RunIdIncrementer()).start(step1()).next(step2())
                .build();
    }

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1").<Person, Person>chunk(500000).reader(itemReader())
            .writer(itemWriter()).listener(promotionListener()).build();
}

@Bean
    public Step step2() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        return stepBuilderFactory.get("step2").<Person, Result>chunk(100)
                .reader(readingObjectItemReader.cursorReader()).writer(itemWriterForStep2()).build();
    }

ItemWriter for Step 1:

        @Bean
public ItemWriter<Person> itemWriter() {
    return new ItemWriter<Person>() {

        private StepExecution stepExecution;
        List<personDTO> responseList = null;

        @Override
        public void write(List<? extends Person> items) throws Exception {
            for (Person item : items) {
                personDTO responseObject = new personDTO();
                BeanUtils.copyProperties(item, responseObject);
                if(responseObject != null && responseObject.getPersonId() != null) {
                    if(stepExecution.getExecutionContext().containsKey("personDtoObject")) {
                        responseList = (List<personDTO>) this.stepExecution.getExecutionContext().get("personDtoObject");
                    }
                    responseList.add(responseObject);
                    this.stepExecution.getExecutionContext().put("personDtoObject", responseList);
                }
            }
        }

        @BeforeStep
        public void saveStepExecution(StepExecution stepExecution) { 
            this.stepExecution = stepExecution;
            this.stepExecution.getExecutionContext().put("personDtoObject", new ArrayList<>());
        }
}

Job execution context:

    @Bean
    public Object promotionListener() { 
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"personDtoObject"});
        listener.setStrict(true);
        return listener;
    }

Here is how I am trying to access the value in my step 2 ItemReader

public class ReadingObjectItemReader implements ItemReader {

@Autowired
DataSource dataSource;

private List<personDTO> personDtoList;

String value;


@Override
public personDetails read()
        throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    return null;
}

@Bean
public JdbcCursorItemReader<personDetails> cursorReader() {

    System.out.println("Values from the step 1 " + personDtoList);
    ....
}

@BeforeStep
public void retrieveSharedData(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    ExecutionContext jobContext = jobExecution.getExecutionContext();
    personDtoList=  (List<personDTO>) jobContext.get("personDtoObject");
}

}

When I am trying to access the value of personDtoList in step 2 I am getting null. I validated the values in the StepContext before my step 1 completes, Everything looks good till there but when tried to access them on Step 2 I am getting null.

I looked at most of the resources available online but I couldn't figure it out on where I am going wrong. Any help is appreciated.

Thanks for the help in advance.


Solution

  • In your item writer of step 1 you are doing:

    ExecutionContext stepContext = this.stepExecution.getExecutionContext();
    stepContext.put("personDtoObject", responseList);
    

    which means you are overriding the previous list for each chunk. What you need to do is get the list from the execution context and add items in it before overriding the key. You also need to add some sanity checks at step boundaries (aka the first chunk and the last chunk) to make sure the list is initialized and that it is not null before putting it in the execution context (In particular with the last chunk).

    Edit: Add code change required for the promotion listener to work

    You also need to change the return type of the promotionListener() method from Object to ExecutionContextPromotionListener:

    @Bean
    public ExecutionContextPromotionListener promotionListener() { 
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[] {"personDtoObject"});
        listener.setStrict(true);
        return listener;
    }
    

    Otherwise this bean is not correctly registered as a listener. Here is a complete example:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        private JobBuilderFactory jobBuilderFactory;
        private StepBuilderFactory stepBuilderFactory;
    
        public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            this.jobBuilderFactory = jobBuilderFactory;
            this.stepBuilderFactory = stepBuilderFactory;
        }
    
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return new ItemWriter<Integer>() {
    
                private StepExecution stepExecution;
    
                @Override
                public void write(List<? extends Integer> items) {
                    List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
                    for (Integer item : items) {
                        System.out.println("item = " + item);
                        itemsList.add(item);
                    }
                }
    
                @BeforeStep
                public void saveStepExecution(StepExecution stepExecution) {
                    this.stepExecution = stepExecution;
                    this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
                }
            };
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Integer, Integer>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .listener(promotionListener())
                    .build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet((contribution, chunkContext) -> {
                        ExecutionContext executionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
                        List<Integer> items = (List<Integer>) executionContext.get("items");
                        System.out.println("Items read in step1:");
                        for (Integer item : items) {
                            System.out.println("item = " + item);
                        }
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
            listener.setKeys(new String[]{"items"});
            listener.setStrict(true);
            return listener;
        }
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("job")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
    }
    

    which prints:

    item = 1
    item = 2
    item = 3
    item = 4
    Items read in step1:
    item = 1
    item = 2
    item = 3
    item = 4
    

    Edit 2: Add example with chunk oriented step

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Consumer;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.annotation.BeforeStep;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        private JobBuilderFactory jobBuilderFactory;
        private StepBuilderFactory stepBuilderFactory;
    
        public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            this.jobBuilderFactory = jobBuilderFactory;
            this.stepBuilderFactory = stepBuilderFactory;
        }
    
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
        }
    
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return new ItemWriter<Integer>() {
    
                private StepExecution stepExecution;
    
                @Override
                public void write(List<? extends Integer> items) {
                    List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
                    for (Integer item : items) {
                        System.out.println("item = " + item);
                        itemsList.add(item);
                    }
                }
    
                @BeforeStep
                public void saveStepExecution(StepExecution stepExecution) {
                    this.stepExecution = stepExecution;
                    this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
                }
            };
        }
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .<Integer, Integer>chunk(2)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .listener(promotionListener())
                    .build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .<Integer, Integer>chunk(2)
                    .reader(new ReadingObjectItemReader())
                    .writer(items -> items.forEach((Consumer<Integer>) integer -> System.out.println("integer = " + integer)))
                    .build();
        }
    
        @Bean
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
            listener.setKeys(new String[]{"items"});
            listener.setStrict(true);
            return listener;
        }
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("job")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
        public static class ReadingObjectItemReader implements ItemReader<Integer> {
    
            int i = 0;
            private List<Integer> items;
    
            @Override
            public Integer read() {
                if (i >= items.size()) {
                    return null;
                } else {
                    return items.get(i++);
                }
            }
    
            @BeforeStep
            public void retrieveSharedData(StepExecution stepExecution) {
                JobExecution jobExecution = stepExecution.getJobExecution();
                ExecutionContext jobContext = jobExecution.getExecutionContext();
                items =  (List<Integer>) jobContext.get("items");
            }
        }
    
    }
    

    prints:

    item = 1
    item = 2
    item = 3
    item = 4
    integer = 1
    integer = 2
    integer = 3
    integer = 4
    

    which means the list has been correctly retrieved from the job execution context in step 2 which is a chunk-oriented step.