I'm building a Spring Batch job using Spring Boot 3.3 to process a large dataset of customer transactions (e.g., ~10M records). The dataset has varying density (e.g., some customers have thousands of transactions, others have few), and I want to adjust the chunk size dynamically at runtime to optimize performance.
I've tried to create a custom ItemReader that calculates a dynamic chunk size based on the data it reads (simplified here for brevity).
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;

@Component
public class DynamicChunkItemReader implements ItemReader<Transaction> {

    private final ItemReader<Transaction> delegate;
    private volatile int chunkSize = 1000; // Initial chunk size
    private final Map<String, Long> keyCounts = new ConcurrentHashMap<>();

    public DynamicChunkItemReader(ItemReader<Transaction> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Transaction read() throws Exception {
        Transaction item = delegate.read();
        if (item != null) {
            // Track data skew (e.g., transactions per customer)
            String customerId = item.getCustomerId();
            keyCounts.merge(customerId, 1L, Long::sum);
            // Simplified skew calculation
            double variance = calculateVariance(keyCounts);
            chunkSize = calculateChunkSize(variance);
        }
        return item;
    }

    private double calculateVariance(Map<String, Long> counts) {
        double mean = counts.values().stream().mapToLong(Long::longValue).average().orElse(0);
        return counts.values().stream()
                .mapToDouble(count -> Math.pow(count - mean, 2))
                .average()
                .orElse(0);
    }

    private int calculateChunkSize(double variance) {
        if (variance > 1000) return Math.max(100, chunkSize / 2); // Smaller chunks for high skew
        if (variance < 100) return Math.min(5000, chunkSize * 2); // Larger chunks for low skew
        return chunkSize;
    }

    public int getChunkSize() {
        return chunkSize;
    }
}
Step configuration:
@Bean
public Step dynamicChunkStep(JobRepository jobRepository, DynamicChunkItemReader reader,
                             ItemWriter<Transaction> writer, PlatformTransactionManager txManager) {
    return new StepBuilder("dynamicChunkStep", jobRepository)
            .<Transaction, Transaction>chunk(reader.getChunkSize(), txManager) // Issue: evaluated once, at configuration time
            .reader(reader)
            .writer(writer)
            .build();
}
The problem is that reader.getChunkSize() is called once during step configuration, so subsequent changes to chunkSize inside the reader are ignored. I need the step to pick up the updated chunkSize value for each chunk during execution.
How can I configure a Spring Batch step to apply a dynamic chunk size from ItemReader.getChunkSize() at runtime, without restarting the step? Is there a way to customize Spring Batch's ChunkProvider or TaskletStep to use a dynamic chunk size while preserving transaction integrity?
To apply a dynamic chunk size from an ItemReader's getChunkSize() method at runtime, without restarting the step, here is a detailed solution. The core challenge is that Spring Batch's StepBuilder.chunk(int size) fixes the chunk size at configuration time, whereas we need the step to consult a chunk size that the ItemReader keeps updating during execution.
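It helps to see what chunk(int) actually does. In Spring Batch 5, the int overload is shorthand for a fixed SimpleCompletionPolicy, which is why the size cannot change once the step is built. A sketch of the equivalent explicit form (jobRepository, txManager, reader and writer stand in for your own beans):

public Step fixedStep(JobRepository jobRepository, PlatformTransactionManager txManager) {
    // chunk(1000) is shorthand for chunk(new SimpleCompletionPolicy(1000)):
    // a policy that always completes the chunk after 1000 items.
    return new StepBuilder("fixedStep", jobRepository)
            .<Transaction, Transaction>chunk(new SimpleCompletionPolicy(1000), txManager)
            .reader(reader)
            .writer(writer)
            .build();
}

The chunk boundary is therefore owned by a CompletionPolicy driven by a RepeatTemplate, not by the reader, and that is the seam the solution below exploits.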
Below is the implementation, in code:
-> A custom TaskletStep integrates the DynamicChunkProvider and bypasses the static chunk() size.
public class DynamicTaskletStep extends TaskletStep {

    public DynamicTaskletStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                              DynamicChunkItemReader<Transaction> reader,
                              ItemProcessor<Transaction, Transaction> processor,
                              ItemWriter<Transaction> writer) {
        super("dynamicChunkStep");
        setJobRepository(jobRepository);
        setTransactionManager(transactionManager);
        // Wire the chunk-oriented tasklet by hand so we control the ChunkProvider
        setTasklet(new ChunkOrientedTasklet<>(
                new DynamicChunkProvider<>(reader),
                new SimpleChunkProcessor<>(processor, writer)));
    }
}
-> The custom provider replaces the default chunk-building loop, so each chunk's size is fetched from the ItemReader at the moment the chunk is assembled.
public class DynamicChunkProvider<I> extends SimpleChunkProvider<I> {

    private final DynamicChunkItemReader<I> reader;

    public DynamicChunkProvider(DynamicChunkItemReader<I> reader) {
        // The parent's RepeatOperations goes unused because provide() is overridden below
        super(reader, new RepeatTemplate());
        this.reader = reader;
    }

    @Override
    public Chunk<I> provide(StepContribution contribution) throws Exception {
        // Fetch the dynamic chunk size from the reader, once per chunk,
        // and guard against invalid values
        int dynamicChunkSize = Math.max(1, reader.getChunkSize());
        Chunk<I> inputs = new Chunk<>();
        for (int i = 0; i < dynamicChunkSize; i++) {
            I item = read(contribution, inputs); // inherited read() fires ItemReadListener callbacks
            if (item == null) {
                inputs.setEnd(); // input exhausted: mark this as the last chunk
                break;
            }
            inputs.add(item);
            contribution.incrementReadCount();
        }
        return inputs;
    }
}
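If you would rather not hand-wire the tasklet at all, there is a lighter alternative: SimpleStepBuilder also accepts a CompletionPolicy, and the policy's start() callback runs at the beginning of every chunk. A minimal sketch under that approach (the class name DynamicChunkPolicy is mine, not part of the framework):

public class DynamicChunkPolicy extends SimpleCompletionPolicy {

    private final DynamicChunkItemReader<?> reader;

    public DynamicChunkPolicy(DynamicChunkItemReader<?> reader) {
        this.reader = reader;
    }

    @Override
    public RepeatContext start(RepeatContext context) {
        // Called once at the start of each chunk: refresh the target size
        setChunkSize(Math.max(1, reader.getChunkSize()));
        return super.start(context);
    }
}

Wire it with .chunk(new DynamicChunkPolicy(reader), txManager) in place of the int overload, and the rest of the step definition stays as it was.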
-> With the dynamic chunk provider and TaskletStep in place, we configure the ItemReader itself to track skew and keep the chunk size up to date as it reads:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.item.ItemReader;

public class DynamicChunkItemReader<T> implements ItemReader<T> {

    private static final int MIN_CHUNK_SIZE = 100;
    private static final int MAX_CHUNK_SIZE = 5000;

    private final ItemReader<T> delegate;
    private final AtomicInteger chunkSize = new AtomicInteger(1000);
    private final AtomicLong itemsRead = new AtomicLong();
    private final Map<String, Long> keyCounts = new ConcurrentHashMap<>();

    public DynamicChunkItemReader(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T read() throws Exception {
        T item = delegate.read();
        if (item != null) {
            // Extract key (e.g., customerId from Transaction)
            String key = extractKey(item);
            keyCounts.merge(key, 1L, Long::sum);
            // Recalculate skew and update the chunk size, throttled to every
            // 100 items read (not every 100 distinct keys, so it fires regularly)
            if (itemsRead.incrementAndGet() % 100 == 0) {
                double variance = calculateVariance(keyCounts);
                chunkSize.set(calculateChunkSize(variance));
            }
        }
        return item;
    }

    private String extractKey(T item) {
        if (item instanceof Transaction) {
            return ((Transaction) item).getCustomerId();
        }
        return "unknown";
    }

    private double calculateVariance(Map<String, Long> counts) {
        double mean = counts.values().stream().mapToLong(Long::longValue).average().orElse(0);
        return counts.values().stream()
                .mapToDouble(count -> Math.pow(count - mean, 2))
                .average()
                .orElse(0);
    }

    private int calculateChunkSize(double variance) {
        int current = chunkSize.get();
        if (variance > 1000) {
            return Math.max(MIN_CHUNK_SIZE, current / 2); // Smaller chunks for high skew
        } else if (variance < 100) {
            return Math.min(MAX_CHUNK_SIZE, current * 2); // Larger chunks for low skew
        }
        return current;
    }

    public int getChunkSize() {
        return chunkSize.get();
    }
}
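Finally, expose the custom step as a bean (this wiring is illustrative; adjust the constructor arguments to your own types and bean names):

@Bean
public Step dynamicChunkStep(JobRepository jobRepository,
                             PlatformTransactionManager txManager,
                             DynamicChunkItemReader<Transaction> reader,
                             ItemProcessor<Transaction, Transaction> processor,
                             ItemWriter<Transaction> writer) {
    // The step reads its chunk size from the reader before assembling every chunk
    return new DynamicTaskletStep(jobRepository, txManager, reader, processor, writer);
}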
Everything else proceeds as usual, including the delegate ItemReader; you only add the chunk-size tracking shown above. Because TaskletStep still wraps each chunk in its own transaction, commit boundaries simply follow whatever size was chosen for that chunk, so transaction integrity is preserved.
Through this approach, the chunk size adapts at runtime, without restarting the step.