java spring spring-boot spring-batch

Set chunk size dynamically after fetching it from the DB


I need to set the chunk size of a Spring Batch job's step dynamically. The value is stored in the database, i.e. the chunk size needs to be fetched from the database and set into the step bean.

My query is something like:

select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'

Here the value for ID comes from the job parameters, which are set from a request parameter passed to the REST controller (when triggering the batch job).

I want to fetch this CHUNK_SIZE from the database and set it dynamically into the job's step. Our requirement is that the chunk size of the step varies based on the ID value, and these details are stored in a DB table. For example:

ID   CHUNK_SIZE
01   1000
02   2500

I know that the beans in a job are created at configuration time, while the job parameters are only passed at runtime, when the job is triggered.

EDIT:

The example provided by MahmoudBenHassine uses @JobScope and accesses the jobParameters in the step bean using @Value("#{jobParameters['id']}"). I tried implementing a similar approach using the jobExecutionContext as follows:

  1. Fetched the chunk size from the DB table in the StepExecutionListener's beforeStep method and put it in the job's ExecutionContext.

  2. Annotated the step bean with @JobScope and used @Value("#{jobExecutionContext['chunk']}") to access it in the step bean.

  3. But I get the following error:

    Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException

It cannot read the 'chunk' key from the jobExecutionContext, hence the NullPointerException. Does the value need to be promoted somehow so that it can be accessed in the step bean? If so, a quick sample or a pointer in the right direction would be really appreciated.
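If I read the setup correctly, the NPE comes from ordering rather than from a missing promotion: a @JobScope step bean is created when the job starts executing it, before any of the step's listeners run, so `#{jobExecutionContext['chunk']}` resolves to `null`, and passing that `Integer` to `chunk(int)` unboxes it and throws. The plain-Java sketch below only simulates that ordering, with a `Map` standing in for the job's ExecutionContext; `StepOrderingDemo`, `buildStep` and `runListener` are hypothetical names, not Spring Batch APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the ordering problem; this is not Spring Batch code.
class StepOrderingDemo {

    // Stands in for the @JobScope step factory method: it reads "chunk" and
    // unboxes it to int, just like passing an Integer to chunk(int).
    static int buildStep(Map<String, Object> jobContext) {
        Integer chunkSize = (Integer) jobContext.get("chunk");
        int size = chunkSize; // NullPointerException if the key has not been written yet
        return size;
    }

    // Stands in for StepExecutionListener.beforeStep: it writes "chunk" into the context.
    static void runListener(Map<String, Object> jobContext, int chunkFromDb) {
        jobContext.put("chunk", chunkFromDb);
    }

    // Returns true if building the step before the listener has run fails with an NPE.
    static boolean failsWhenBuiltBeforeListener() {
        Map<String, Object> jobContext = new HashMap<>();
        try {
            buildStep(jobContext);      // 1. the scoped step is created first...
            runListener(jobContext, 4); // 2. ...the listener would only run afterwards
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWhenBuiltBeforeListener()); // true
    }
}
```

Under this reading, promoting the key would not help either, because the value is only written after the step has already been built; the value has to be available from something that exists when the step is created, such as the job parameter itself.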

My controller class:

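import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SampleController {

    @Autowired
    JobLauncher sampleJobLauncher;

    @Autowired
    Job sampleJob;

    @GetMapping("/launch")
    public BatchStatus launch(@RequestParam(name = "id", required = true) String id) throws Exception {
        // JobLauncher.run declares checked exceptions, hence "throws Exception"
        Map<String, JobParameter> map = new HashMap<>();
        map.put("id", new JobParameter(id));
        // a new timestamp on every call makes each launch a new JobInstance
        map.put("timestamp", new JobParameter(System.currentTimeMillis()));

        JobParameters params = new JobParameters(map);
        JobExecution j = sampleJobLauncher.run(sampleJob, params);

        return j.getStatus();
    }
}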

My batch config class (containing the job and step beans):

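import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration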
public class SampleBatchConfig{

    @Autowired
    private JobBuilderFactory myJobBuilderFactory;

    @Autowired
    private StepBuilderFactory myStepBuilderFactory;

    @Autowired
    private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
    
    @Autowired
    MyReader myReader;
    
    @Autowired
    MyWriter myWriter;
    
    @Bean
    @JobScope
    public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
        return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())
                .listener(new StepExecutionListener() {
                    @Override
                    public void beforeStep(StepExecution stepExecution) {
                        // "id" is a job parameter, not an ExecutionContext entry, so read it from the JobParameters
                        int chunk = myRepo.findChunkSize(stepExecution.getJobParameters().getString("id")); // fetches the chunk size from the db table for this id
                        stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
                    }

                    @Override
                    public ExitStatus afterStep(StepExecution stepExecution) {
                        return null;
                    }
                })
                .build();
    }

    @Bean
    public Job job(){
        return myJobBuilderFactory.get("sampleJob")
                .incrementer(new RunIdIncrementer())
                .start(sampleStep(null))
                .build();
    }

}

NOTE: The job may have multiple steps with different chunk sizes; in that case, the chunk size has to be fetched separately for each step.

EDIT 2: Changing my step definition as follows works, but there is a problem. Here the reader reads a list of 17 items with a chunk size of 4.

@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
   int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
   return myStepBuilderFactory.get("sampleStep")
                .<MyClass, MyClass>chunk(chunkSize)
                .reader(myReader.sampleReader())
                .writer(myWriter.sampleWriter())  
                .listener(new ChunkListenerSupport() {
                    @Override
                    public void afterChunk(ChunkContext context) {
                        System.out.println("MyJob.afterChunk");
                    }

                    @Override
                    public void beforeChunk(ChunkContext context) {
                        System.out.println("MyJob.beforeChunk");
                    }
                })                      
                .build();
}

The first time I trigger the job from the URL, it works fine and prints the following (the chunk size is set to 4 in the DB table):

2021-05-03 15:06:44.859  INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]
MyJob.beforeChunk
item = 1
item = 2
item = 3
item = 4
MyJob.afterChunk
MyJob.beforeChunk
item = 5
item = 6
item = 7
item = 8
MyJob.afterChunk
MyJob.beforeChunk
item = 9
item = 10
item = 11
item = 12
MyJob.afterChunk
MyJob.beforeChunk
item = 13
item = 14
item = 15
item = 16
MyJob.afterChunk
MyJob.beforeChunk
item = 17
MyJob.afterChunk
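The first run's output is what plain chunking arithmetic predicts: 17 items with a chunk size of 4 give ceil(17 / 4) = 5 chunks, the last one holding the single leftover item. The small plain-Java check below reproduces that split; `ChunkMath` is a hypothetical helper, not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java arithmetic check (hypothetical helper): how many items land in each chunk.
class ChunkMath {

    // Returns the size of each chunk when `items` items are processed `chunkSize` at a time.
    static List<Integer> chunkSizes(int items, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = items; remaining > 0; remaining -= chunkSize) {
            sizes.add(Math.min(chunkSize, remaining));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(17, 4)); // [4, 4, 4, 4, 1]
    }
}
```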

But if I trigger the job again without restarting the server (Spring container), only the following is printed:

2021-05-03 15:11:02.427  INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]
MyJob.beforeChunk
MyJob.afterChunk

In short, it works exactly once after the server is (re)started, but subsequent job executions don't process any items unless the server is restarted again.
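MyReader is not shown, so this is only a guess: a second run that produces one empty chunk is what you would see if `myReader.sampleReader()` hands every job the same stateful reader instance (for example a singleton list-backed reader that removes items as it reads them). Re-creating the step per job does not help if the reader inside it is a shared singleton. The plain-Java sketch below mimics that behaviour; `DrainingReader` and `ReaderReuseDemo` are hypothetical stand-ins, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical reader that consumes its items, in the style of a list-backed ItemReader.
class DrainingReader {
    private final LinkedList<Integer> items;

    DrainingReader(List<Integer> source) {
        this.items = new LinkedList<>(source); // private copy that is drained while reading
    }

    // Returns the next item, or null once exhausted (null signals end of input).
    Integer read() {
        return items.isEmpty() ? null : items.removeFirst();
    }
}

class ReaderReuseDemo {

    // Simulates one job run: reads until null and returns how many items were read.
    static int runJob(DrainingReader reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    // Builds the list 1..n used as the reader's input.
    static List<Integer> source(int n) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            list.add(i);
        }
        return list;
    }

    public static void main(String[] args) {
        DrainingReader shared = new DrainingReader(source(17));
        System.out.println(runJob(shared)); // 17: the first run reads everything
        System.out.println(runJob(shared)); // 0: the same instance is already drained
        System.out.println(runJob(new DrainingReader(source(17)))); // 17: fresh instance per run
    }
}
```

If that turns out to be the cause, a common remedy is to declare the reader bean with @StepScope, so that a new reader instance is created for each step execution.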


Solution

  • Since you pass the ID as a job parameter and you want to get the chunk size dynamically from the database based on that ID while configuring the step, you have two options:

    Use a job-scoped step:

    @Bean
    @JobScope
    public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
       int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
       return myStepBuilderFactory.get("sampleStep")
                    .<MyClass, MyClass>chunk(chunkSize)
                    .reader(myReader.sampleReader())
                    .writer(myWriter.sampleWriter())                        
                    .build();
    }
    

    Use a step-scoped completion policy:

    
    @Bean
    @StepScope
    public SimpleCompletionPolicy chunkCompletionPolicy(@Value("#{jobParameters['id']}") Integer id) {
       int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunkSize from the db table using the id job parameter
       return new SimpleCompletionPolicy(chunkSize);
    }
    
    
    @Bean
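    public Step sampleStep(SimpleCompletionPolicy chunkCompletionPolicy){
       // chunkCompletionPolicy is a step-scoped proxy: the real policy is created when the step starts
       return myStepBuilderFactory.get("sampleStep")
                    .<MyClass, MyClass>chunk(chunkCompletionPolicy) // StepBuilderFactory (Spring Batch 4) already sets the transaction manager; on Spring Batch 5 use chunk(policy, transactionManager)
                    .reader(myReader.sampleReader())
                    .writer(myWriter.sampleWriter())
                    .build();
    }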
    

    Heads-up: I would recommend using a step-scoped completion policy instead of a job-scoped step. The reason is that, while technically possible, a step should not be scoped. Scoping a step does not really make sense if you think about it: it means "do not create this step definition until the step is started at runtime", but to start that step at runtime, it needs to be defined upfront 😵‍💫
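To illustrate the heads-up above: the step can be defined once, upfront, as long as the part that depends on the job parameter is created lazily for each execution. Spring Batch achieves this with the step scope's proxy; the plain-Java sketch below models the same idea with a `Function` instead. `CountPolicy`, `StepDefinition` and `ScopedPolicyDemo` are hypothetical names, and the map stands in for SOME_TABLE_NAME using the example values from the question.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a completion policy: a chunk is complete once `size` items are read.
class CountPolicy {
    final int size;

    CountPolicy(int size) {
        this.size = size;
    }

    boolean isComplete(int itemsInChunk) {
        return itemsInChunk >= size;
    }
}

// Built once, like a singleton step bean. It holds no chunk size, only a way to
// create the policy when an execution starts (the role the step scope plays).
class StepDefinition {
    private final Function<String, CountPolicy> policyFactory;

    StepDefinition(Function<String, CountPolicy> policyFactory) {
        this.policyFactory = policyFactory;
    }

    // Simulates one execution: returns the number of chunks needed for `items` items.
    int execute(String idParam, int items) {
        CountPolicy policy = policyFactory.apply(idParam); // resolved per execution
        int chunks = 0;
        int inChunk = 0;
        for (int i = 0; i < items; i++) {
            inChunk++;
            if (policy.isComplete(inChunk)) {
                chunks++;
                inChunk = 0;
            }
        }
        if (inChunk > 0) {
            chunks++; // last, partially filled chunk
        }
        return chunks;
    }
}

class ScopedPolicyDemo {
    // Hypothetical contents of SOME_TABLE_NAME (ID -> CHUNK_SIZE).
    static final Map<String, Integer> CHUNK_SIZES = Map.of("01", 1000, "02", 2500);

    // An unknown id would make CHUNK_SIZES.get return null and fail on unboxing.
    static StepDefinition step() {
        return new StepDefinition(id -> new CountPolicy(CHUNK_SIZES.get(id)));
    }

    public static void main(String[] args) {
        StepDefinition step = step(); // defined once, upfront
        System.out.println(step.execute("01", 5000)); // 5 chunks of 1000
        System.out.println(step.execute("02", 5000)); // 2 chunks of 2500
    }
}
```

The same `StepDefinition` instance serves both runs with different chunk sizes, which is the property the step-scoped completion policy gives you without scoping the step itself.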