I am learning Spring Batch and writing a sample program in which I need to pass values from one step to another.
The scenario: I read person details from a person table, save a couple of columns to a DTO (in the ItemWriter for step 1), and then use the values from the DTO in the WHERE clause of a query against a different table to pull the related values (in the ItemReader for step 2). In the end I will generate a CSV with all these values.
Here is my code:
@Bean
public Job job() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
return jobBuilderFactory.get("readDBJob").incrementer(new RunIdIncrementer()).start(step1()).next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1").<Person, Person>chunk(500000).reader(itemReader())
.writer(itemWriter()).listener(promotionListener()).build();
}
@Bean
public Step step2() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
return stepBuilderFactory.get("step2").<Person, Result>chunk(100)
.reader(readingObjectItemReader.cursorReader()).writer(itemWriterForStep2()).build();
}
ItemWriter for Step 1:
@Bean
public ItemWriter<Person> itemWriter() {
return new ItemWriter<Person>() {
private StepExecution stepExecution;
List<personDTO> responseList = null;
@Override
public void write(List<? extends Person> items) throws Exception {
for (Person item : items) {
personDTO responseObject = new personDTO();
BeanUtils.copyProperties(item, responseObject);
if(responseObject != null && responseObject.getPersonId() != null) {
if(stepExecution.getExecutionContext().containsKey("personDtoObject")) {
responseList = (List<personDTO>) this.stepExecution.getExecutionContext().get("personDtoObject");
}
responseList.add(responseObject);
this.stepExecution.getExecutionContext().put("personDtoObject", responseList);
}
}
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.stepExecution.getExecutionContext().put("personDtoObject", new ArrayList<>());
}
};
}
Job execution context:
@Bean
public Object promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"personDtoObject"});
listener.setStrict(true);
return listener;
}
Here is how I am trying to access the value in my step 2 ItemReader:
public class ReadingObjectItemReader implements ItemReader {
@Autowired
DataSource dataSource;
private List<personDTO> personDtoList;
String value;
@Override
public personDetails read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return null;
}
@Bean
public JdbcCursorItemReader<personDetails> cursorReader() {
System.out.println("Values from the step 1 " + personDtoList);
....
}
@BeforeStep
public void retrieveSharedData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
personDtoList= (List<personDTO>) jobContext.get("personDtoObject");
}
}
When I try to access personDtoList in step 2, I get null. I validated the values in the StepContext before step 1 completes, and everything looks good up to that point, but when I try to access them in step 2 I get null.
I looked at most of the resources available online, but I couldn't figure out where I am going wrong. Any help is appreciated.
Thanks for the help in advance.
In your item writer of step 1 you are doing:
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("personDtoObject", responseList);
which means you are overwriting the previous list for each chunk. What you need to do is get the list from the execution context and add items to it before overwriting the key. You also need to add some sanity checks at step boundaries (i.e. the first chunk and the last chunk) to make sure the list is initialized and is not null before putting it in the execution context (in particular with the last chunk).
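To make the accumulate-then-put idea concrete, here is a minimal plain-Java sketch. It is not Spring Batch code: a `Map<String, Object>` stands in for the step `ExecutionContext`, and the class name `ChunkAccumulationSketch`, the method `writeChunk` and the sample values are hypothetical names chosen only for illustration. It just shows the list being fetched, null-checked, extended and put back on every chunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChunkAccumulationSketch {

    // Same key as in the question; everything else here is a stand-in.
    static final String KEY = "personDtoObject";

    // Plays the role of one ItemWriter.write(...) call.
    // The Map is an assumption standing in for the step ExecutionContext.
    @SuppressWarnings("unchecked")
    public static void writeChunk(Map<String, Object> stepContext, List<String> chunk) {
        // Reuse the list stored by earlier chunks, or create it on the first chunk.
        List<String> accumulated = (List<String>) stepContext.get(KEY);
        if (accumulated == null) {
            accumulated = new ArrayList<>();
        }
        // Guard against a null chunk before appending.
        if (chunk != null) {
            accumulated.addAll(chunk);
        }
        // Put the (never null) accumulated list back under the same key.
        stepContext.put(KEY, accumulated);
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<>();
        writeChunk(stepContext, Arrays.asList("p1", "p2")); // first chunk
        writeChunk(stepContext, Arrays.asList("p3"));       // second chunk
        writeChunk(stepContext, new ArrayList<>());         // empty last chunk
        System.out.println(stepContext.get(KEY));           // prints [p1, p2, p3]
    }
}
```

In a real writer the same logic would go through `stepExecution.getExecutionContext()` instead of the map.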
Edit: Add code change required for the promotion listener to work
You also need to change the return type of the promotionListener() method from Object to ExecutionContextPromotionListener:
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"personDtoObject"});
listener.setStrict(true);
return listener;
}
Otherwise this bean is not correctly registered as a listener. Here is a complete example:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return new ItemWriter<Integer>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends Integer> items) {
List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
for (Integer item : items) {
System.out.println("item = " + item);
itemsList.add(item);
}
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
}
};
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Integer, Integer>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.listener(promotionListener())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
ExecutionContext executionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
List<Integer> items = (List<Integer>) executionContext.get("items");
System.out.println("Items read in step1:");
for (Integer item : items) {
System.out.println("item = " + item);
}
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{"items"});
listener.setStrict(true);
return listener;
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
which prints:
item = 1
item = 2
item = 3
item = 4
Items read in step1:
item = 1
item = 2
item = 3
item = 4
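A side note on why the declared return type of promotionListener() can matter. As far as I understand it, the step builder has more than one listener(...) overload, and Java picks the overload from the declared (compile-time) type of the argument, not from its runtime class. The sketch below shows only that language rule in plain Java; `StepListener`, `PromotionListener` and `register` are hypothetical stand-ins, not Spring Batch types or methods, and the returned strings merely describe what I assume each overload does.

```java
public class OverloadSketch {

    // Hypothetical stand-ins; these are NOT Spring Batch types.
    public interface StepListener { }

    public static class PromotionListener implements StepListener { }

    // Selected when the argument's declared type is StepListener (or a subtype).
    public static String register(StepListener listener) {
        return "registered as a step listener";
    }

    // Selected when the argument's declared type is only Object.
    public static String register(Object listener) {
        return "scanned for listener annotations only";
    }

    // Same runtime object in both cases, different declared return types.
    public static Object asObject() {
        return new PromotionListener();
    }

    public static PromotionListener asListener() {
        return new PromotionListener();
    }

    public static void main(String[] args) {
        System.out.println(register(asObject()));   // prints: scanned for listener annotations only
        System.out.println(register(asListener())); // prints: registered as a step listener
    }
}
```

Declaring the bean method with the concrete listener type is what lets the call site resolve to the typed overload.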
Edit 2: Add example with a chunk-oriented step
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return new ItemWriter<Integer>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends Integer> items) {
List<Integer> itemsList = (List<Integer>) stepExecution.getExecutionContext().get("items");
for (Integer item : items) {
System.out.println("item = " + item);
itemsList.add(item);
}
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.stepExecution.getExecutionContext().put("items", new ArrayList<>());
}
};
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Integer, Integer>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.listener(promotionListener())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<Integer, Integer>chunk(2)
.reader(new ReadingObjectItemReader())
.writer(items -> items.forEach((Consumer<Integer>) integer -> System.out.println("integer = " + integer)))
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{"items"});
listener.setStrict(true);
return listener;
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
public static class ReadingObjectItemReader implements ItemReader<Integer> {
int i = 0;
private List<Integer> items;
@Override
public Integer read() {
if (i >= items.size()) {
return null;
} else {
return items.get(i++);
}
}
@BeforeStep
public void retrieveSharedData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
items = (List<Integer>) jobContext.get("items");
}
}
}
which prints:
item = 1
item = 2
item = 3
item = 4
integer = 1
integer = 2
integer = 3
integer = 4
which means the list has been correctly retrieved from the job execution context in step 2, which is a chunk-oriented step.