spring, spring-boot, spring-batch, spring-batch-stream

Spring Batch Stream restart issue?


I am developing a Spring Boot + Spring Batch stream example. It is a very basic example, but it fails with an error right after the 40th record is written. Any pointers? I've posted the whole code.

Error:

java.lang.RuntimeException: The Ansdwer to the Ultimate Question of Life, the universe & everthing...
    at com.example.config.StatefullItemReader.read(StatefullItemReader.java:55) ~[classes/:na]
    at com.example.config.StatefullItemReader.read(StatefullItemReader.java:1) ~[classes/:na]
    at com.example.config.StatefullItemReader$$FastClassBySpringCGLIB$$7a56e63b.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) ~[spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at com.example.config.StatefullItemReader$$EnhancerBySpringCGLIB$$7b5c8b77.read(<generated>) ~[classes/:na]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203) ~[spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_162]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_162]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_162]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_162]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) [spring-batch-core-4.1.0.RELEASE.jar:4.1.0.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at com.sun.proxy.$Proxy43.run(Unknown Source) [na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:214) [spring-boot-autoconfigure-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:186) [spring-boot-autoconfigure-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:172) [spring-boot-autoconfigure-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:166) [spring-boot-autoconfigure-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at com.example.ItemStreamApplication.main(ItemStreamApplication.java:12) [classes/:na]

2018-12-08 16:18:01.218  INFO 14788 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=statefulJob]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [FAILED]
2018-12-08 16:18:01.220  INFO 14788 --- [       Thread-3] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2018-12-08 16:18:01.222  INFO 14788 --- [       Thread-3] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

The output:

2018-12-08 16:18:01.131  INFO 14788 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: [--spring.output.ansi.enabled=always]
2018-12-08 16:18:01.170  INFO 14788 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=statefulJob]] launched with the following parameters: [{-spring.output.ansi.enabled=always}]
2018-12-08 16:18:01.181  INFO 14788 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
>> 1
>> 2
>> 3
>> 4
>> 5
>> 6
>> 7
>> 8
>> 9
>> 10
>> 11
>> 12
>> 13
>> 14
>> 15
>> 16
>> 17
>> 18
>> 19
>> 20
>> 21
>> 22
>> 23
>> 24
>> 25
>> 26
>> 27
>> 28
>> 29
>> 30
>> 31
>> 32
>> 33
>> 34
>> 35
>> 36
>> 37
>> 38
>> 39
>> 40
2018-12-08 16:18:01.214 ERROR 14788 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step step1 in job statefulJob

java.lang.RuntimeException: The Ansdwer to the Ultimate Question of Life, the universe & everthing...
    at com.example.config.StatefullItemReader.read(StatefullItemReader.java:55) ~[classes/:na]

StatefullItemReader.java

/**
 * Item reader over a fixed list of strings that tracks its position and stores it
 * in the step's {@link ExecutionContext} under the key {@code "curIndex"}.
 *
 * <p>To simulate a failure, {@link #read()} throws a {@link RuntimeException} when
 * the position reaches 42, unless {@link #open(ExecutionContext)} found a previously
 * stored position (in which case the run is flagged as a restart and no failure is raised).
 */
public class StatefullItemReader implements ItemStreamReader<String> {

    /** Execution-context key under which the current position is stored. */
    private static final String CUR_INDEX_KEY = "curIndex";

    /** Position at which the simulated failure is raised on a run not flagged as a restart. */
    private static final int FAILURE_INDEX = 42;

    private final List<String> items;
    private int curIndex = -1;
    private boolean restart = false;

    /**
     * Creates a reader positioned at the start of the given list.
     *
     * @param items the values handed out by {@link #read()}, in order
     */
    public StatefullItemReader(List<String> items) {
        this.items = items;
        this.curIndex = 0;
    }

    /**
     * Picks up the stored position if the context has one and marks the run as a
     * restart; otherwise starts from position 0 and stores that in the context.
     *
     * @param executionContext the context consulted for (and seeded with) the position
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (!executionContext.containsKey(CUR_INDEX_KEY)) {
            // Nothing stored yet: begin at the first item and record that position.
            curIndex = 0;
            executionContext.put(CUR_INDEX_KEY, curIndex);
            return;
        }

        curIndex = executionContext.getInt(CUR_INDEX_KEY);
        restart = true;
    }

    /**
     * Writes the current position into the given context.
     *
     * @param executionContext the context receiving the position
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.put(CUR_INDEX_KEY, curIndex);
    }

    /** Nothing to release. */
    @Override
    public void close() throws ItemStreamException {
        // intentionally empty
    }

    /**
     * Returns the item at the current position and advances, or {@code null} once
     * the list is exhausted.
     *
     * @return the next item, or {@code null} when no items remain
     * @throws RuntimeException when the position is 42 after advancing and the run
     *         was not flagged as a restart
     */
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        String next = null;

        boolean hasMore = curIndex < items.size();
        if (hasMore) {
            next = items.get(curIndex);
            curIndex += 1;
        }

        // The check runs after the position has advanced, so the item just fetched is never returned.
        boolean firstRun = !restart;
        if (firstRun && curIndex == FAILURE_INDEX) {
            throw new RuntimeException("The Ansdwer to the Ultimate Question of Life, the universe & everthing...");
        }

        return next;
    }
}

JobConfig.java

/**
 * Batch configuration for the example: a single-step job named {@code statefulJob}
 * that reads the strings "1" through "100" and prints each one to standard output.
 */
@Configuration
public class JobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Step-scoped reader over the strings "1" to "100".
     *
     * @return a {@link StatefullItemReader} backed by the generated list
     */
    @Bean
    @StepScope
    public StatefullItemReader itemReader() {
        final int count = 100;
        List<String> values = new ArrayList<>(count);

        for (int n = 0; n < count; n++) {
            values.add(Integer.toString(n + 1));
        }

        return new StatefullItemReader(values);
    }

    /**
     * Writer that prints every item it receives, prefixed with {@code ">> "}.
     *
     * @return the console-printing writer
     */
    @Bean
    public ItemWriter<String> itemWriter() {
        return chunk -> {
            for (String line : chunk) {
                System.out.println(">> " + line);
            }
        };
    }

    /**
     * Chunk-oriented step (chunk size 10) wiring the reader and writer together;
     * the reader is additionally registered as a stream.
     *
     * @return the configured {@code step1}
     */
    @Bean
    public Step step1() {
        StatefullItemReader reader = itemReader();

        return stepBuilderFactory.get("step1")
                .<String, String> chunk(10)
                .reader(reader)
                .writer(itemWriter())
                .stream(reader)
                .build();
    }

    /**
     * Job consisting solely of {@link #step1()}.
     *
     * @return the configured {@code statefulJob}
     */
    @Bean
    public Job statefulJob() {
        Step onlyStep = step1();
        return jobBuilderFactory.get("statefulJob")
                .start(onlyStep)
                .build();
    }
}

Even if I run the code a second time, it still gives the same error. On the second run, I was expecting the code to run fine.

ItemStreamApplication.java

/**
 * Entry point of the example: a Spring Boot application with batch processing enabled.
 */
@EnableBatchProcessing
@SpringBootApplication
public class ItemStreamApplication {

    /**
     * Starts the application, passing the command-line arguments through to Spring Boot.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        SpringApplication.run(ItemStreamApplication.class, args);
    }
}

Solution

  • You cannot use an in-memory H2 database, because its state is lost as soon as the application exits; when you launch the job again, Spring Batch finds no previous execution and treats it as a brand-new job rather than a restart.

    You must configure a "real" database like:

    spring.datasource.url=jdbc:postgresql://localhost:5432/batch
    spring.datasource.username=batch
    spring.datasource.password=batch
    spring.datasource.driver-class-name=org.postgresql.Driver
    
    spring.batch.initialize-schema=always