springcompositeitemprocessor

Spring Batch partitioning doesn't work with a CompositeItemProcessor


I have a Spring Batch partitioned job. I'm using a CompositeItemProcessor: the first processor reads data from the DB and stores the items in a CopyOnWriteArrayList, and the output is written to a separate file for each thread. The environment is concurrent, but my CopyOnWriteArrayList ends up being used by other threads and the information gets mixed. I don't know why this happens or what I am doing wrong.

public class CustomerItemProcessor implements ItemProcessor<beangenerico,CopyOnWriteArrayList<beanCustomer>> {

 private CustomerDAO customerDAO;
 private CopyOnWriteArrayList<beanCustomer> listbean;

 public CopyOnWriteArrayList<beanCustomer> process(beangenerico rangos) throws Exception {

      listbean = customerDAO.getAccAgentes(rangos);

      if(listbean != null) {
          //customer.setId(currentCustomer.getId());
         return listbean;
      }else{
         return null;
         }
    }

The configuration of my batch job in XML:

<batch:job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
      <batch:step id="masterStep">
         <batch:partition step="slave" partitioner="rangePartitioner">
                 <batch:handler grid-size="10" task-executor="taskExecutor"/>
         </batch:partition>
      </batch:step>
</batch:job>

<!-- each thread will run this job, with different stepExecutionContext values. -->
<batch:step id="slave" xmlns="http://www.springframework.org/schema/batch">
              <batch:tasklet task-executor="taskExecutor" throttle-limit="1">
                 <batch:chunk reader="beaniniendreader" writer="tempRecordsWriter"  processor="completeItemProcessor" commit-interval="1" />
              </batch:tasklet>
        </batch:step>

        <bean id="taskExecutor"  class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

        <bean id="rangePartitioner" class="my.package.springbatch.RangePartitioner" />  
    <bean id="beaniniendreader" class="my.package.springbatch.FormiikReader" scope="step"></bean>

    <bean id="beanprocessor" class="my.package.springbatch.FormiikProcessor" scope="step">
       <property name="accountExecutiveDao" ref="accountExecutiveDao"/>
    </bean>

    <bean id="beanprocessor2" class="my.package.springbatch.CustomerItemProcessor" scope="step">
       <property name="customerDAO" ref="customerAccDao"/>
    </bean>

    <bean id="completeItemProcessor"  class="org.springframework.batch.item.support.CompositeItemProcessor">
       <property name="delegates">
           <list>
                <ref bean="beanprocessor2"/>
                <ref bean="accItemprocessor"/>
                <ref bean="beanaccDataItem"/>
           </list>
      </property>
    </bean> 

    <bean id="tempRecordsWriter" class="my.package.springbatch.ListDelegateWriter" scope="step">
       <property name="delegate" ref="flatFileItemWriterPartition"/>
    </bean>

    <!-- csv file writer -->
    <bean id="flatFileItemWriterPartition" class="org.springframework.batch.item.file.FlatFileItemWriter"
     scope="step" >
      <property name="resource"
     value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" />
         <property name="appendAllowed" value="false" />
         <property name="lineAggregator">
         <bean  class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
               <property name="delimiter" value="," />
               <property name="fieldExtractor">
               <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
    <property name="names" value="cuenta, name, purchasedPackage" />    
               </bean>
           </property>
        </bean>
    </property>
    </bean>

Solution

  • Coming back to the subject of my code: I was advised to use a ThreadLocal for storing thread-specific data, and with that it works. Here is my code again. Thanks for your replies.

    public class CustomerItemProcessor implements ItemProcessor<beangenerico,ThreadLocal<CopyOnWriteArrayList<beanCustomer>>> {
        private CustomerDAO customerDAO;
        private ThreadLocal<CopyOnWriteArrayList<beanCustomer>> listbean = new ThreadLocal<CopyOnWriteArrayList<beanCustomer>>();      
    
        public ThreadLocal<CopyOnWriteArrayList<beanCustomer>> process(beangenerico rangos) throws Exception {
    
                listbean.set(new CopyOnWriteArrayList<beanCustomer>());
                listbean = customerDAO.getAccAgentes(rangos);
    
                if(listbean != null) {
    
                    return listbean;
                } else {
                    return null;
                }
    
        }
    
        public void setCustomerDAO(CustomerDAO customerDAO) {
                this.customerDAO = customerDAO;
        }
    

    }