I am trying to create a batch program with Spring Batch that reads from my database using a JdbcCursorItemReader. Once the item reader is created, I assume the step takes care of iterating through it. The problem is the writer: I want the step to call the write(Chunk<> chunks) method that I override from ItemWriter<>, but it never gets called. With the reader I can force the call myself (assume I put customReader.read() inside reader() in the step builder), but I can't do the same with write(Chunk<> chunks), since it takes a parameter.
I have gotten an implementation to work. In my configuration class I created two methods, reader() and writer(). reader() asks my customReader for an item reader, iterates through it, and adds each entry to a Chunk that I declared as a field outside the methods; once read() returns null, it returns the reader. Then, in the writer part of the step builder, I call my writer() method, which passes that Chunk to write() on my customWriter class and returns the customWriter object I injected. This was inspired by what I saw in the documentation:
https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html
But from what I have heard, this is not how it should be done, since that is Spring's job, and I could confuse it once the job gets bigger. It has something to do with the .chunk(chunkSize, transactionManager) call in the step, but I am still not sure what.
I was hoping for some guidance on this problem. If you would like more information, do not hesitate to ask.
Edit: here is the code for the working case, which might not be best practice.
Reader:
public JdbcCursorItemReader<MyEntity> itemReader() throws Exception {
System.out.println("We are in the database reader");
PreparedStatementSetter preparedStatementSetter=new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
//code...
}
};
JdbcCursorItemReader<MyEntity> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setPreparedStatementSetter(preparedStatementSetter);
itemReader.setName("name");
itemReader.setSql("SQL"); //assume there is code in there and it works
itemReader.setRowMapper(new CustomRowMapper());
return itemReader;
}
Writer:
@Override
public void write(Chunk<? extends MyEntity> chunk) throws Exception {
NamedParameterJdbcOperations namedParameterJdbcTemplate=new NamedParameterJdbcTemplate(dataSource);
System.out.println("Hello!");
chunk.forEach(myEntity ->
namedParameterJdbcTemplate.getJdbcOperations().batchUpdate("SQL", new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
// set values on sql
}
@Override
public int getBatchSize() {
return chunk.size(); // or any other value you want
}
})
);
chunk.forEach(myEntity ->
namedParameterJdbcTemplate.getJdbcOperations().batchUpdate("SQL" +
"VALUES (?,?,?,?,true)", new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
// set values on sql
}
@Override
public int getBatchSize() {
return chunk.size(); // or any other value you want
}
})
);
}
Batch Config:
@Configuration
@EnableAutoConfiguration
@ComponentScan
@EnableBatchProcessing
public class Config {
private CustomReader customReader;
private CustomWriter customWriter;
private DataSource dataSource;
@Autowired
public Config(DataSource dataSource,CustomReader customReader,CustomWriter customWriter){
this.customReader=customReader;
this.customWriter=customWriter;
this.dataSource=dataSource;
}
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}
private Chunk<MyEntity> chunks;
public ItemReader<MyEntity> reader() throws Exception {
chunks = new Chunk<>();
JdbcCursorItemReader<MyEntity> itemReader = customReader.itemReader();
itemReader.open(new ExecutionContext());
// Manually drain the reader into the shared chunk (the part that is probably not best practice)
MyEntity myEntity;
while ((myEntity = itemReader.read()) != null) {
chunks.add(myEntity);
System.out.println(myEntity);
}
return itemReader;
}
public ItemWriter<MyEntity> writer() throws Exception {
// Manually push the collected chunk through the writer before handing the writer to the step
customWriter.write(chunks);
return customWriter;
}
@Bean
public Job myEntityJob(JobRepository jobRepository, Step myEntityStep){
return new JobBuilder("myEntityJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(myEntityStep)
.build();
}
@Bean
public Step myEntityStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
return new StepBuilder("myEntityStep", jobRepository)
.<MyEntity,MyEntity>chunk(10,transactionManager)
.reader(reader())
.writer(writer())
.build();
}
public static void main(String[] args) throws Exception {
// System.exit is common for Batch applications since the exit code can be used to
// drive a workflow
System.exit(SpringApplication.exit(SpringApplication.run(
Config.class, args)));
}
}
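You should not read and write your chunks manually. In a chunk-oriented step, Spring Batch does this for you: it calls read() on the reader one item at a time until the chunk size is reached, then passes the whole chunk to the writer's write() method in a single call, inside one transaction. Your job is only to hand the reader and the writer to the step.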
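To make that loop concrete, here is a minimal plain-Java sketch of what a chunk-oriented step does with the reader and writer it is given. It is not Spring Batch's actual implementation: SketchReader, SketchWriter and runStep are hypothetical stand-ins, and transactions, restart state and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for ItemReader/ItemWriter, for illustration only.
interface SketchReader<T> { T read(); }                  // returns null when the input is exhausted
interface SketchWriter<T> { void write(List<T> chunk); } // receives one whole chunk per call

class ChunkLoopSketch {

    // Simplified chunk loop: read up to chunkSize items, then call write() once with all of them.
    static <T> int runStep(SketchReader<T> reader, SketchWriter<T> writer, int chunkSize) {
        int writeCalls = 0;
        boolean exhausted = false;
        while (!exhausted) {
            List<T> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize) {
                T item = reader.read();
                if (item == null) {      // null is the end-of-data signal
                    exhausted = true;
                    break;
                }
                chunk.add(item);
            }
            if (!chunk.isEmpty()) {
                writer.write(chunk);     // the step, not your code, supplies the argument
                writeCalls++;
            }
        }
        return writeCalls;
    }

    // Builds a reader over an in-memory list, standing in for a database cursor.
    static <T> SketchReader<T> readerOf(List<T> data) {
        Iterator<T> it = data.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        SketchReader<Integer> reader = readerOf(List.of(1, 2, 3, 4, 5));
        SketchWriter<Integer> writer = written::add;
        int calls = runStep(reader, writer, 2);
        System.out.println(calls + " write calls: " + written);
        // prints: 3 write calls: [[1, 2], [3, 4], [5]]
    }
}
```

This is why write(Chunk<> chunks) taking a parameter is not a problem: you never call it yourself, so you never need to build the argument.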
For the reader, you can try the following configuration:
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchConfiguration {

    // Declared as a bean and never read from here: the step opens, reads and closes it.
    // Injecting the DataSource interface works with whatever DataSource bean the application defines.
    @Bean
    public JdbcCursorItemReader<MyEntity> itemReader(DataSource dataSource) {
        JdbcCursorItemReader<MyEntity> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("SELECT * FROM my_table"); // Replace with your SQL query
        itemReader.setRowMapper(new CustomRowMapper());
        return itemReader;
    }
}
For the writer:
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class CustomWriter implements ItemWriter<MyEntity> {

    private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public CustomWriter(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
        this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
    }

    // Spring Batch 5 passes a Chunk here (Spring Batch 4 passed a List).
    // The step calls this method once per chunk; you never call it yourself.
    @Override
    public void write(Chunk<? extends MyEntity> chunk) throws Exception {
        List<? extends MyEntity> items = chunk.getItems();
        // A single batchUpdate covers the whole chunk; no forEach around it is needed
        namedParameterJdbcTemplate.getJdbcOperations().batchUpdate("INSERT INTO my_table (column1, column2) VALUES (?, ?)", new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                // Replace with the columns you want to insert; i is the index of the item in the chunk
                MyEntity myEntity = items.get(i);
                ps.setString(1, myEntity.getColumn1());
                ps.setString(2, myEntity.getColumn2());
            }

            @Override
            public int getBatchSize() {
                return items.size();
            }
        });
    }
}
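Note that the writer above calls batchUpdate once for the whole chunk, whereas the writer in the question wraps batchUpdate in chunk.forEach. The following plain-Java sketch shows why that matters. SketchBatchSetter and this batchUpdate are hypothetical stand-ins that only mimic the contract of BatchPreparedStatementSetter (one setValues call for each index below getBatchSize()); they do not touch a database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BatchPreparedStatementSetter; the real one receives a PreparedStatement.
interface SketchBatchSetter {
    void setValues(List<String> statementLog, int i);
    int getBatchSize();
}

class BatchUpdateSketch {

    // Mimics the batchUpdate contract: setValues is called once per index in [0, getBatchSize()).
    static List<String> batchUpdate(SketchBatchSetter setter) {
        List<String> statements = new ArrayList<>();
        for (int i = 0; i < setter.getBatchSize(); i++) {
            setter.setValues(statements, i);
        }
        return statements;
    }

    // A setter that "inserts" the i-th item, like the writer's anonymous class.
    static SketchBatchSetter setterFor(List<String> items) {
        return new SketchBatchSetter() {
            @Override
            public void setValues(List<String> statementLog, int i) {
                statementLog.add("INSERT " + items.get(i));
            }

            @Override
            public int getBatchSize() {
                return items.size();
            }
        };
    }

    // One batch for the whole chunk: one statement per item.
    static int oncePerChunk(List<String> items) {
        return batchUpdate(setterFor(items)).size();
    }

    // batchUpdate nested inside a loop over the chunk: the full batch runs again for every item.
    static int oncePerItem(List<String> items) {
        int total = 0;
        for (String ignored : items) {
            total += batchUpdate(setterFor(items)).size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> chunk = List.of("a", "b", "c");
        System.out.println(oncePerChunk(chunk)); // prints 3
        System.out.println(oncePerItem(chunk));  // prints 9
    }
}
```

With a chunk size of 10, the nested version would issue 100 statements per chunk instead of 10, inserting every row ten times.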
Now for the step config:
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class StepConfiguration {

    @Autowired
    private JdbcCursorItemReader<MyEntity> itemReader;

    @Autowired
    private CustomWriter itemWriter;

    @Bean
    public Step myEntityStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("myEntityStep", jobRepository)
                .<MyEntity, MyEntity>chunk(10, transactionManager) // 10 items per chunk, one transaction each
                .reader(itemReader)  // pass the beans themselves; do not call read() or write() here
                .writer(itemWriter)
                .build();
    }
}
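The step is handed the reader untouched, and that matters because a reader is stateful: every read() advances it, and null means the data is finished. The plain-Java sketch below illustrates what is left for a step when configuration code has already iterated the reader. Supplier stands in for ItemReader here, and all names are hypothetical; the real JdbcCursorItemReader additionally has an open/close lifecycle that the step manages, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class DrainedReaderSketch {

    // Supplier<T> stands in for ItemReader<T>: get() returns the next row, or null at the end.
    static <T> Supplier<T> cursorOver(List<T> rows) {
        Iterator<T> it = rows.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Collects every item the "step" can still obtain from the reader.
    static <T> List<T> itemsSeenByStep(Supplier<T> reader) {
        List<T> seen = new ArrayList<>();
        for (T item = reader.get(); item != null; item = reader.get()) {
            seen.add(item);
        }
        return seen;
    }

    // The step receives a fresh reader and sees every row.
    static List<String> untouched(List<String> rows) {
        return itemsSeenByStep(cursorOver(rows));
    }

    // Configuration code iterates the reader first, so the step gets null on its first read.
    static List<String> drainedFirst(List<String> rows) {
        Supplier<String> reader = cursorOver(rows);
        while (reader.get() != null) {
            // manual iteration, as in a hand-written reader() method
        }
        return itemsSeenByStep(reader);
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2");
        System.out.println(untouched(rows));    // prints [r1, r2]
        System.out.println(drainedFirst(rows)); // prints []
    }
}
```

A step that reads nothing has nothing to pass to the writer, which is one plausible reason a write() method is never invoked when the reader has been consumed by hand.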