Hello folks,
I am trying to write a Spring Batch job that reads a CSV file and writes a CSV file after performing some operation on each record.
A simple Java program doing the same thing processes 20 million records in about 1 minute, while the Spring Batch version takes over 2 minutes.
I thought Spring Batch would be faster. Is there anything I am doing wrong?
Please find my batch configuration below:
package com.learning.csv.to.csv.batch.process.batch.config;
import java.io.IOException;
import java.io.Writer;
import javax.annotation.PreDestroy;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.FileSystemResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.learning.csv.to.csv.batch.process.line.aggregator.ExtendedLineAggregator;
import com.learning.csv.to.csv.batch.process.model.Organization;
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing
public class BatchConfiguration2 extends DefaultBatchConfigurer {

    @Value("${input.file.path}")
    private String inputFile;

    @Value("${output.file.path}")
    private String outputFile;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Override
    public void setDataSource(DataSource dataSource) {
        // Intentionally do not set the DataSource, even if one exists.
        // Initialization will then fall back to a Map-based JobRepository
        // instead of a database-backed one.
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("csv-processor")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("csv-file-load")
                .<Organization, Organization>chunk(50)
                .reader(itemReader())
                .writer(itemWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutor();
        executor.shutdown();
    }

    @Bean(destroyMethod = "shutdown")
    @Primary
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // core pool size
        executor.setMaxPoolSize(30); // maximum pool size
        executor.setQueueCapacity(20); // queue capacity
        executor.setThreadNamePrefix("MyBatch-");
        executor.initialize();
        return executor;
    }

    @Bean
    public FlatFileItemReader<Organization> itemReader() {
        System.out.println("Reading from file...");
        FlatFileItemReader<Organization> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setResource(new FileSystemResource(inputFile));
        flatFileItemReader.setLinesToSkip(1);
        flatFileItemReader.setName("CSV-Reader");
        flatFileItemReader.setStrict(false);
        flatFileItemReader.setLineMapper(lineMapper());
        return flatFileItemReader;
    }

    @Bean
    public FlatFileItemWriter<Organization> itemWriter() {
        System.out.println("Writing to file...");
        FlatFileItemWriter<Organization> flatFileItemWriter = new FlatFileItemWriter<>();
        flatFileItemWriter.setResource(new FileSystemResource(outputFile));
        flatFileItemWriter.setLineAggregator(new ExtendedLineAggregator());
        flatFileItemWriter.setHeaderCallback(new FlatFileHeaderCallback() {
            @Override
            public void writeHeader(Writer writer) throws IOException {
                writer.write("Index,OrganizationId,Name,Website,Country,Description,Founded,Industry,Numberofemployees");
            }
        });
        return flatFileItemWriter;
    }

    @Bean
    public LineMapper<Organization> lineMapper() {
        DefaultLineMapper<Organization> defaultLineMapper = new DefaultLineMapper<>();
        BeanWrapperFieldSetMapper<Organization> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames(new String[] { "Index", "OrganizationId", "Name", "Website", "Country", "Description",
                "Founded", "Industry", "Numberofemployees" });
        fieldSetMapper.setTargetType(Organization.class);
        defaultLineMapper.setLineTokenizer(lineTokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSetMapper);
        return defaultLineMapper;
    }
}
"A simple Java program doing the same thing processes 20 million records in about 1 minute, while the Spring Batch version takes over 2 minutes."
Does the "simple java program" restart where it left off when things go wrong? Does it prevent you from re-processing the same data set if that is not desired? Does it support concurrency or partitioning? The point I am trying to make here is that you are comparing two different things, and hoping to get the same or better performance from something that does more than the other thing.
To put it simply, the "1 minute" difference is the cost of everything that Spring Batch gives you for free (transaction management, state persistence, fault-tolerance, etc.) and that the "simple java program" does not.
Now, since you have a working version of your batch job with Spring Batch (i.e. the logic is correct), there are many ways to improve its performance:
- BeanWrapperFieldSetMapper uses reflection to map raw data to the domain object. Use a custom mapper and you should get a performance boost here (see the first sketch after this list).
- Use AsyncItemProcessor/AsyncItemWriter instead of the multi-threaded step (see the second sketch below).

Those are already good starting points that should improve the performance of your job. If that is not sufficient, there is a second line of improvements that I can share in detail (using binary data for items and the execution context, partitioning the input, etc.); a rough partitioning sketch is included at the end.
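To illustrate the first point, here is a minimal sketch of such a custom mapper, assuming the Organization model (not shown in the question) exposes String-based setters named after the nine columns; those setter names are guesses. It maps each field directly instead of going through reflection the way BeanWrapperFieldSetMapper does:

package com.learning.csv.to.csv.batch.process.mapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

import com.learning.csv.to.csv.batch.process.model.Organization;

// Maps a tokenized CSV line to an Organization without reflection.
public class OrganizationFieldSetMapper implements FieldSetMapper<Organization> {

    @Override
    public Organization mapFieldSet(FieldSet fieldSet) {
        Organization organization = new Organization();
        // Fields are read by position, in the order declared in the tokenizer:
        // Index, OrganizationId, Name, Website, Country, Description,
        // Founded, Industry, Numberofemployees.
        // The setters below are assumed to exist on the Organization model.
        organization.setIndex(fieldSet.readString(0));
        organization.setOrganizationId(fieldSet.readString(1));
        organization.setName(fieldSet.readString(2));
        organization.setWebsite(fieldSet.readString(3));
        organization.setCountry(fieldSet.readString(4));
        organization.setDescription(fieldSet.readString(5));
        organization.setFounded(fieldSet.readString(6));
        organization.setIndustry(fieldSet.readString(7));
        organization.setNumberofemployees(fieldSet.readString(8));
        return organization;
    }
}

It would then replace the reflective mapper in lineMapper() with a single line: defaultLineMapper.setFieldSetMapper(new OrganizationFieldSetMapper());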
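For the second point, here is a sketch of what the step could look like with AsyncItemProcessor/AsyncItemWriter, assuming the spring-batch-integration dependency is on the classpath. The delegate processor below is a pass-through placeholder for whatever operation you perform on each record:

import java.util.concurrent.Future;

import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;

// Would replace the multi-threaded step() bean in the configuration above.
@Bean
public Step step() {
    // Items are processed concurrently on the task executor; the delegate
    // is a placeholder for the actual per-record operation.
    AsyncItemProcessor<Organization, Organization> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(item -> item);
    asyncProcessor.setTaskExecutor(taskExecutor());

    // Unwraps the Futures produced by the processor and hands the results
    // to the existing FlatFileItemWriter on a single thread.
    AsyncItemWriter<Organization> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(itemWriter());

    return stepBuilderFactory.get("csv-file-load")
            .<Organization, Future<Organization>>chunk(50)
            .reader(itemReader())
            .processor(asyncProcessor)
            .writer(asyncWriter)
            .build();
}

Note that the task executor moves from the step to the processor: the reader and writer stay on a single thread (FlatFileItemReader is not thread-safe, so this is also safer than the current multi-threaded step) while the processing is parallelized.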
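And to give an idea of the partitioning option, here is a rough sketch assuming the big input file has been pre-split into parts matching a hypothetical file:/tmp/parts/input-part-*.csv pattern (path and pattern are placeholders), with the beans added to the configuration class above. Each partition then processes its own file with its own step-scoped reader, so no reader state is shared between threads:

import java.io.IOException;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

// Manager step: creates one partition per input part file and runs the
// partitions concurrently on the task executor.
@Bean
public Step partitionedStep() throws IOException {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setResources(new PathMatchingResourcePatternResolver()
            .getResources("file:/tmp/parts/input-part-*.csv")); // placeholder pattern
    return stepBuilderFactory.get("partitioned-csv-load")
            .partitioner("worker-step", partitioner)
            .step(workerStep())
            .taskExecutor(taskExecutor())
            .build();
}

// Worker step: the same chunk-oriented logic, one execution per partition.
// A real job would also use one step-scoped writer (and output file) per
// partition, since the current FlatFileItemWriter cannot be shared safely.
@Bean
public Step workerStep() {
    return stepBuilderFactory.get("worker-step")
            .<Organization, Organization>chunk(50)
            .reader(partitionedItemReader(null)) // resolved at runtime by step scope
            .writer(itemWriter())
            .build();
}

// Step-scoped reader: each partition receives the file assigned to it by
// the partitioner through the step execution context.
@Bean
@StepScope
public FlatFileItemReader<Organization> partitionedItemReader(
        @Value("#{stepExecutionContext['fileName']}") Resource resource) {
    FlatFileItemReader<Organization> reader = new FlatFileItemReader<>();
    reader.setResource(resource);
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper());
    return reader;
}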