javaspringspring-batch

How do I get my customReader's method to transfer chunks over to my customerWriters write(Chunk<> chunk) method in a Spring Batch step?


I am trying to to create a batch program in Spring that calls my database, through the reader using JdbcCursorItemReader. After the item reader is created, I assume the step takes care of iterating through it. next is dealing with the writer, I want to call the write(Chunk<> chunks) method that I override from ItemWriter<>, but it never calls it. and when I do (Assume I put customReader.read() in the reader() in step builder) I can't do the same with write(Chunk<> chunks) since it takes in a parameter.

I have gotten an implementation to work, where I create two functions in my Configuration file reader() and writer(), where reader calls the read() in customReader returning an item reader. Then I iterate through it, adding each entry into a Chunk(List) I created outside the methods. Then I would return the reader when it became null. Then when writer was called in the writer part of the step builder, I called the method in my config again, but this time the writer one. It would take that Chunk(List) I made outside the methods, and use it as a parameter to the write() in my customWriter class. then I would just return my customWriter object I injected. This was inspired by what I saw in the documentation.

https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html

But this, from what I have heard is not how it should be done, since that's springs job to do, and I could confuse it when the job gets bigger or something like that. Something about the .chunk(chunkSize,transactionManager) step, still not sure.

I was hoping for some guidance in this problem I have. If you would like more information, do not hesitate to ask.

Edit: Providing Code of working case that might not be best practice

Reader:

public JdbcCursorItemReader<MyEntity> itemReader() throws Exception {
    System.out.println("We are in the database reader");


    PreparedStatementSetter preparedStatementSetter=new PreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps) throws SQLException {
            //code...
        }
    };

    Chunk<MyEntity> items=new Chunk();

    JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
    itemReader.setDataSource(dataSource);
    itemReader.setPreparedStatementSetter(preparedStatementSetter);
    itemReader.setName("name");
    itemReader.setSql("SQL"); //assume there is code in there and it works
    itemReader.setRowMapper(new CustomRowMapper());

    return itemReader;
}

Writer:

@Override
public void write(Chunk<? extends MyEntity> chunk) throws Exception {
    NamedParameterJdbcOperations namedParameterJdbcTemplate=new NamedParameterJdbcTemplate(dataSource);
    System.out.println("Hello!");


    chunk.forEach(myEntity ->

        namedParameterJdbcTemplate.getJdbcOperations().batchUpdate("SQL", new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                // set values on sql
            }

            @Override
            public int getBatchSize() {
                return chunk.size(); // or any other value you want
            }
        })

    );
    chunk.forEach(myEntity ->
            namedParameterJdbcTemplate.getJdbcOperations().batchUpdate("SQL" +
                    "VALUES (?,?,?,?,true)", new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    // set values on sql
                }

                @Override
                public int getBatchSize() {
                    return chunk.size(); // or any other value you want
                }
            })
    );
}

Batch Config:

@Configuration
@EnableAutoConfiguration
@ComponentScan
@EnableBatchProcessing
public class Config {

    private CustomReader customReader;

    private CustomWriter customWriter;

    private DataSource dataSource;

    @Autowired
    public Config(DataSource dataSource,CustomReader customReader,CustomWriter customWriter){
        this.customReader=customReader;
        this.customWriter=customWriter;
        this.dataSource=dataSource;
    }


    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }




    Chunk<myEntity> chunks;
    public ItemReader<myEntity> reader() throws Exception {
        chunks=new Chunk<>();
        JdbcCursorItemReader itemReader=customReader.itemReader();
        ExecutionContext executionContext = new ExecutionContext();
        itemReader.open(executionContext);
        Object myEntity = new Object();
        while(myEntity != null){
            myEntity = itemReader.read();
            if(myEntity==null) break;
            chunks.add(myEntity);
            System.out.println(myEntity);
        }
        return itemReader;
    }
    
    public ItemWriter<myEntity> writer() throws Exception {
        consoleItemWriter.write(chunks);
        return consoleItemWriter;
    }
    @Bean
    public Job myEntityJob(JobRepository jobRepository, Step myEntityStep){
        return new JobBuilder("myEntityJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(myEntityStep)
                .build();
    }



    @Bean
    public Step myEntityStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        return new StepBuilder("myEntityStep", jobRepository)
                .<MyEntity,MyEntity>chunk(10,transactionManager)
                .reader(reader(databaseReader))
                .writer(writer(consoleItemWriter))
                .build();
    }




    public static void main(String[] args) throws Exception {
        // System.exit is common for Batch applications since the exit code can be used to
        // drive a workflow
        System.exit(SpringApplication.exit(SpringApplication.run(
                Config.class, args)));
    }
}

Solution

  • You should not try to manually read and write your chunks. SpringBatch handle things in chunks or mapper oriented way.

    You can try the following implementation :

    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.PreparedStatementSetter;
    import org.springframework.jdbc.datasource.DriverManagerDataSource;
    
    @Configuration
    public class BatchConfiguration {
    
        @Bean
        public JdbcCursorItemReader<MyEntity> itemReader(DriverManagerDataSource dataSource) {
            JdbcCursorItemReader<MyEntity> itemReader = new JdbcCursorItemReader<>();
            itemReader.setDataSource(dataSource);
            itemReader.setSql("SELECT * FROM my_table"); // Replace with your SQL query
            itemReader.setRowMapper(new CustomRowMapper());
    
            return itemReader;
        }
    }
    

    For the writer :

    import org.springframework.batch.item.ItemWriter;
    import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
    import org.springframework.jdbc.core.BatchPreparedStatementSetter;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CustomWriter implements ItemWriter<MyEntity> {
    
        private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
    
        public CustomWriter(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
            this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
        }
    
        @Override
        public void write(List<? extends MyEntity> items) throws Exception {
            namedParameterJdbcTemplate.getJdbcOperations().batchUpdate("INSERT INTO my_table (column1, column2) VALUES (?, ?)", new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                   // here replace with the columns you want insert
                    MyEntity myEntity = items.get(i);
                    ps.setString(1, myEntity.getColumn1());
                    ps.setString(2, myEntity.getColumn2());
                }
    
                @Override
                public int getBatchSize() {
                    return items.size();
                }
            });
        }
    }
    

    Now for the step config :

    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.transaction.PlatformTransactionManager;
    
    @Configuration
    public class StepConfiguration {
    
    @Autowired
    private JdbcCursorItemReader<MyEntity> itemReader;
    
     @Autowired
     private CustomWriter itemWriter;
    
        @Bean
            public Step myEntityStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
            return new StepBuilder("myEntityStep", jobRepository)
                    .<MyEntity, MyEntity>chunk(10, transactionManager)
                    .reader(itemReader)
                    .writer(itemWriter)
                .build();
        }
    }