javaspring-bootspring-batch

Spring Batch job runs fine only the first time. Doesn't work after that


I am triggering a batch job using an API. The first time I trigger using the API after each server restart works fine. Goes through all the break points and saves the data in the DB as intended.However, after that, the log says the job has been started and the step is being executed. But it doesn't hit any break points and the data is also not saved as expected.My code and config is as follows -

// Controller
@RestController
@Component
public class MyController {
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job insertJob;
    
    @PostMapping("/start")
    public String startBatchJob(@RequestParam String validation) {
        try {
            String jobId = String.valueOf(System.currentTimeMillis());
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("jobId", jobId)
                    .addString("validation", validation)
                    .toJobParameters();

            JobExecution jobExecution = jobLauncher.run(insertJob, jobParameters);

            if (jobExecution.getStatus().isUnsuccessful() || jobExecution.getStatus() == BatchStatus.FAILED)
                return "Job FAILED with Job ID: " + jobId + ". Status: " + jobExecution.getStatus();

            // Get job status or other details
            return "Job finished successfully with Job ID: " + jobId + ". Status: " + jobExecution.getStatus();
        } catch (JobExecutionException e) {
            e.printStackTrace();
            return "Error starting job: " + e.getMessage();
        }
    }
}

// Batch Config
@Configuration
public class SpringBatchConfig {

    @Autowired
    ScheduledTasks scheduledTasks;

    @Autowired
    ValidationRepository validationRepository;

    @Autowired
    @Lazy
    PlatformTransactionManager transactionManager;

    @Autowired
    @Lazy
    JobRepository jobRepository;


    public List<Personality> getData() {
        return scheduledTasks.callRestApiForData();
    }

    @Bean(name = "insertJob")
    public Job insertJob(BaseWriter<Personality> writer,
                             BaseProcessor<Personality, Personality> processor,
                             BaseReader<Personality> reader) {
        return new JobBuilder("insertJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(insertJobListener()).start(step_1(writer, processor, reader)).build();
    }

    @Bean
    public Step step_1(BaseWriter<Personality> writer,
                       BaseProcessor<Personality, Personality> processor,
                       BaseReader<Personality> itemReader) {
        return new StepBuilder("step_1", jobRepository)
                .<Personality, Personality> chunk(200, transactionManager)
                .reader(itemReader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public JobExecutionListener insertJobListener() {
        return new InsertJobCompletionListener();
    }

}

// listener
@Slf4j
public class InsertJobCompletionListener extends JobExecutionListenerSupport {


    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("UPDATE BATCH COMPLETED");
        }
        else if(jobExecution.getStatus() == BatchStatus.FAILED){
            log.info("UPDATE BATCH JOB FAILED TO COMPLETE");
        }


    }
}


// reader

public interface BaseReader<T> extends ItemReader<T> {
}


@Configuration
public class ExchangeRateBaseReader<T> implements BaseReader<Personality> {

    @Autowired
    ScheduledTasks scheduledTasks;

    private boolean read = false;
    private List<Personality> data;
    private int index = 0;

    @Override
    public Personality read() throws Exception {
        if (data == null) {
            data = getData();
        }
        Personality item = null;
        if (index < data.size()) {
            item = data.get(index);
            index++;
        }
        return item;
    }

    public List<Personality> getData() {
        return scheduledTasks.callRestApiForData();
    }
}



// processor

public interface BaseProcessor<I,O> extends ItemProcessor<I, O> {
}

@Configuration
@Slf4j
public class CurrencyExchangeProcessor<I,O> implements BaseProcessor<Personality, Personality> {
    @Override
    public Personality process(Personality Personality) throws Exception {
        return Personality;
    }
}


// writer
public interface BaseWriter<T> extends ItemWriter<T> {
}


@Configuration
@Slf4j
public class CurrencyExchangeWriter<T> implements BaseWriter<Personality> {

    @Autowired
    private ValidationRepository validationRepository;

    @Override
    public void write(Chunk<? extends Personality> chunk) throws Exception {
        log.info("Total number of items to save - {}",chunk.getItems().size());
        validationRepository.saveAll(chunk.getItems());
    }
}


Thanks for the help in advance.


Solution

  • Ok I fixed it. The problem was with the reader. Once the reader's job is done for the first time, the data is no longer null. So, it will not call for the api. Also, the index is not zero.

    So, making data null and index 0 in @AfterStep fixed it for me.