I want to process multiple files sequentially, and each file needs to be processed by multiple threads. For a single file I used Spring Batch's FlatFileItemReader together with a TaskExecutor, and that works fine. Because the requirement is to process multiple files, I then wrapped the FlatFileItemReader in a MultiResourceItemReader, which takes a set of files and processes them one by one. This is where I run into the exception below. What causes it, and how can I fix it?
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119)
customer2.csv
200,Zoe,Nelson,1973-01-12 17:19:30
201,Vivian,Love,1951-10-31 08:57:08
202,Charde,Lang,1967-02-23 12:24:26
customer3.csv
400,Amelia,Osborn,1972-05-09 09:21:22
401,Gemma,Finch,1989-09-25 23:00:59
402,Orli,Slater,1959-03-30 15:54:32
403,Donovan,Beasley,1986-06-18 14:50:30
customer4.csv
600,Zelenia,Henson,1982-07-03 03:28:39
601,Thomas,Mathews,1954-11-21 20:34:03
602,Kevyn,Whitney,1984-09-21 06:24:25
603,Marny,Leon,1984-06-10 21:32:09
604,Jarrod,Gay,1960-06-22 19:11:04
customer5.csv
800,Imogene,Lee,1966-10-19 17:53:44
801,Mira,Franks,1964-03-08 09:47:43
802,Silas,Dixon,1953-04-11 01:37:51
803,Paloma,Daniels,1962-06-14 17:01:02
My code:
@Bean
public MultiResourceItemReader<Customer> multiResourceItemReader() {
    System.out.println("In multiResourceItemReader");
    MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
    reader.setDelegate(customerItemReader());
    reader.setResources(inputFiles);
    return reader;
}

@Bean
public FlatFileItemReader<Customer> customerItemReader() {
    FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
    DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(new String[] {"id", "firstName", "lastName", "birthdate"});
    customerLineMapper.setLineTokenizer(tokenizer);
    customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
    customerLineMapper.afterPropertiesSet();
    reader.setLineMapper(customerLineMapper);
    return reader;
}
The following step, which uses the FlatFileItemReader directly, works fine:
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100)
            .reader(customerItemReader())
            .writer(customerItemWriter())
            .taskExecutor(taskExecutor())
            .throttleLimit(10)
            .build();
}
}
The following step, which uses the MultiResourceItemReader, does not work and throws the exception shown above:
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100)
            .reader(multiResourceItemReader())
            .writer(customerItemWriter())
            .taskExecutor(taskExecutor())
            .throttleLimit(10)
            .build();
}
Since you are using the reader in a multi-threaded step, one thread can close the current file while another thread is trying to read from that same file, which is why the delegate FlatFileItemReader reports that it is not open. You need to synchronize access to your reader by wrapping it in a SynchronizedItemStreamReader:
@Bean
public SynchronizedItemStreamReader<Customer> multiResourceItemReader() {
    System.out.println("In multiResourceItemReader");
    // Same multi-file reader as before; it is not thread-safe on its own.
    MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
    reader.setDelegate(customerItemReader());
    reader.setResources(inputFiles);
    // Wrap it so that only one thread at a time can call read() on the delegate.
    SynchronizedItemStreamReader<Customer> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
    synchronizedItemStreamReader.setDelegate(reader);
    return synchronizedItemStreamReader;
}
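
To show why the wrapper helps, here is a minimal plain-Java sketch of the same idea outside Spring Batch. The names SimpleReader, MultiListReader, and SynchronizedReader are hypothetical stand-ins invented for this illustration, not Spring Batch classes, and the in-memory lists only imitate the CSV files. The sketch assumes that serializing calls to read() is the essential part of what SynchronizedItemStreamReader does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedReaderSketch {

    /** Hypothetical stand-in for an item reader: returns null when exhausted. */
    interface SimpleReader<T> {
        T read();
    }

    /** Not thread-safe: walks several "files" (lists) one after another. */
    static class MultiListReader implements SimpleReader<String> {
        private final Iterator<List<String>> files;
        private Iterator<String> current;

        MultiListReader(List<List<String>> files) {
            this.files = files.iterator();
        }

        @Override
        public String read() {
            // Move on to the next "file" once the current one is exhausted.
            while (current == null || !current.hasNext()) {
                if (!files.hasNext()) {
                    return null;
                }
                current = files.next().iterator();
            }
            return current.next();
        }
    }

    /** Serializes read() so only one thread touches the delegate at a time. */
    static class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() {
            return delegate.read();
        }
    }

    /** Drains the reader from several threads and collects every item read. */
    static List<String> readAll(SimpleReader<String> reader, int threads)
            throws InterruptedException {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                String item;
                while ((item = reader.read()) != null) {
                    out.add(item);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> files = Arrays.asList(
                Arrays.asList("200", "201", "202"),
                Arrays.asList("400", "401", "402", "403"));
        List<String> items =
                readAll(new SynchronizedReader<>(new MultiListReader(files)), 4);
        System.out.println(items.size() + " items read");
    }
}
```

With the wrapper in place, each item is handed to exactly one thread, although the order in which items arrive is not deterministic. Passing the MultiListReader to readAll without the wrapper removes that guarantee, which is the plain-Java analogue of the race behind the ReaderNotOpenException.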