springspring-batch

org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (2), where current version is 3


I'm using Spring Boot (v2.7.1) + Batch in my project using the XML appraoch. In this example I'm reading FlatFile using FlatFileItemReaderthen making the use of ClassifierCompositeItemWriter to classify items and with the help of MultiResourceItemWriter create multiple versions of the file when certain threashold is reached.

Below is the error message-

Error:

org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (1), where current version is 3
    at org.springframework.batch.core.repository.dao.MapStepExecutionDao.updateStepExecution(MapStepExecutionDao.java:110) ~[spring-batch-core-4.3.6.jar:4.3.6]
    at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:204) ~[spring-batch-core-4.3.6.jar:4.3.6]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.21.jar:5.3.21]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.21.jar:5.3.21]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.21.jar:5.3.21]
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.21.jar:5.3.21]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.21.jar:5.3.21]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.21.jar:5.3.21]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.21.jar:5.3.21]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.21.jar:5.3.21]
    at com.sun.proxy.$Proxy44.update(Unknown Source) ~[na:na]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:457) ~[spring-batch-core-4.3.6.jar:4.3.6]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.6.jar:4.3.6]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.21.jar:5.3.21]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.6.jar:4.3.6]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.6.jar:4.3.6]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.6.jar:4.3.6]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Code. spring-batch-context.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/batch  http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <!-- JobRepository and JobLauncher are configuration/setup classes -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="4"/>
        <property name="maxPoolSize" value="4"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <bean id="suffixCreator" class="com.example.EmployeeResourceSuffixCreator"/>

    <batch:job id="employeeJob">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager" task-executor="taskExecutor">
                <batch:chunk reader="flatFileItemReader" writer="classifierCompositeWriter" commit-interval="20"  >
                    <batch:streams>
                        <batch:stream ref="javaSyncSW"/>
                        <batch:stream ref="pythonSyncSW"/>
                        <batch:stream ref="cloudSyncSW"/>
                    </batch:streams>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>


    <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="classpath:employee.csv"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="empId,firstName,lastName,role"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.example.EmployeeFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="classifierCompositeWriter" class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
        <property name="classifier" ref="employeeClassifier"/>
    </bean>

    <bean id="employeeClassifier" class="com.example.EmployeeClassifier">
        <constructor-arg index="0" ref="javaSyncSW"/>
        <constructor-arg index="1" ref="pythonSyncSW"/>
        <constructor-arg index="2" ref="cloudSyncSW"/>
    </bean>


    <bean id="javaWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:javaDeveloper-employee.csv"/>
        <property name="shouldDeleteIfExists" value="true"/>
        <property name="shouldDeleteIfEmpty" value="true"/>
        <property name="appendAllowed" value="true"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="delimiter" value=","/>
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="empId,firstName,lastName,role"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="javaMultiResource" class="org.springframework.batch.item.file.MultiResourceItemWriter">
        <property name="name" value="javaMulti"/>
        <property name="resource" value="file:javaDeveloper-employee.csv"/>
        <property name="itemCountLimitPerResource" value="5"/>
        <property name="resourceSuffixCreator" ref="suffixCreator"/>
        <property name="delegate" ref="javaWriter"/>
    </bean>
    
    
    <bean id="javaSyncSW" class="org.springframework.batch.item.support.SynchronizedItemStreamWriter">
        <property name="delegate" ref="javaMultiResource" />
    </bean>
    


    <bean id="pythonWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:pythonDeveloper-employee.csv"/>
        <property name="shouldDeleteIfExists" value="true"/>
        <property name="shouldDeleteIfEmpty" value="true"/>
        <property name="appendAllowed" value="true"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="delimiter" value=","/>
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="empId,firstName,lastName,role"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="pythonMultiResource" class="org.springframework.batch.item.file.MultiResourceItemWriter">
        <property name="name" value="javaMulti"/>
        <property name="resource" value="file:pythonDeveloper-employee.csv"/>
        <property name="itemCountLimitPerResource" value="5"/>
        <property name="resourceSuffixCreator" ref="suffixCreator"/>
        <property name="delegate" ref="pythonWriter"/>
    </bean>
    
    <bean id="pythonSyncSW" class="org.springframework.batch.item.support.SynchronizedItemStreamWriter">
        <property name="delegate" ref="pythonMultiResource" />
    </bean>


    <bean id="cloudWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:cloudDeveloper-employee.csv"/>
        <property name="shouldDeleteIfExists" value="true"/>
        <property name="shouldDeleteIfEmpty" value="true"/>
        <property name="appendAllowed" value="true"/>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="delimiter" value=","/>
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="empId,firstName,lastName,role"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="cloudMultiResource" class="org.springframework.batch.item.file.MultiResourceItemWriter">
        <property name="name" value="javaMulti"/>
        <property name="resource" value="file:cloudDeveloper-employee.csv"/>
        <property name="itemCountLimitPerResource" value="5"/>
        <property name="resourceSuffixCreator" ref="suffixCreator"/>
        <property name="delegate" ref="cloudWriter"/>
    </bean>
    
     <bean id="cloudSyncSW" class="org.springframework.batch.item.support.SynchronizedItemStreamWriter">
        <property name="delegate" ref="cloudMultiResource" />
    </bean>
</beans>    

Employee.java

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Employee {
    private String empId;
    private String firstName;
    private String lastName;
    private String role;

    @Override
    public String toString() {
        return empId + "," + firstName + "," + lastName + "," + role;
    }
}

EmployeeClassifier.java

@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
    private static final long serialVersionUID = 1L;
    private ItemWriter<Employee> javaDeveloperFileItemWriter;
    private ItemWriter<Employee> pythonDeveloperFileItemWriter;
    private ItemWriter<Employee> cloudDeveloperFileItemWriter;
    
    public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
                              ItemWriter<Employee> pythonDeveloperFileItemWriter,
                              ItemWriter<Employee> cloudDeveloperFileItemWriter) {
        this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
        this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
        this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
    }

    @Override
    public ItemWriter<? super Employee> classify(Employee employee) {
        if(employee.getRole().equals("Java Developer")){
            return javaDeveloperFileItemWriter;
        }
        else if(employee.getRole().equals("Python Developer")){
            return pythonDeveloperFileItemWriter;
        }
        return cloudDeveloperFileItemWriter;
    }
}

EmployeeFieldSetMapper.java

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
    @Override
    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        return Employee.builder()
                .empId(fieldSet.readRawString("empId"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .role(fieldSet.readRawString("role"))
                .build();
    }
}

MainApp.java

@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchMultiresourceClassifierXmlApplication implements CommandLineRunner{

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchMultiresourceClassifierXmlApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-context.xml");

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("employeeJob");

        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Exit Status : "+ execution.getStatus());

        } catch (JobExecutionException e) {
            System.out.println("Job ExamResult failed");
            e.printStackTrace();
        }
    }
}

Solution

  • The FlatFileItemReader is not thread safe (see its javadoc) and you are using it in a multi-threaded step. This is why threads are overlapping and you get the optimistic locking failure.

    You need to synchronize access to the reader and writer for this to work correctly. One way of doing that is to wrap the reader/writer in a SynchronizedItemStreamReader/SynchronizedItemStreamWriter.