spring-boot spring-batch batch-processing spring-batch-tasklet

How to add tasklet to run after each partition step completion in Spring Batch


I am new to Spring Batch and am implementing a job that has to pull a huge data set from a database and write it to files. Below is a sample job configuration that works as expected for me.

@Bean
public Job customDBReaderFileWriterJob() throws Exception {
    return jobBuilderFactory.get(MY_JOB)
            .incrementer(new RunIdIncrementer())
            .flow(partitionGenerationStep())
            .next(cleanupStep())
            .end()
            .build();
}

@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(multiOperationStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step multiOperationStep() throws Exception {
    return stepBuilderFactory
            .get("MultiOperationStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public DBPartitioner partitioner() {
    DBPartitioner dbPartitioner = new DBPartitioner();
    dbPartitioner.setColumn(ID);
    dbPartitioner.setDataSource(dataSource);
    dbPartitioner.setTable(TABLE);
    return dbPartitioner;
}

@Bean
@StepScope
public Reader reader() {
    return new Reader();
}

@Bean
@StepScope
public Processor processor() {
    return new Processor();
}

@Bean
@StepScope
public Writer writer() {
    return new Writer();
}    

@Bean
public Step cleanupStep() {
    return stepBuilderFactory.get("cleanupStep")
            .tasklet(cleanupTasklet())
            .build();
}

@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
    return new CleanupTasklet();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}    

Because the data set is huge, I have configured the task executor with a pool of 10 threads and a grid size of 50. With this setup, 10 threads write to 10 files at a time. The reader reads in chunks, so the reader, processor, and writer flow iterates multiple times for each group of 10 partitions before moving on to the next group.

Now I would like to add a tasklet that compresses the files once all iterations (read, process, write) for one thread are completed, i.e. after the completion of each partition.

I do have a cleanup tasklet that runs at the end, but putting the compression logic there would mean waiting until every partition has generated its file and only then compressing them all. Please suggest an approach.


Solution

  • You can change your worker step, multiOperationStep, into a FlowStep made of the chunk-oriented step followed by a simple tasklet step where you do the compression. In other words, the worker step is actually two steps combined in one FlowStep, so the compression runs once per partition, right after that partition's read/process/write loop finishes.
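
As a rough illustration, the configuration could look something like the sketch below. It is untested and assumes the same Spring Batch 4 builder factories used in the question. The names workerStep, workerFlow, compressionStep and compressionTasklet are made up for this example, and the tasklet body is only a placeholder: how it finds the file written by its partition depends on your Writer and is not shown here.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;

// Fragment: these methods are meant to be added to the existing
// @Configuration class from the question (all new names are hypothetical).

// Same partition step as before; the only change is that the worker
// is now the FlowStep instead of multiOperationStep().
@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(workerStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

// FlowStep: wraps the two-step flow so it can act as a single worker step.
@Bean
public Step workerStep() throws Exception {
    return stepBuilderFactory
            .get("workerStep")
            .flow(workerFlow())
            .build();
}

// Flow executed once per partition: chunk-oriented step, then compression.
@Bean
public Flow workerFlow() throws Exception {
    return new FlowBuilder<SimpleFlow>("workerFlow")
            .start(multiOperationStep())
            .next(compressionStep())
            .build();
}

@Bean
public Step compressionStep() {
    return stepBuilderFactory
            .get("compressionStep")
            .tasklet(compressionTasklet())
            .build();
}

// Placeholder tasklet: put the logic that compresses this partition's file here.
@Bean
public Tasklet compressionTasklet() {
    return (contribution, chunkContext) -> {
        // compress the file produced by the preceding chunk-oriented step
        return RepeatStatus.FINISHED;
    };
}
```

With this arrangement the existing cleanupStep can stay where it is in the job definition, since it still runs once after all partitions have finished.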