I am using Spring Boot + Batch v2.7.1 in my project and looks like there is a bug when Reading from FlatFileItemReader
using ClassifierCompositeItemWriter
and MultiResourceItemWriter
as itemCountLimitPerResource
value doesn't works well and gives wrong responses.
I am reading csv file and splitting into multiple files having max records in every file should be 5 only, but the code which I developed giving me 7 values.
MainApp.java
@EnableBatchProcessing
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MultiResourceSplitApplication {
public static void main(String[] args) {
SpringApplication.run(MultiResourceSplitApplication.class, args);
}
}
MyJob.java
package com.example;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemWriterBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.builder.ClassifierCompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.classify.Classifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
@Configuration
public class MyJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<Employee> itemReader() {
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("empId", "firstName", "lastName", "role");
DefaultLineMapper<Employee> employeeLineMapper = new DefaultLineMapper<>();
employeeLineMapper.setLineTokenizer(tokenizer);
employeeLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
employeeLineMapper.afterPropertiesSet();
return new FlatFileItemReaderBuilder<Employee>()
.name("flatFileReader")
.linesToSkip(1)
.resource(new ClassPathResource("employee.csv"))
.lineMapper(employeeLineMapper)
.build();
}
@Bean
public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
Classifier<Employee, ItemWriter<? super Employee>> classifier = new EmployeeClassifier(
javaDeveloperItemWriter(),
pythonDeveloperItemWriter(),
cloudDeveloperItemWriter());
return new ClassifierCompositeItemWriterBuilder<Employee>()
.classifier(classifier)
.build();
}
@Bean
public ItemWriter<Employee> javaDeveloperItemWriter() {
FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
.lineAggregator(new PassThroughLineAggregator<>())
.name("iw1")
.build();
return new MultiResourceItemWriterBuilder<Employee>()
.name("javaDeveloperItemWriter")
.delegate(itemWriter)
.resource(new FileSystemResource("javaDeveloper-employee.csv"))
.itemCountLimitPerResource(5)
.resourceSuffixCreator(index -> "-" + index)
.build();
}
@Bean
public ItemWriter<Employee> pythonDeveloperItemWriter() {
FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
.lineAggregator(new PassThroughLineAggregator<>())
.name("iw2")
.build();
return new MultiResourceItemWriterBuilder<Employee>()
.name("pythonDeveloperItemWriter")
.delegate(itemWriter)
.resource(new FileSystemResource("pythonDeveloper-employee.csv"))
.itemCountLimitPerResource(5)
.resourceSuffixCreator(index -> "-" + index)
.build();
}
@Bean
public ItemWriter<Employee> cloudDeveloperItemWriter() {
FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
.lineAggregator(new PassThroughLineAggregator<>())
.name("iw3")
.build();
return new MultiResourceItemWriterBuilder<Employee>()
.name("cloudDeveloperItemWriter")
.delegate(itemWriter)
.resource(new FileSystemResource("cloudDeveloper-employee.csv"))
.itemCountLimitPerResource(5)
.resourceSuffixCreator(index -> "-" + index)
.build();
}
@Bean
public Step step() throws Exception {
return stepBuilderFactory.get("step")
.<Employee, Employee>chunk(3)
.reader(itemReader())
.writer(classifierCompositeItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.start(step())
.build();
}
}
Classifier.java
import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
import lombok.Setter;
@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
private static final long serialVersionUID = 1L;
private ItemWriter<Employee> javaDeveloperFileItemWriter;
private ItemWriter<Employee> pythonDeveloperFileItemWriter;
private ItemWriter<Employee> cloudDeveloperFileItemWriter;
public EmployeeClassifier() {
}
public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
ItemWriter<Employee> pythonDeveloperFileItemWriter,
ItemWriter<Employee> cloudDeveloperFileItemWriter) {
this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
}
@Override
public ItemWriter<? super Employee> classify(Employee employee) {
if(employee.getRole().equals("Java Developer")){
return javaDeveloperFileItemWriter;
}
else if(employee.getRole().equals("Python Developer")){
return pythonDeveloperFileItemWriter;
}
return cloudDeveloperFileItemWriter;
}
}
Employee.java
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Employee {
private String empId;
private String firstName;
private String lastName;
private String role;
@Override
public String toString() {
return empId + "," + firstName + "," + lastName + "," + role;
}
}
EmployeeFieldSetMapper.java
public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
@Override
public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
return Employee.builder()
.empId(fieldSet.readRawString("empId"))
.firstName(fieldSet.readRawString("firstName"))
.lastName(fieldSet.readRawString("lastName"))
.role(fieldSet.readRawString("role"))
.build();
}
}
employee.csv
empId,firstName,lastName,role
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
10,Ravi,Doe,Python Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer
Output: javaDeveloper-employee.csv-1
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
javaDeveloper-employee.csv-2
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer
pythonDeveloper-employee.csv-1
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
pythonDeveloper-employee.csv-1
10,Ravi,Doe,Python Developer
Finally, found the issue.
Root cause: Usage of both .<Employee, Employee>chunk(3)
& .itemCountLimitPerResource(5)
were not working together to give the appropriate and expected records in your case.
That's why the POC was not working.
Note: You don't need to have .<Employee, Employee>chunk(3)
as you are using .itemCountLimitPerResource(5)
. itemCountLimitPerResource
method will take care of creating the chunk and put at most 5
records in every file.
Fix/Solution: Make .<Employee, Employee>chunk(0)
in the MyJobConfig
class.
Updated MyJobConfig:
@Configuration
public class MyJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<Employee> itemReader() {
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("empId", "firstName", "lastName", "role");
DefaultLineMapper<Employee> employeeLineMapper = new DefaultLineMapper<>();
employeeLineMapper.setLineTokenizer(tokenizer);
employeeLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
employeeLineMapper.afterPropertiesSet();
return new FlatFileItemReaderBuilder<Employee>()
.name("flatFileReader")
.linesToSkip(1)
.resource(new ClassPathResource("employee.csv"))
.lineMapper(employeeLineMapper)
.build();
}
@Bean
public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
Classifier<Employee, ItemWriter<? super Employee>> classifier = new EmployeeClassifier(
javaDeveloperItemWriter(),
pythonDeveloperItemWriter(),
cloudDeveloperItemWriter());
return new ClassifierCompositeItemWriterBuilder<Employee>()
.classifier(classifier)
.build();
}
@Bean
public ItemWriter<Employee> javaDeveloperItemWriter() {
FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
.lineAggregator(new PassThroughLineAggregator<>())
.name("iw1")
.build();
return new MultiResourceItemWriterBuilder<Employee>()
.name("javaDeveloperItemWriter")
.delegate(itemWriter)
.resource(new FileSystemResource("/users/anisb/bounty-folder/javaDeveloper-employee.csv"))
.itemCountLimitPerResource(5)
.resourceSuffixCreator(index -> "-" + index)
.build();
}
@Bean
public ItemWriter<Employee> pythonDeveloperItemWriter() {
FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
.lineAggregator(new PassThroughLineAggregator<>())
.name("iw2")
.build();
return new MultiResourceItemWriterBuilder<Employee>()
.name("pythonDeveloperItemWriter")
.delegate(itemWriter)
.resource(new FileSystemResource("/users/anisb/bounty-folder/pythonDeveloper-employee.csv"))
.itemCountLimitPerResource(5)
.resourceSuffixCreator(index -> "-" + index)
.build();
}
@Bean
public ItemWriter<Employee> cloudDeveloperItemWriter() {
FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
.lineAggregator(new PassThroughLineAggregator<>())
.name("iw3")
.build();
return new MultiResourceItemWriterBuilder<Employee>()
.name("cloudDeveloperItemWriter")
.delegate(itemWriter)
.resource(new FileSystemResource("/users/anisb/bounty-folder/cloudDeveloper-employee.csv"))
.itemCountLimitPerResource(5)
.resourceSuffixCreator(index -> "-" + index)
.build();
}
@Bean
public Step step() throws Exception {
return stepBuilderFactory.get("step")
.<Employee, Employee>chunk(0)
.reader(itemReader())
.writer(classifierCompositeItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.start(step())
.build();
}
}
Tested on my local:
javaDeveloper-employee.csv-1
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
javaDeveloper-employee.csv-2:
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer
pythonDeveloper-employee.csv-1:
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
pythonDeveloper-employee.csv-2:
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
10,Ravi,Doe,Python Developer