I need to set the chunk-size dynamically in a spring batch job's step which is stored in the database i.e the chunksize needs to be fetched from the database and set into the bean.
My Query is something like:
select CHUNK_SIZE from SOME_TABLE_NAME where ID='some_id_param_value'
Here the value for ID
would come from the job parameters which is set via a request param passed with the request into the Rest Controller
(while triggering the batch job)
I want to fetch this CHUNK_SIZE
from the database and set it dynamically into the job's step.
Our requirement is that the chunksize varies for the step based on the ID value, the details of which are stored in a db table. For example:
ID | CHUNK_SIZE |
---|---|
01 | 1000 |
02 | 2500 |
I know that the beans in a job are set at the configuration time, and the job parameters are passed at the runtime while triggering the job.
EDIT:
The example provided by MahmoudBenHassine uses @JobScope
and accesses the jobParameters in the step bean using @Value("#{jobParameters['id']}")
. I tried implementing a similar approach using the jobExecutionContext
as follows:
Fetched the chunkSize from the db table in the
StepExecutionListener's beforeStep
method and set it in the
ExecutionContext
.
Annotated the step bean with @JobScope
and used
@Value("#{jobExecutionContext['chunk']}")
to access it in the step
bean.
But I face the following error:
Error creating bean with name 'scopedTarget.step' defined in class path resource [com/sample/config/SampleBatchConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'step' threw exception; nested exception is java.lang.NullPointerException
It is not able to access the 'chunk' key-value from the jobExecutionContext
, thus throwing the NullPointerException
.
Does it need to be promoted somehow so that it can be accessed in the step bean? If yes, a quick sample or a direction would be really appreciated.
My Controller class:
@RestController
public class SampleController {
@Autowired
JobLauncher sampleJobLauncher;
@Autowired
Job sampleJob;
@GetMapping("/launch")
public BatchStatus launch(@RequestParam(name = "id", required = true) String id){
Map<String, JobParameter> map = new HashMap<>();
map.put("id", new JobParameter(id));
map.put("timestamp", new JobParameter(System.currentTimeMillis));
JobParameters params = new JobParameters(map);
JobExecution j = sampleJobLauncher.run(sampleJob, params);
return j.getStatus();
}
}
My batch config class(containing job and step bean):
@Configuration
public class SampleBatchConfig{
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private MyRepoClass myRepo; // this class contains the jdbc method to fetch chunksize from the db table
@Autowired
MyReader myReader;
@Autowired
MyWriter myWriter;
@Bean
@JobScope
public Step sampleStep(@Value("#{jobExecutionContext['chunk']}") Integer chunkSize){
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize) //TODO ~instead of hardcoding the chunkSize or getting it from the properties file using @Value, the requirement is to fetch it from the db table using the above mentioned query with id job parameter and set it here
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {
int chunk = myRepo.findChunkSize(stepExecution.getJobExecution().getExecutionContext().get("id")); // this method call fetches chunksize from the db table using the id job parameter
stepExecution.getJobExecution().getExecutionContext().put("chunk", chunk);
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
})
.build();
}
@Bean
public Job job(){
return myJobBuilderFactory.get("sampleJob")
.incrementer(new RunIdIncrementer())
.start(sampleStep(null))
.build();
}
}
NOTE: The job may have multiple steps with different chunkSizes, and in that case chunkSize is to be fetched separately for each step.
EDIT 2: Changing my step definition as follows works, but there is a problem. Here the reader reads a list having 17 items, in a chunk of size 4.
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.listener(new ChunkListenerSupport() {
@Override
public void afterChunk(ChunkContext context) {
System.out.println("MyJob.afterChunk");
}
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("MyJob.beforeChunk");
}
})
.build();
}
The first time I trigger the job from the url, it works fine and prints the following: (The chunk Size is set to 4 in the db table)
2021-05-03 15:06:44.859 INFO 11924 --- [nio-8081-exec-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
item = 1
item = 2
item = 3
item = 4
MyJob.afterChunk
MyJob.beforeChunk
item = 5
item = 6
item = 7
item = 8
MyJob.afterChunk
MyJob.beforeChunk
item = 9
item = 10
item = 11
item = 12
MyJob.afterChunk
MyJob.beforeChunk
item = 13
item = 14
item = 15
item = 16
MyJob.afterChunk
MyJob.beforeChunk
item = 17
MyJob.afterChunk
But if I trigger the job again, without restarting the server/spring container, the following is printed:
2021-05-03 15:11:02.427 INFO 11924 --- [nio-8081-exec-4] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
MyJob.beforeChunk
MyJob.afterChunk
In Short, it works fine for exactly once, when the server is restarted. But it doesn't work for the subsequent job executions without restarting the server.
Since you pass the ID as a job parameter and you want to get the chunk size dynamically from the database based on that ID while configuring the step, you have two options:
@Bean
@JobScope
public Step sampleStep(@Value("#{jobParameters['id']}") Integer id){
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunksize from the db table using the id job parameter
return myStepBuilderFactory.get("sampleStep")
.<MyClass, MyClass>chunk(chunkSize)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.build();
}
@Bean
@StepScope
public SimpleCompletionPolicy chunkCompletionPolicy(@Value("#{jobParameters['id']}") Integer id) {
int chunkSize = myRepo.findChunkSize(id); // this method call fetches chunkSize from the db table using the id job parameter
return new SimpleCompletionPolicy(chunkSize);
}
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager, SimpleCompletionPolicy chunkCompletionPolicy){
return myStepBuilderFactory.get("sampleStep")
.chunk(chunkCompletionPolicy, transactionManager)
.reader(myReader.sampleReader())
.writer(myWriter.sampleWriter())
.build();
}
Heads-up: I would recommend using a step-scoped completion policy instead of a job-scoped step. The reason is that, while technically possible, a step should not be scoped. Scoping a step actually does not make sense if you think about it.. it means: do not create that step definition until this step is started at runtime, but to start that step at runtime it needs to be defined upfront 😵💫