java spring spring-boot

How to apply a dynamic chunk size from ItemReader in Spring Batch without restarting the step?


I'm building a Spring Batch job using Spring Boot 3.3 to process a large dataset of customer transactions (e.g., ~10M records). The dataset has varying density (e.g., some customers have thousands of transactions, others have few), and I want to adjust the chunk size dynamically at runtime to optimize performance.

I've tried to create a custom ItemReader that calculates a dynamic chunk size based on the data it reads (simplified here for brevity).

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;

@Component
public class DynamicChunkItemReader implements ItemReader<Transaction> {
    private final ItemReader<Transaction> delegate;
    private volatile int chunkSize = 1000; // Initial chunk size
    private final Map<String, Long> keyCounts = new ConcurrentHashMap<>();

    public DynamicChunkItemReader(ItemReader<Transaction> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Transaction read() throws Exception {
        Transaction item = delegate.read();
        if (item != null) {
            // Track data skew (e.g., transactions per customer)
            String customerId = item.getCustomerId();
            keyCounts.merge(customerId, 1L, Long::sum);

            // Simplified skew calculation
            double variance = calculateVariance(keyCounts);
            chunkSize = calculateChunkSize(variance);
        }
        return item;
    }

    private double calculateVariance(Map<String, Long> counts) {
        double mean = counts.values().stream().mapToLong(Long::longValue).average().orElse(0);
        return counts.values().stream()
                .mapToDouble(count -> Math.pow(count - mean, 2))
                .average()
                .orElse(0);
    }

    private int calculateChunkSize(double variance) {
        if (variance > 1000) return Math.max(100, chunkSize / 2); // Smaller for dense data
        if (variance < 100) return Math.min(5000, chunkSize * 2); // Larger for sparse data
        return chunkSize;
    }

    public int getChunkSize() {
        return chunkSize;
    }
}

Step configuration:

@Bean
public Step dynamicChunkStep(JobRepository jobRepository, DynamicChunkItemReader reader,
                             ItemWriter<Transaction> writer, PlatformTransactionManager txManager) {
    return new StepBuilder("dynamicChunkStep", jobRepository)
            .<Transaction, Transaction>chunk(reader.getChunkSize(), txManager) // Issue: static value
            .reader(reader)
            .writer(writer)
            .build();
}

The problem is that reader.getChunkSize() is called once during step configuration, and subsequent changes to chunkSize in the reader are ignored. I need the step to use the updated chunkSize value for each chunk during execution.

How can I configure a Spring Batch step to apply a dynamic chunk size from ItemReader.getChunkSize() at runtime without restarting the step?

Is there a way to customize Spring Batch’s ChunkProvider or TaskletStep to use a dynamic chunk size while preserving transaction integrity?


Solution

  • To apply a dynamic chunk size from an ItemReader’s getChunkSize() method at runtime, without restarting the step, we need to work around the core problem: StepBuilder.chunk(int size) fixes the chunk size at configuration time. In a chunk-oriented step, however, the chunk boundary is actually decided by the CompletionPolicy of the chunk-level RepeatTemplate, so that is the hook to customize.
    The implementation proceeds in three parts:

    -> A custom CompletionPolicy bypasses the static chunk() size by re-reading the reader's current chunk size at the start of every chunk:

    import org.springframework.batch.repeat.RepeatContext;
    import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;

    public class DynamicChunkCompletionPolicy extends SimpleCompletionPolicy {
        private final DynamicChunkItemReader<?> reader;

        public DynamicChunkCompletionPolicy(DynamicChunkItemReader<?> reader) {
            this.reader = reader;
        }

        @Override
        public RepeatContext start(RepeatContext context) {
            // Called once per chunk: apply the reader's latest computed size
            setChunkSize(Math.max(1, reader.getChunkSize())); // ensure a valid size
            return super.start(context);
        }
    }
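
    Because start() runs exactly once when a chunk begins, the size is frozen for the duration of that chunk; updates the reader makes mid-chunk only take effect at the next chunk boundary. This is also what preserves transaction integrity: each chunk's size is settled before its transaction starts, and the chunk still commits atomically.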
    

    -> A custom TaskletStep integrates the policy, wiring the chunk-oriented tasklet by hand so the RepeatTemplate that drives chunk building uses the dynamic policy instead of a fixed count:

    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
    import org.springframework.batch.core.step.item.SimpleChunkProcessor;
    import org.springframework.batch.core.step.item.SimpleChunkProvider;
    import org.springframework.batch.core.step.tasklet.TaskletStep;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.repeat.support.RepeatTemplate;
    import org.springframework.transaction.PlatformTransactionManager;

    public class DynamicTaskletStep extends TaskletStep {

        public DynamicTaskletStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                                  DynamicChunkItemReader<Transaction> reader,
                                  ItemProcessor<Transaction, Transaction> processor,
                                  ItemWriter<Transaction> writer) {
            super("dynamicChunkStep");
            setJobRepository(jobRepository);
            setTransactionManager(transactionManager);

            // The chunk-level RepeatTemplate consults the policy on every iteration,
            // so chunk boundaries follow the reader's current chunk size
            RepeatTemplate chunkTemplate = new RepeatTemplate();
            chunkTemplate.setCompletionPolicy(new DynamicChunkCompletionPolicy(reader));

            // Register the reader as a stream if it implements ItemStream (omitted here)
            setTasklet(new ChunkOrientedTasklet<>(
                    new SimpleChunkProvider<>(reader, chunkTemplate),
                    new SimpleChunkProcessor<>(processor, writer)));
        }
    }
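
    As a lighter alternative to subclassing TaskletStep, the regular builder accepts a CompletionPolicy in place of a fixed count. A minimal sketch, assuming the Spring Batch 5.x StepBuilder API (chunk(CompletionPolicy, PlatformTransactionManager)):

    @Bean
    public Step dynamicChunkStep(JobRepository jobRepository,
                                 DynamicChunkItemReader<Transaction> reader,
                                 ItemWriter<Transaction> writer,
                                 PlatformTransactionManager txManager) {
        return new StepBuilder("dynamicChunkStep", jobRepository)
                // the policy is consulted at every chunk boundary, not just at configuration
                .<Transaction, Transaction>chunk(new DynamicChunkCompletionPolicy(reader), txManager)
                .reader(reader)
                .writer(writer)
                .build();
    }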

    -> With the policy and step in place, the ItemReader is generified and throttles its chunk-size recalculation (every 100 items) so the variance is not recomputed on every single read:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    import org.springframework.batch.item.ItemReader;

    public class DynamicChunkItemReader<T> implements ItemReader<T> {
        private static final int MIN_CHUNK_SIZE = 100;
        private static final int MAX_CHUNK_SIZE = 5000;

        private final ItemReader<T> delegate;
        private final AtomicInteger chunkSize = new AtomicInteger(1000); // initial chunk size
        private final AtomicLong itemsRead = new AtomicLong();
        private final Map<String, Long> keyCounts = new ConcurrentHashMap<>();

        public DynamicChunkItemReader(ItemReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T read() throws Exception {
            T item = delegate.read();
            if (item != null) {
                // Extract key (e.g., customerId from Transaction)
                String key = extractKey(item);
                keyCounts.merge(key, 1L, Long::sum);

                // Calculate skew and update chunk size (throttled to reduce overhead)
                if (itemsRead.incrementAndGet() % 100 == 0) { // update every 100 items
                    double variance = calculateVariance(keyCounts);
                    chunkSize.set(calculateChunkSize(variance));
                }
            }
            return item;
        }

        private String extractKey(T item) {
            if (item instanceof Transaction) {
                return ((Transaction) item).getCustomerId();
            }
            return "unknown";
        }

        private double calculateVariance(Map<String, Long> counts) {
            double mean = counts.values().stream().mapToLong(Long::longValue).average().orElse(0);
            return counts.values().stream()
                    .mapToDouble(count -> Math.pow(count - mean, 2))
                    .average()
                    .orElse(0);
        }

        private int calculateChunkSize(double variance) {
            int current = chunkSize.get();
            if (variance > 1000) {
                return Math.max(MIN_CHUNK_SIZE, current / 2); // smaller chunks for high skew
            } else if (variance < 100) {
                return Math.min(MAX_CHUNK_SIZE, current * 2); // larger chunks for low skew
            }
            return current;
        }

        public int getChunkSize() {
            return chunkSize.get();
        }
    }
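
    For a quick sanity check outside a full job, the reader can wrap any delegate. A minimal sketch; ListItemReader and the tx1..tx3 items are hypothetical stand-ins for the real transaction source:

    // org.springframework.batch.item.support.ListItemReader as an in-memory delegate
    var delegate = new ListItemReader<>(List.of(tx1, tx2, tx3 /* ... */));
    var reader = new DynamicChunkItemReader<Transaction>(delegate);

    Transaction t;
    while ((t = reader.read()) != null) {
        // getChunkSize() reflects the variance-based adjustment as items stream in
        System.out.println("current chunk size = " + reader.getChunkSize());
    }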

    With this in place, the step consults the reader's latest chunk size at every chunk boundary instead of the single value captured at configuration time, so the new size takes effect without restarting the step or the job, and no progress is lost.
    Everything else proceeds as usual, including the delegate ItemReader; the only additions are the completion policy and the chunk-size bookkeeping inside the reader.