Tags: java, spring, hibernate, jpa, spring-batch

Eagerly load nested lazy collections in Spring Batch with parallel chunk processing and an aggregate ItemReader


I’m working with a Spring Batch step that processes chunks in parallel. The step uses a custom aggregate ItemReader that returns a List of entities to the ItemProcessor.

Each entity contains nested @ElementCollection lists. These collections are lazily loaded by default.

I’m using a JpaTransactionManager for transaction management and SimpleAsyncTaskExecutor for parallel execution.

The issue is that the nested lazy collections are not initialized when accessed in the ItemProcessor, leading to a LazyInitializationException.

Example:

CustomerOrder entity:

/**
 * JPA entity for a customer order: a name, a processed flag, and a reference to an {@link Order}.
 */
@Entity
public class CustomerOrder {

    // Primary key generated by the database (identity column).
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    // Foreign key column "order_id". @ManyToOne defaults to EAGER in JPA, but a JPQL query only loads it
    // in the same statement when it uses JOIN FETCH; otherwise Hibernate issues additional selects.
    // NOTE(review): no cascade is declared, so a new, unsaved Order assigned here (as the processor does)
    // is not persisted together with this entity by merge/persist — confirm how new Orders get saved.
    @ManyToOne
    @JoinColumn(name = "order_id")
    private Order order;

    // Set to true on the copies produced by CustomerOrderItemProcessor.
    private boolean processed;

    // Getters and setters omitted for brevity
}

Order entity:

The @ElementCollection associations are lazily loaded by default.

/**
 * JPA entity owning two element collections: a list of dates and a list of sizes.
 *
 * <p>Both collections are {@code List}s without an {@code @OrderColumn}, so Hibernate maps them as bags.
 * Fetch-joining both of them in a single query fails with {@code MultipleBagFetchException}, so each one has
 * to be initialized with its own query.
 *
 * <p>NOTE(review): {@code ORDER} is a reserved SQL keyword. Depending on the database and naming strategy,
 * the default table name for this entity may need quoting or an explicit {@code @Table(name = ...)} — confirm.
 */
@Entity
public class Order {

    // Primary key generated by the database (identity column).
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // Stored in table "order_dates" (value column "date"), linked to this Order via "order_id".
    // @ElementCollection is lazily loaded by default.
    @ElementCollection
    @CollectionTable(name = "order_dates", joinColumns = @JoinColumn(name = "order_id"))
    @Column(name = "date")
    private List<LocalDate> dates = new ArrayList<>();

    // Stored in table "order_sizes" (value column "size"), linked to this Order via "order_id".
    // @ElementCollection is lazily loaded by default.
    @ElementCollection
    @CollectionTable(name = "order_sizes", joinColumns = @JoinColumn(name = "order_id"))
    @Column(name = "size")
    private List<Double> sizes = new ArrayList<>();

    // Getters and setters omitted for brevity
}

Item reader (aggregate):

Bean:

/**
 * Original reader configuration: a {@link JpaPagingItemReader} over all {@link CustomerOrder}s, wrapped in an
 * {@link AggregatePagingItemReader} so that each read returns a page of up to 100 orders as one list.
 *
 * <p>The query is ordered by id because JpaPagingItemReader pages by offset, issuing one query per page. That is
 * only stable when every page query sorts the rows the same way. Without ORDER BY the database may return rows
 * in a different order for each query, silently skipping or repeating customer orders.
 * {@code co.order} is fetch-joined so the Orders are loaded by the page query itself instead of by extra selects.
 * This alone does NOT initialize {@code Order.dates} / {@code Order.sizes}.
 */
@Bean
public AggregatePagingItemReader<CustomerOrder> customerOrderAggregateItemReader(
        EntityManagerFactory entityManagerFactory) {
    JpaPagingItemReader<CustomerOrder> itemReader = new JpaPagingItemReaderBuilder<CustomerOrder>()
            .name("customerOrderItemReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT co FROM CustomerOrder co LEFT JOIN FETCH co.order ORDER BY co.id")
            .pageSize(100)
            .build();
    return new AggregatePagingItemReader<>(itemReader);
}

Class:

/**
 * {@link ItemStreamReader} whose items are whole pages: each read returns a {@code List<T>} rather than a
 * single {@code T}.
 *
 * <p>In this post it wraps a {@code JpaPagingItemReader} (original configuration) and, in the solution,
 * the custom {@code CustomerOrderItemReader}.
 */
public class AggregatePagingItemReader<T> implements ItemStreamReader<List<T>> {
    // Wraps a paging item reader and returns a full page as List<T> instead of one item at a time.
    // Implementation omitted in this excerpt.
}

Item processor:

/**
 * Turns each page of {@link CustomerOrder}s into a page of new, unsaved CustomerOrders marked as processed,
 * each pointing to a new {@link Order} that carries copies of the source order's dates and sizes.
 *
 * <p>This processor is the delegate of an {@code AsyncItemProcessor}, so it runs on a task-executor thread
 * outside the reader's persistence context. Every association it reads ({@code order}, {@code order.dates},
 * {@code order.sizes}) must therefore already have been initialized while reading.
 */
@Component
@StepScope
public class CustomerOrderItemProcessor implements ItemProcessor<List<CustomerOrder>, List<CustomerOrder>> {

    /**
     * Creates processed copies of the given customer orders.
     *
     * @param customerOrders one page of customer orders from the aggregate reader
     * @return a new list with one new, unsaved CustomerOrder per input element, in the same order
     */
    @Override
    public List<CustomerOrder> process(List<CustomerOrder> customerOrders) throws Exception {
        List<CustomerOrder> newCustomerOrders = new ArrayList<>(customerOrders.size());
        for (CustomerOrder customerOrder : customerOrders) {
            CustomerOrder newCustomerOrder = new CustomerOrder();
            newCustomerOrder.setName(customerOrder.getName());
            newCustomerOrder.setProcessed(true);
            newCustomerOrder.setOrder(copyOrder(customerOrder.getOrder()));
            newCustomerOrders.add(newCustomerOrder);
        }
        return newCustomerOrders;
    }

    /**
     * Creates a new Order holding copies of the source order's element collections.
     *
     * <p>The lists are copied rather than passed through. Passing them through would make the new entity share
     * the source entity's Hibernate-managed collection instances, which belong to another owner and persistence
     * context.
     *
     * @param order the source order; may be null because the association is optional
     * @return a new Order, or null when {@code order} is null
     */
    private Order copyOrder(Order order) {
        if (order == null) {
            return null;
        }
        Order newOrder = new Order();
        // Copying iterates the collections, so they must have been initialized during the read phase;
        // an uninitialized lazy collection would throw LazyInitializationException here.
        newOrder.setDates(new ArrayList<>(order.getDates()));
        newOrder.setSizes(new ArrayList<>(order.getSizes()));
        return newOrder;
    }
}

Item writer:

/**
 * Writer for page-shaped items: unpacks every page in the chunk and hands the individual
 * {@link CustomerOrder}s to a {@link JpaItemWriter} in a single delegate chunk.
 */
@Component
@StepScope
public class CustomerOrderItemWriter implements ItemWriter<List<CustomerOrder>> {

    /** JPA delegate that receives the flattened customer orders. */
    private final JpaItemWriter<CustomerOrder> jpaItemWriter;

    /**
     * @param entityManagerFactory factory handed to the underlying JPA writer
     */
    public CustomerOrderItemWriter(EntityManagerFactory entityManagerFactory) {
        this.jpaItemWriter = new JpaItemWriterBuilder<CustomerOrder>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    /**
     * Flattens the pages contained in {@code chunk} and writes all of their customer orders at once.
     *
     * @param chunk a chunk whose items are lists (pages) of customer orders
     */
    @Override
    public void write(Chunk<? extends List<CustomerOrder>> chunk) {
        // One chunk item == one page; unfold the pages so the delegate sees individual entities.
        List<CustomerOrder> flattened = chunk.getItems()
                .stream()
                .flatMap(List::stream)
                .toList();
        Chunk<CustomerOrder> delegateChunk = new Chunk<>(flattened);
        jpaItemWriter.write(delegateChunk);
    }
}

Step configuration:

/**
 * Wires the customer-order step: an aggregate (page-per-item) reader, an asynchronous processor that runs the
 * per-page processing on a task executor, and an asynchronous writer that resolves the resulting futures.
 */
@Configuration
public class CustomerOrderSetpConfig {

    /**
     * Chunk-oriented step whose items are whole pages ({@code List<CustomerOrder>}). With a chunk size of 1,
     * every chunk holds exactly one page. The processor output is a {@link Future} per page, which the
     * {@link AsyncItemWriter} unwraps before delegating.
     */
    @Bean
    public Step pocessCustomerOrderStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
            AggregatePagingItemReader<CustomerOrder> customerOrderAggregateItemReader,
            AsyncItemProcessor<List<CustomerOrder>, List<CustomerOrder>> customerOrderAsyncItemProcessor,
            AsyncItemWriter<List<CustomerOrder>> customerOrderAsyncItemWriter) {
        // Step name kept verbatim: it identifies this step's executions in the job repository.
        var stepBuilder = new StepBuilder("pocessCustomerOrderStep", jobRepository);
        var chunkStep = stepBuilder.<List<CustomerOrder>, Future<List<CustomerOrder>>>chunk(1, transactionManager);
        return chunkStep
                .reader(customerOrderAggregateItemReader)
                .processor(customerOrderAsyncItemProcessor)
                .writer(customerOrderAsyncItemWriter)
                .build();
    }

    /**
     * Runs {@code customerOrderItemProcessor} on {@code taskExecutor} threads, one task per page.
     */
    @Bean
    public AsyncItemProcessor<List<CustomerOrder>, List<CustomerOrder>> customerOrderAsyncItemProcessor(
            CustomerOrderItemProcessor customerOrderItemProcessor, TaskExecutor taskExecutor) {
        var processor = new AsyncItemProcessor<List<CustomerOrder>, List<CustomerOrder>>();
        processor.setTaskExecutor(taskExecutor);
        processor.setDelegate(customerOrderItemProcessor);
        return processor;
    }

    /**
     * Resolves each page future produced by the async processor and passes the page to
     * {@code customerOrderItemWriter}.
     */
    @Bean
    public AsyncItemWriter<List<CustomerOrder>> customerOrderAsyncItemWriter(
            CustomerOrderItemWriter customerOrderItemWriter) {
        var writer = new AsyncItemWriter<List<CustomerOrder>>();
        writer.setDelegate(customerOrderItemWriter);
        return writer;
    }
}

What I’ve tried:

  1. Default approach: Leads to LazyInitializationException when accessing the nested collections in the ItemProcessor.
  2. Using JOIN FETCH in the query: Attempting to fetch nested collections directly in the reader query causes a MultipleBagFetchException.
  3. Using Hibernate.initialize() in an ItemReadListener: This loads the collections in afterRead(), but causes an N+1 query problem.

Question:

How can I eagerly initialize nested lazy collections (such as @ElementCollection) in a Spring Batch step that uses parallel chunk processing and an aggregate ItemReader without running into LazyInitializationException, MultipleBagFetchException, or performance issues like N+1 selects?

Is there a recommended performant way to batch-initialize nested collections in this context, possibly during the read phase, while still keeping the step thread-safe for parallel execution?


Solution

  • After running into the common JPA pitfalls in a parallel Spring Batch step, I took the following approach:

    I implemented a custom paging ItemReader that eagerly loads the required collections during the read phase, using one additional batch query per collection.

    Here’s how it works:

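    1. Loads a page of CustomerOrder entities.
    2. Extracts the distinct IDs of their related Orders.
    3. Runs one additional query per @ElementCollection (Order.dates, then Order.sizes) for all of those IDs at once. This loads each collection in bulk (no N+1 selects) and fetches only one bag per query (no MultipleBagFetchException).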
    /**
     * Paging reader for {@link CustomerOrder} that returns every page with everything the processor needs
     * already initialized: the {@code order} association and its {@code dates} / {@code sizes} element
     * collections.
     *
     * <p>Each collection is loaded with one query for the whole page instead of one query per entity (N+1).
     * Only one bag is fetched per query, which avoids {@code MultipleBagFetchException}.
     *
     * <p>Every page is read in its own resource-local transaction on a dedicated {@link EntityManager}. The
     * persistence context is cleared at the start of the next page, so entities handed to the (asynchronous)
     * processor become detached, but their collections are already initialized.
     */
    public class CustomerOrderItemReader extends AbstractPagingItemReader<CustomerOrder> {

        private final EntityManagerFactory entityManagerFactory;

        private EntityManager entityManager;

        /**
         * @param entityManagerFactory factory used to create this reader's private EntityManager; must not be null
         * @param pageSize number of customer orders per page
         */
        public CustomerOrderItemReader(EntityManagerFactory entityManagerFactory, int pageSize) {
            super();
            // Validate here as well: when the reader is created with `new` inside a @Bean method (and wrapped),
            // Spring never calls afterPropertiesSet() on it.
            Assert.notNull(entityManagerFactory, "EntityManagerFactory cannot be null");
            this.entityManagerFactory = entityManagerFactory;
            setPageSize(pageSize);
            setName("customerOrderItemReader");
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            super.afterPropertiesSet();
            Assert.state(entityManagerFactory != null, "EntityManagerFactory cannot be null");
        }

        @Override
        protected void doOpen() throws Exception {
            super.doOpen();
            entityManager = entityManagerFactory.createEntityManager();
            if (entityManager == null) {
                throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
            }
        }

        /**
         * Reads the next page into {@code results}, with all required associations initialized.
         * On failure the page transaction is rolled back so the next call can begin a fresh one.
         */
        @Override
        protected void doReadPage() {
            EntityTransaction tx = entityManager.getTransaction();
            tx.begin();
            try {
                // Detach the previous page so the persistence context does not grow across pages.
                entityManager.flush();
                entityManager.clear();

                if (results == null) {
                    results = new CopyOnWriteArrayList<>();
                } else {
                    results.clear();
                }

                List<CustomerOrder> customerOrders = fetchCustomerOrderPage();
                initializeOrderCollections(collectOrderIds(customerOrders));

                tx.commit();

                results.addAll(customerOrders);
            } catch (RuntimeException e) {
                // Without a rollback the transaction would stay active and the next tx.begin() would fail.
                if (tx.isActive()) {
                    try {
                        tx.rollback();
                    } catch (RuntimeException rollbackFailure) {
                        e.addSuppressed(rollbackFailure);
                    }
                }
                throw e;
            }
        }

        /**
         * Loads one page of CustomerOrders ordered by id (required for stable offset paging).
         * {@code co.order} is fetch-joined: a plain join would not populate it, and the Orders
         * would then be loaded with extra selects.
         */
        private List<CustomerOrder> fetchCustomerOrderPage() {
            return entityManager.createQuery("""
                    SELECT co FROM CustomerOrder co
                    LEFT JOIN FETCH co.order
                    ORDER BY co.id
                    """, CustomerOrder.class)
                .setFirstResult(getPage() * getPageSize())
                .setMaxResults(getPageSize())
                .getResultList();
        }

        /** Returns the distinct ids of the Orders referenced by the page, skipping customer orders without one. */
        private List<Long> collectOrderIds(List<CustomerOrder> customerOrders) {
            return customerOrders.stream()
                .map(CustomerOrder::getOrder)
                .filter(order -> order != null)
                .map(Order::getId)
                .distinct()
                .toList();
        }

        /**
         * Initializes Order.dates and Order.sizes of the given (already managed) Orders, using one query per
         * collection. The query results are discarded; the queries exist only to populate the collections of
         * the Order instances already in the persistence context.
         *
         * <p>DISTINCT is deliberately not used. With a fetch-joined element collection it applies to the SQL
         * rows and would collapse duplicate element values (e.g. two equal sizes) into one.
         */
        private void initializeOrderCollections(List<Long> orderIds) {
            if (orderIds.isEmpty()) {
                return; // nothing to load; an empty IN () list is also not portable SQL
            }

            entityManager.createQuery("""
                    SELECT o FROM Order o
                    LEFT JOIN FETCH o.dates
                    WHERE o.id IN :orderIds
                    """, Order.class)
                .setParameter("orderIds", orderIds)
                .getResultList();

            entityManager.createQuery("""
                    SELECT o FROM Order o
                    LEFT JOIN FETCH o.sizes
                    WHERE o.id IN :orderIds
                    """, Order.class)
                .setParameter("orderIds", orderIds)
                .getResultList();
        }

        @Override
        protected void doClose() throws Exception {
            // doOpen() may have failed before the EntityManager was created.
            if (entityManager != null) {
                entityManager.close();
                entityManager = null;
            }
            super.doClose();
        }
    }
    

    Finally, wrap the CustomerOrderItemReader in an AggregatePagingItemReader and register that as the reader bean:

    /**
     * Reader bean for the step: the custom {@code CustomerOrderItemReader} (pages of 100, collections
     * pre-initialized), wrapped so that each read returns one whole page as a list.
     */
    @Bean
    public AggregatePagingItemReader<CustomerOrder> customerOrderAggregateItemReader(EntityManagerFactory entityManagerFactory) {
        final int pageSize = 100;
        CustomerOrderItemReader pagingReader = new CustomerOrderItemReader(entityManagerFactory, pageSize);
        return new AggregatePagingItemReader<>(pagingReader);
    }
    

    This approach: