I am triggering a batch job using an API. The first time I trigger using the API after each server restart works fine. Goes through all the break points and saves the data in the DB as intended.However, after that, the log says the job has been started and the step is being executed. But it doesn't hit any break points and the data is also not saved as expected.My code and config is as follows -
// Controller
@RestController
@Component
public class MyController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job insertJob;
@PostMapping("/start")
public String startBatchJob(@RequestParam String validation) {
try {
String jobId = String.valueOf(System.currentTimeMillis());
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", jobId)
.addString("validation", validation)
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(insertJob, jobParameters);
if (jobExecution.getStatus().isUnsuccessful() || jobExecution.getStatus() == BatchStatus.FAILED)
return "Job FAILED with Job ID: " + jobId + ". Status: " + jobExecution.getStatus();
// Get job status or other details
return "Job finished successfully with Job ID: " + jobId + ". Status: " + jobExecution.getStatus();
} catch (JobExecutionException e) {
e.printStackTrace();
return "Error starting job: " + e.getMessage();
}
}
}
// Batch Config
@Configuration
public class SpringBatchConfig {
@Autowired
ScheduledTasks scheduledTasks;
@Autowired
ValidationRepository validationRepository;
@Autowired
@Lazy
PlatformTransactionManager transactionManager;
@Autowired
@Lazy
JobRepository jobRepository;
public List<Personality> getData() {
return scheduledTasks.callRestApiForData();
}
@Bean(name = "insertJob")
public Job insertJob(BaseWriter<Personality> writer,
BaseProcessor<Personality, Personality> processor,
BaseReader<Personality> reader) {
return new JobBuilder("insertJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(insertJobListener()).start(step_1(writer, processor, reader)).build();
}
@Bean
public Step step_1(BaseWriter<Personality> writer,
BaseProcessor<Personality, Personality> processor,
BaseReader<Personality> itemReader) {
return new StepBuilder("step_1", jobRepository)
.<Personality, Personality> chunk(200, transactionManager)
.reader(itemReader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public JobExecutionListener insertJobListener() {
return new InsertJobCompletionListener();
}
}
// listener
@Slf4j
public class InsertJobCompletionListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("UPDATE BATCH COMPLETED");
}
else if(jobExecution.getStatus() == BatchStatus.FAILED){
log.info("UPDATE BATCH JOB FAILED TO COMPLETE");
}
}
}
// reader
public interface BaseReader<T> extends ItemReader<T> {
}
@Configuration
public class ExchangeRateBaseReader<T> implements BaseReader<Personality> {
@Autowired
ScheduledTasks scheduledTasks;
private boolean read = false;
private List<Personality> data;
private int index = 0;
@Override
public Personality read() throws Exception {
if (data == null) {
data = getData();
}
Personality item = null;
if (index < data.size()) {
item = data.get(index);
index++;
}
return item;
}
public List<Personality> getData() {
return scheduledTasks.callRestApiForData();
}
}
// processor
public interface BaseProcessor<I,O> extends ItemProcessor<I, O> {
}
@Configuration
@Slf4j
public class CurrencyExchangeProcessor<I,O> implements BaseProcessor<Personality, Personality> {
@Override
public Personality process(Personality Personality) throws Exception {
return Personality;
}
}
// writer
public interface BaseWriter<T> extends ItemWriter<T> {
}
@Configuration
@Slf4j
public class CurrencyExchangeWriter<T> implements BaseWriter<Personality> {
@Autowired
private ValidationRepository validationRepository;
@Override
public void write(Chunk<? extends Personality> chunk) throws Exception {
log.info("Total number of items to save - {}",chunk.getItems().size());
validationRepository.saveAll(chunk.getItems());
}
}
Thanks for the help in advance.
Ok I fixed it. The problem was with the reader. Once the reader's job is done for the first time, the data is no longer null. So, it will not call for the api. Also, the index is not zero.
So, making data null and index 0 in @AfterStep fixed it for me.