I’m working with a Spring Batch step that processes chunks in parallel. The step uses a custom aggregate ItemReader that returns a List of entities to the ItemProcessor.
Each entity contains nested @ElementCollection lists. These collections are lazily loaded by default.
I’m using a JpaTransactionManager for transaction management and a SimpleAsyncTaskExecutor for parallel execution.
The issue is that the nested lazy collections are not initialized when accessed in the ItemProcessor, leading to a LazyInitializationException.
Example:
CustomerOrder entity:
@Entity
public class CustomerOrder {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    @ManyToOne
    @JoinColumn(name = "order_id")
    private Order order;
    private boolean processed;
    // Getters and setters omitted for brevity
}
Order entity:
The @ElementCollection associations are lazily loaded by default.
@Entity
public class Order {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @ElementCollection
    @CollectionTable(name = "order_dates", joinColumns = @JoinColumn(name = "order_id"))
    @Column(name = "date")
    private List<LocalDate> dates = new ArrayList<>();
    @ElementCollection
    @CollectionTable(name = "order_sizes", joinColumns = @JoinColumn(name = "order_id"))
    @Column(name = "size")
    private List<Double> sizes = new ArrayList<>();
    // Getters and setters omitted for brevity
}
Item reader (aggregate):
Bean:
@Bean
public AggregatePagingItemReader<CustomerOrder> customerOrderAggregateItemReader(
        EntityManagerFactory entityManagerFactory) {
    JpaPagingItemReader<CustomerOrder> itemReader = new JpaPagingItemReaderBuilder<CustomerOrder>()
            .name("customerOrderItemReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT co FROM CustomerOrder co")
            .pageSize(100)
            .build();
    return new AggregatePagingItemReader<>(itemReader);
}
Class:
public class AggregatePagingItemReader<T> implements ItemStreamReader<List<T>> {
    // Wraps a JpaPagingItemReader to return a full page as List<T> instead of one item at a time.
}
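The body of AggregatePagingItemReader is not shown above, so the following is only a rough, plain-JDK sketch of the page-grouping idea. SimpleReader is a hypothetical stand-in for Spring Batch's ItemReader contract (read() returns null once the input is exhausted), and PageAggregatingReader and fromIterator are illustrative names, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for an item reader: returns null when the input is exhausted. */
interface SimpleReader<T> {
    T read();
}

/** Sketch: groups items from a one-at-a-time delegate into lists of up to pageSize items. */
final class PageAggregatingReader<T> implements SimpleReader<List<T>> {
    private final SimpleReader<T> delegate;
    private final int pageSize;

    PageAggregatingReader(SimpleReader<T> delegate, int pageSize) {
        if (pageSize < 1) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.delegate = delegate;
        this.pageSize = pageSize;
    }

    @Override
    public List<T> read() {
        List<T> page = new ArrayList<>(pageSize);
        T item;
        // Pull from the delegate until the page is full or the delegate is exhausted.
        while (page.size() < pageSize && (item = delegate.read()) != null) {
            page.add(item);
        }
        // Returning null (not an empty list) signals end of input to the caller.
        return page.isEmpty() ? null : page;
    }
}

final class PageAggregatingReaderDemo {
    /** Adapts an Iterator to the SimpleReader contract. */
    static <T> SimpleReader<T> fromIterator(Iterator<T> it) {
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        SimpleReader<List<Integer>> reader =
                new PageAggregatingReader<>(fromIterator(List.of(1, 2, 3, 4, 5).iterator()), 2);
        List<Integer> page;
        while ((page = reader.read()) != null) {
            System.out.println(page); // prints [1, 2], then [3, 4], then [5]
        }
    }
}
```

With a chunk size of 1, each such list becomes one "item" of the step, so a whole page is handed to the processor at once.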
Item processor:
@Component
@StepScope
public class CustomerOrderItemProcessor implements ItemProcessor<List<CustomerOrder>, List<CustomerOrder>> {
    @Override
    public List<CustomerOrder> process(List<CustomerOrder> customerOrders) throws Exception {
        List<CustomerOrder> newCustomerOrders = new ArrayList<>();
        for (CustomerOrder customerOrder : customerOrders) {
            CustomerOrder newCustomerOrder = new CustomerOrder();
            newCustomerOrder.setName(customerOrder.getName());
            newCustomerOrder.setProcessed(true);
            Order order = customerOrder.getOrder();
            Order newOrder = new Order();
            newOrder.setDates(order.getDates()); // This line throws LazyInitializationException
            newOrder.setSizes(order.getSizes());
            newCustomerOrder.setOrder(newOrder);
            newCustomerOrders.add(newCustomerOrder);
        }
        return newCustomerOrders;
    }
}
Item writer:
@Component
@StepScope
public class CustomerOrderItemWriter implements ItemWriter<List<CustomerOrder>> {
    private final JpaItemWriter<CustomerOrder> jpaItemWriter;
    public CustomerOrderItemWriter(EntityManagerFactory entityManagerFactory) {
        jpaItemWriter = new JpaItemWriterBuilder<CustomerOrder>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
    @Override
    public void write(Chunk<? extends List<CustomerOrder>> chunk) {
        // Flatten the lists in the chunk into one list before delegating to the JPA writer
        List<CustomerOrder> customerOrders = chunk.getItems().stream().flatMap(List::stream).toList();
        jpaItemWriter.write(new Chunk<>(customerOrders));
    }
}
Step configuration:
@Configuration
public class CustomerOrderStepConfig {
    @Bean
    public Step processCustomerOrderStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
            AggregatePagingItemReader<CustomerOrder> customerOrderAggregateItemReader,
            AsyncItemProcessor<List<CustomerOrder>, List<CustomerOrder>> customerOrderAsyncItemProcessor,
            AsyncItemWriter<List<CustomerOrder>> customerOrderAsyncItemWriter) {
        return new StepBuilder("processCustomerOrderStep", jobRepository)
                .<List<CustomerOrder>, Future<List<CustomerOrder>>>chunk(1, transactionManager)
                .reader(customerOrderAggregateItemReader)
                .processor(customerOrderAsyncItemProcessor)
                .writer(customerOrderAsyncItemWriter)
                .build();
    }
    @Bean
    public AsyncItemProcessor<List<CustomerOrder>, List<CustomerOrder>> customerOrderAsyncItemProcessor(
            CustomerOrderItemProcessor customerOrderItemProcessor, TaskExecutor taskExecutor) {
        AsyncItemProcessor<List<CustomerOrder>, List<CustomerOrder>> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(customerOrderItemProcessor);
        asyncItemProcessor.setTaskExecutor(taskExecutor);
        return asyncItemProcessor;
    }
    @Bean
    public AsyncItemWriter<List<CustomerOrder>> customerOrderAsyncItemWriter(
            CustomerOrderItemWriter customerOrderItemWriter) {
        AsyncItemWriter<List<CustomerOrder>> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(customerOrderItemWriter);
        return asyncItemWriter;
    }
}
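To make the threading in this configuration easier to picture: AsyncItemProcessor hands the delegate call to the TaskExecutor and returns a Future, and AsyncItemWriter unwraps those futures before calling its delegate. The sketch below mimics only that hand-off with a plain ExecutorService. It is not Spring Batch code, and the names AsyncHandOffSketch, processAsync, unwrap and demo are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Plain-JDK sketch of the processor/writer hand-off; not Spring Batch code. */
final class AsyncHandOffSketch {

    /** Processor role: schedule the delegate on the executor and return immediately. */
    static <I, O> Future<O> processAsync(I item, Function<I, O> delegate, ExecutorService executor) {
        Callable<O> task = () -> delegate.apply(item);
        return executor.submit(task);
    }

    /** Writer role: block on each future, in order, and collect the results. */
    static <O> List<O> unwrap(List<Future<O>> futures) {
        List<O> results = new ArrayList<>();
        for (Future<O> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a result", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Delegate failed", e.getCause());
            }
        }
        return results;
    }

    /** Runs three items through the hand-off and records which thread processed each one. */
    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String item : List.of("a", "b", "c")) {
                futures.add(processAsync(item, s -> s + "@" + Thread.currentThread().getName(), executor));
            }
            return unwrap(futures);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        demo().forEach(System.out::println);
    }
}
```

The point of the sketch is that the delegate runs on a pool thread, not on the thread that produced the items, so any lazy loading attempted inside the processor happens away from the thread that did the read.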
What I’ve tried:
- Default lazy loading: LazyInitializationException when accessing the nested collections in the ItemProcessor.
- JOIN FETCH in the query: Attempting to fetch nested collections directly in the reader query causes a MultipleBagFetchException.
- Hibernate.initialize() in an ItemReadListener: This loads the collections in afterRead(), but causes an N+1 query problem.
Question:
How can I eagerly initialize nested lazy collections (such as @ElementCollection) in a Spring Batch step that uses parallel chunk processing and an aggregate ItemReader without running into LazyInitializationException, MultipleBagFetchException, or performance issues like N+1 selects?
Is there a recommended, performant way to batch-initialize nested collections in this context, possibly during the read phase, while still keeping the step thread-safe for parallel execution?
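As background on the MultipleBagFetchException mentioned above: joining two List-typed collections in a single query produces one row per (date, size) pair, so each element comes back several times, and for an unordered List Hibernate cannot tell repeated rows from genuine duplicates. The plain-Java sketch below simulates those joined rows for one order. TwoBagJoinSketch and its methods are illustrative names, and the values are invented sample data.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TwoBagJoinSketch {

    /** Simulates the rows of one SQL query that joins both collection tables for a single order. */
    static List<Map.Entry<LocalDate, Double>> joinBoth(List<LocalDate> dates, List<Double> sizes) {
        List<Map.Entry<LocalDate, Double>> rows = new ArrayList<>();
        for (LocalDate date : dates) {
            for (Double size : sizes) {
                rows.add(Map.entry(date, size)); // every date is paired with every size
            }
        }
        return rows;
    }

    /** Counts how many joined rows carry the given date. */
    static long occurrences(List<Map.Entry<LocalDate, Double>> rows, LocalDate date) {
        return rows.stream().filter(row -> row.getKey().equals(date)).count();
    }

    public static void main(String[] args) {
        List<LocalDate> dates = List.of(LocalDate.of(2024, 1, 1), LocalDate.of(2024, 1, 2));
        List<Double> sizes = List.of(1.0, 2.0, 3.0);
        List<Map.Entry<LocalDate, Double>> rows = joinBoth(dates, sizes);
        System.out.println("rows in one joined query: " + rows.size());                        // 6
        System.out.println("rows in two separate queries: " + (dates.size() + sizes.size())); // 5
        System.out.println("copies of the first date: " + occurrences(rows, dates.get(0)));   // 3
    }
}
```

Fetching each collection in its own query keeps the row count additive instead of multiplicative, which is the shape a per-collection batch query would have.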
After running into common pitfalls with JPA in a parallel Spring Batch step:
- LazyInitializationException when accessing lazy collections in the ItemProcessor that runs outside the transaction scope
- MultipleBagFetchException when trying to JOIN FETCH multiple collections in one query
I implemented a custom paging ItemReader that eagerly loads the required collections using separate batch queries during the read phase.
Here’s how it works:
1. Fetch a page of CustomerOrder entities
2. Collect the Order IDs
3. Load each @ElementCollection (Order.dates, Order.sizes) in batch
public class CustomerOrderItemReader extends AbstractPagingItemReader<CustomerOrder> {
    private final EntityManagerFactory entityManagerFactory;
    private EntityManager entityManager;
    public CustomerOrderItemReader(EntityManagerFactory entityManagerFactory, int pageSize) {
        super();
        this.entityManagerFactory = entityManagerFactory;
        setPageSize(pageSize);
        setName("customerOrderItemReader");
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        Assert.state(entityManagerFactory != null, "EntityManagerFactory must not be null");
    }
    @Override
    protected void doOpen() throws Exception {
        super.doOpen();
        // One EntityManager is kept open for the lifetime of the reader
        entityManager = entityManagerFactory.createEntityManager();
        if (entityManager == null) {
            throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
        }
    }
    @Override
    protected void doReadPage() {
        EntityTransaction tx = entityManager.getTransaction();
        tx.begin();
        // Detach the entities of the previous page before loading the next one
        entityManager.flush();
        entityManager.clear();
        if (results == null) {
            results = new CopyOnWriteArrayList<>();
        } else {
            results.clear();
        }
        // Step 1: Fetch CustomerOrders and their Orders.
        // JOIN FETCH loads each Order in the same query; a plain JOIN would not fetch it.
        List<CustomerOrder> customerOrders = entityManager.createQuery("""
                SELECT co FROM CustomerOrder co
                LEFT JOIN FETCH co.order
                ORDER BY co.id
                """, CustomerOrder.class)
                .setFirstResult(getPage() * getPageSize())
                .setMaxResults(getPageSize())
                .getResultList();
        if (customerOrders.isEmpty()) {
            tx.commit();
            return;
        }
        // Step 2: Collect the distinct Order IDs (skipping CustomerOrders without an Order)
        List<Long> orderIds = customerOrders.stream()
                .map(CustomerOrder::getOrder)
                .filter(order -> order != null)
                .map(Order::getId)
                .distinct()
                .toList();
        // Step 3.1: Load Order.dates; the result is discarded because the fetch
        // initializes the collections of the Orders already in the persistence context
        entityManager.createQuery("""
                SELECT DISTINCT o FROM Order o
                LEFT JOIN FETCH o.dates
                WHERE o.id IN :orderIds
                """, Order.class)
                .setParameter("orderIds", orderIds)
                .getResultList();
        // Step 3.2: Load Order.sizes the same way, in its own query
        entityManager.createQuery("""
                SELECT DISTINCT o FROM Order o
                LEFT JOIN FETCH o.sizes
                WHERE o.id IN :orderIds
                """, Order.class)
                .setParameter("orderIds", orderIds)
                .getResultList();
        tx.commit();
        results.addAll(customerOrders);
    }
    @Override
    protected void doClose() throws Exception {
        entityManager.close();
        super.doClose();
    }
}
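To put rough numbers on the difference between initializing collections one order at a time and loading them per page with an IN query, here is a small in-memory simulation. FakeCollectionTable is a hypothetical stand-in for a collection table such as order_dates and only counts calls. It does not talk to a database, and the counts say nothing about actual query latency.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchLoadSketch {

    /** Hypothetical in-memory stand-in for a collection table; counts simulated queries. */
    static final class FakeCollectionTable {
        private final Map<Long, List<String>> valuesByOrderId = new HashMap<>();
        private int queryCount;

        void put(long orderId, List<String> values) {
            valuesByOrderId.put(orderId, values);
        }

        /** One simulated query per order: the shape of item-by-item lazy initialization. */
        List<String> findByOrderId(long orderId) {
            queryCount++;
            return valuesByOrderId.getOrDefault(orderId, List.of());
        }

        /** One simulated query for the whole page: the WHERE id IN (:ids) shape. */
        Map<Long, List<String>> findByOrderIds(Collection<Long> orderIds) {
            queryCount++;
            Map<Long, List<String>> result = new HashMap<>();
            for (Long id : orderIds) {
                result.put(id, valuesByOrderId.getOrDefault(id, List.of()));
            }
            return result;
        }

        int queryCount() {
            return queryCount;
        }
    }

    private static FakeCollectionTable tableWithOrders(int orders) {
        FakeCollectionTable table = new FakeCollectionTable();
        for (long id = 1; id <= orders; id++) {
            table.put(id, List.of("value-" + id));
        }
        return table;
    }

    /** Number of simulated queries when each order's collection is loaded separately. */
    static int queriesOneByOne(int orders) {
        FakeCollectionTable table = tableWithOrders(orders);
        for (long id = 1; id <= orders; id++) {
            table.findByOrderId(id);
        }
        return table.queryCount();
    }

    /** Number of simulated queries when the whole page is loaded with one IN lookup. */
    static int queriesBatched(int orders) {
        FakeCollectionTable table = tableWithOrders(orders);
        List<Long> ids = new ArrayList<>();
        for (long id = 1; id <= orders; id++) {
            ids.add(id);
        }
        table.findByOrderIds(ids);
        return table.queryCount();
    }

    public static void main(String[] args) {
        System.out.println("one by one: " + queriesOneByOne(100)); // 100
        System.out.println("batched:    " + queriesBatched(100));  // 1
    }
}
```

For a page of 100 orders with two collections, the per-order approach would issue on the order of 200 collection queries, while the reader above issues two.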
Finally, register the CustomerOrderItemReader as an AggregatePagingItemReader bean:
@Bean
public AggregatePagingItemReader<CustomerOrder> customerOrderAggregateItemReader(EntityManagerFactory entityManagerFactory) {
    CustomerOrderItemReader itemReader = new CustomerOrderItemReader(entityManagerFactory, 100);
    return new AggregatePagingItemReader<>(itemReader);
}
This approach:
- Avoids LazyInitializationException: everything is loaded in the read phase
- Avoids MultipleBagFetchException: collections are fetched in separate queries
- Avoids N+1 selects: collections are loaded in batch with IN (:ids)