spring-boot spring-batch skip fault-tolerance itemprocessor

Why is an exception in Spring Batch's AsyncItemProcessor caught by SkipListener's onSkipInWrite method?


I'm writing a Spring Boot application that starts up, gathers and converts millions of database entries into a new streamlined JSON format, and then sends them all to a GCP PubSub topic. I'm attempting to use Spring Batch for this, but I'm running into trouble implementing fault tolerance for my process. The database is rife with data quality issues, and sometimes my conversions to JSON will fail. When failures occur, I don't want the job to quit immediately; I want it to continue processing as many records as it can and, before completion, to report exactly which records failed so that I and/or my team can examine these problematic database entries.

To achieve this, I've attempted to use Spring Batch's SkipListener interface. But I'm also using an AsyncItemProcessor and an AsyncItemWriter in my process, and even though the exceptions are occurring during the processing, the SkipListener's onSkipInWrite() method is catching them - rather than the onSkipInProcess() method. And unfortunately, the onSkipInWrite() method doesn't have access to the original database entity, so I can't store its ID in my list of problematic DB entries.

Have I misconfigured something? Is there any other way to gain access to the objects from the reader that failed the processing step of an AsyncItemProcessor?

Here's what I've tried...

I have a singleton Spring Component where I store how many DB entries I've successfully processed along with up to 20 problematic database entries.

@Component
@Getter //lombok
public class ProcessStatus {
    private int processed;
    private int failureCount;
    private final List<UnexpectedFailure> unexpectedFailures = new ArrayList<>();

    public void incrementProgress() { processed++; }
    public void logUnexpectedFailure(UnexpectedFailure failure) {
        failureCount++;
        unexpectedFailures.add(failure);
    }

    @Getter
    @AllArgsConstructor
    public static class UnexpectedFailure {
        private Throwable error;
        private DbProjection dbData;
    }
}

I have a Spring Batch SkipListener that's supposed to catch failures and update my status component accordingly:

@AllArgsConstructor
public class ConversionSkipListener implements SkipListener<DbProjection, Future<JsonMessage>> {
    private ProcessStatus processStatus;

    @Override
    public void onSkipInRead(Throwable error) {}

    @Override
    public void onSkipInProcess(DbProjection dbData, Throwable error) {
        processStatus.logUnexpectedFailure(new ProcessStatus.UnexpectedFailure(error, dbData));
    }

    @Override
    public void onSkipInWrite(Future<JsonMessage> messageFuture, Throwable error) {
        //This is getting called instead!! Even though the exception happened during processing :(
        //But I have no access to the original DBProjection data here, and messageFuture.get() gives me null.
    }
}

And then I've configured my job like this:

@Configuration
public class ConversionBatchJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private TaskExecutor processThreadPool;

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:100}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); //A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        // myDomainRepositoryReader.setSaveState(false); <== haven't figured out what this does yet
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        //Sometimes throws exceptions when DB data is exceptionally weird, bad, or missing
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter(
            ItemProcessor<DbProjection, JsonMessage> dataConverter) throws Exception {
        AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter = new AsyncItemProcessor<>();
        asyncDataConverter.setDelegate(dataConverter);
        asyncDataConverter.setTaskExecutor(processThreadPool);
        asyncDataConverter.afterPropertiesSet();
        return asyncDataConverter;
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    @StepScope
    public AsyncItemWriter<JsonMessage> asyncJsonPublisher(ItemWriter<JsonMessage> jsonPublisher) throws Exception {
        AsyncItemWriter<JsonMessage> asyncJsonPublisher = new AsyncItemWriter<>();
        asyncJsonPublisher.setDelegate(jsonPublisher);
        asyncJsonPublisher.afterPropertiesSet();
        return asyncJsonPublisher;
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  AsyncItemProcessor<DbProjection, JsonMessage> asyncDataConverter,
                                  AsyncItemWriter<JsonMessage> asyncJsonPublisher,
                                  ProcessStatus processStatus,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, Future<JsonMessage>>chunk(processChunkSize)
                .reader(dbReader)
                .processor(asyncDataConverter)
                .writer(asyncJsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                            //  ^ for now this returns true for everything until 20 failures
                .listener(new ConversionSkipListener(processStatus))
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}

Solution

  • This happens because the Future returned by the AsyncItemProcessor is only unwrapped in the AsyncItemWriter, so an exception thrown by the delegate processor only surfaces at that point and is treated as a write exception rather than a processing exception. That's why onSkipInWrite is called instead of onSkipInProcess.

    This is a known limitation of this pattern and is documented in the Javadoc of AsyncItemProcessor. Here is an excerpt:

    Because the Future is typically unwrapped in the ItemWriter,
    there are lifecycle and stats limitations (since the framework doesn't know 
    what the result of the processor is).
    
    While not an exhaustive list, things like StepExecution.filterCount will not
    reflect the number of filtered items and 
    ItemProcessListener.onProcessError(Object, Exception) will not be called.
    

    The Javadoc states that the list is not exhaustive, and the side effect on the SkipListener that you are experiencing is one of these limitations.
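
One possible way to still get at the original item is to have the delegate processor wrap any failure in an exception that carries the input. The sketch below uses only the JDK to illustrate the idea; it is not Spring Batch code. The names ItemCarryingFailureSketch, ItemProcessingException, carryingItem, processAsync and failedItemOf are all hypothetical, and processAsync / failedItemOf are simplified stand-ins for what the async processor and writer do. It also assumes that the exception raised in the delegate is what eventually reaches the skip listener, which you would need to verify against your Spring Batch version.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical, JDK-only illustration; none of these types exist in Spring Batch.
public class ItemCarryingFailureSketch {

    /** Hypothetical exception that remembers which input item failed. */
    public static class ItemProcessingException extends RuntimeException {
        private final Object item;

        public ItemProcessingException(Object item, Throwable cause) {
            super("Processing failed for item: " + item, cause);
            this.item = item;
        }

        public Object getItem() {
            return item;
        }
    }

    /** Wraps a conversion so that any runtime failure is rethrown together with its input. */
    public static <I, O> Function<I, O> carryingItem(Function<I, O> delegate) {
        return item -> {
            try {
                return delegate.apply(item);
            } catch (RuntimeException e) {
                throw new ItemProcessingException(item, e);
            }
        };
    }

    /** Stand-in for the async processor: runs the conversion on another thread. */
    public static <I, O> Future<O> processAsync(ExecutorService pool, Function<I, O> converter, I item) {
        Callable<O> task = () -> converter.apply(item);
        return pool.submit(task);
    }

    /** Stand-in for the writer side: unwraps the Future and returns the failed item, or null. */
    public static Object failedItemOf(Future<?> future) throws InterruptedException {
        try {
            future.get(); // the processing exception only becomes visible here
            return null;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return (cause instanceof ItemProcessingException)
                    ? ((ItemProcessingException) cause).getItem()
                    : null;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Function<String, Integer> parse = Integer::parseInt;
            Function<String, Integer> converter = carryingItem(parse);
            System.out.println(failedItemOf(processAsync(pool, converter, "42")));           // prints "null"
            System.out.println(failedItemOf(processAsync(pool, converter, "not-a-number"))); // prints "not-a-number"
        } finally {
            pool.shutdown();
        }
    }
}
```

Applied to the question's setup, the same wrapping would go around DbProjectionToJsonMessageConverter, and onSkipInWrite could then check whether the Throwable it receives is such an item-carrying exception and read the DbProjection from it. The skip policy would also have to treat that exception type as skippable.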