I'm implementing a Spring Batch job that consumes messages from a RabbitMQ queue and validates them. Each message is an array of objects, like the example below:
[ { // JSON object }, { // JSON object } ]
My goal is to retrieve the JSON objects and pass them to the processor for validation. However, a major challenge I'm facing is that the JSON objects in the queue may be in the wrong format, with missing brackets or quotes. It is crucial for me to handle these errors properly and return the messages with errors as a response when manually running the job.
My initial idea was to implement a fault-tolerant step that uses chunking and is executed by a ThreadPoolTaskExecutor. I attempted to create my own multi-resource reader based on the MultiResourceItemReader, which delegates the reading of JSON files to the JsonItemReader. Essentially, my custom reader is similar to the one provided by Spring, but I added exception handling for cases when errors occur during delegate.read() and delegate.open(new ExecutionContext()), which can happen when reading JSON files with typos. If an exception occurs during reading, I write the message ID from the current resource to the job execution context. After the job execution, I can extract these message IDs from the job context.
My custom reader is decorated by SynchronizedItemStreamReader to ensure that the reading process is synchronized. However, I suspect there might still be concurrency issues, as I consistently end up with fewer message IDs associated with errors than expected. For example, if I process 4 messages with typos, each containing 1000 objects, only 3 failed messages end up in the job execution context; perplexingly, the read of the fourth message is considered successful. If I process only that missing message together with other correct messages, I get one error message in the job context, indicating that the exception handling works in that case. However, with multiple invalid messages, I don't even see an error for the missing message in the log.
So, my main question is whether my approach is generally correct, or if I should explore an alternative solution. The key requirement is to perform resource reading and JSON object processing in parallel while capturing the message IDs for which reading failed. Here is my step code:
    @Bean
    public Step rabbitStep() {
        return new StepBuilder("rabbitValidateStep", jobRepository)
                .<ProcessRequest, Void>chunk(chunkSize, transactionManager)
                .reader(rabbitSynchronizedItemStreamReader())
                .processor(rabbitItemProcessor())
                .writer(item -> {
                })
                .faultTolerant()
                .skip(ParseException.class)
                .noRetry(ParseException.class)
                .noRollback(ParseException.class)
                .skip(ItemStreamException.class)
                .noRetry(ItemStreamException.class)
                .noRollback(ItemStreamException.class)
                .skipLimit(1000)
                .taskExecutor(rabbitDelegatingSecurityContextAsyncTaskExecutor(rabbitValidateThreadPoolTaskExecutor()))
                .throttleLimit(getWorkingThreads())
                .build();
    }
"So, my main question is whether my approach is generally correct, or if I should explore an alternative solution."
Spring Batch does not seem to be the right solution for this streaming requirement. I believe a pool of workers listening on the queue and doing the validation in parallel is easier to implement and better suited to the problem than a Spring Batch job. You could probably still use Spring Batch as a library to leverage its JSON reader.
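To make the worker-pool idea a bit more concrete, here is a minimal sketch in plain Java, not a definitive implementation. It makes several assumptions: the RabbitMQ consumption itself is left out, so an in-memory map of message ID to payload stands in for messages received from the queue; looksWellFormed is a hypothetical placeholder that only checks balanced brackets and closed quotes, where a real JSON parser would go; and the class and method names are made up for illustration. The point it shows is that each worker records the ID of a failed message in a concurrent set, so no failure is lost when several messages fail at the same time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelValidationSketch {

    /** Hypothetical stand-in for a real JSON parser: only checks that brackets balance and quotes close. */
    public static boolean looksWellFormed(String payload) {
        Deque<Character> open = new ArrayDeque<>();
        boolean inString = false;
        for (int i = 0; i < payload.length(); i++) {
            char c = payload.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++; // skip the escaped character
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            switch (c) {
                case '"': inString = true; break;
                case '[': case '{': open.push(c); break;
                case ']': if (open.isEmpty() || open.pop() != '[') return false; break;
                case '}': if (open.isEmpty() || open.pop() != '{') return false; break;
                default: break;
            }
        }
        return !inString && open.isEmpty();
    }

    /** Validates every message on a fixed worker pool and returns the IDs of the messages that failed. */
    public static Set<String> validateAll(Map<String, String> messagesById, int workers) {
        // Thread-safe set: concurrent add() calls from different workers cannot overwrite each other.
        Set<String> failedIds = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (Map.Entry<String, String> message : messagesById.entrySet()) {
            pool.submit(() -> {
                if (!looksWellFormed(message.getValue())) {
                    failedIds.add(message.getKey());
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
                throw new IllegalStateException("validation did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("interrupted while waiting for validation", e);
        }
        return failedIds;
    }

    public static void main(String[] args) {
        Map<String, String> messages = Map.of(
                "msg-1", "[{\"id\": 1}, {\"id\": 2}]",
                "msg-2", "[{\"id\": 1}, {\"id\": 2]",      // missing closing brace
                "msg-3", "[{\"id\": 1}, {\"name\": \"x}]"); // missing closing quote
        // Sorted only to make the printed order stable.
        System.out.println(new TreeSet<>(validateAll(messages, 4)));
    }
}
```

In a real setup the loop over the map would be replaced by whatever consumes from RabbitMQ, and the returned set of failed IDs could be sent back as the response.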