spring-batch, spring-integration, spring-kafka, spring-batch-integration

Spring Batch remote-partitioning worker-job not reading data from middleware


I wanted to try out the remote partitioning setup in Spring Batch. I am using Spring Boot v2.7.2 with Kafka as the middleware. I have used the ColumnRangePartitioner from the Spring Batch samples.

I can see that the manager job publishes the partitioning metadata to the Kafka topic, and that the partition values are persisted in the Spring Batch tables for the worker job.

Below are the questions:

  1. My worker job is not able to read the metadata from the Kafka topic: the partition range injected into the JdbcPagingItemReader bean created in the worker job comes through as null values.

  2. What is the expected behavior if I run the worker job without running the manager job first, i.e. when there is no data for the worker job to process in the Kafka topic or in the Spring Batch tables? What I observe is that my worker job creates a new job instance and executes it rather than waiting for the manager job to send invocation metadata (see the note right after this list).
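
A note on question 2 (my current understanding, not confirmed): Spring Boot launches every Job bean it finds at startup by default, which would explain why the worker creates its own job instance instead of waiting for a request. A worker that should only react to requests from the middleware can switch that off in application-worker.properties:

    spring.batch.job.enabled=false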

Below is the Partitioner configuration:

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    Integer min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from ( " + table + ") as aliastab ", Integer.class);
    Integer max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from ( " + table + ") as aliastab ", Integer.class);
    Integer targetSize = (max - min) / gridSize + 1;

    Map<String, ExecutionContext> result = new HashMap<>();
    Integer number = 0;
    Integer start = min;
    Integer end = start + targetSize - 1;

    while (start <= max) {
        ExecutionContext value = new ExecutionContext();
        result.put("partition" + number, value);

        if (end >= max) {
            end = max;
        }
        value.putInt("minValue", start);
        value.putInt("maxValue", end);
        start += targetSize;
        end += targetSize;
        number++;
    }

    return result;
}
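
To make the arithmetic concrete (hypothetical numbers, not my actual data): with min = 1, max = 7 and gridSize = 3, targetSize = (7 - 1) / 3 + 1 = 3, and the loop yields partition0 = [1..3], partition1 = [4..6] and partition2 = [7..7], with each range stored as minValue/maxValue in its ExecutionContext.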

Manager Config:

@Configuration
@Profile("manager")
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-manager.properties")
public class ManagerBeanConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServers;
    
    @Value("${kafka.request.topic}")
    private String requestTopicName;
    
    @Value("${kafka.response.topic}")
    private String responseTopicName;
        
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroup;
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @Bean
    public RemotePartitioningManagerStepBuilderFactory remotePartitioningManagerStepBuilderFactory(
            JobRepository jobRepository, JobExplorer jobExplorer,
            DataSourceTransactionManager springBatchDataSourceTransactionManager) {
        RemotePartitioningManagerStepBuilderFactory remotePartitioningManagerStepBuilderFactory = new RemotePartitioningManagerStepBuilderFactory(
                jobRepository, jobExplorer, springBatchDataSourceTransactionManager);
        return remotePartitioningManagerStepBuilderFactory;
    }

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel inboundReplies() {
        return new DirectChannel();
    }
    
    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootStrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory(props);
    }

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory, DirectChannel inboundReplies) {
        ContainerProperties containerProps = new ContainerProperties(new String[] {responseTopicName});
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, containerProps))
                .channel(inboundReplies)
                .get();
    }

    @Bean
    public IntegrationFlow outboundFlow(DirectChannel outboundRequests) {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(ManagerConstants.REQUEST_TOPIC_NAME));

        Function<Message<?>, Long> partitionFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % 3;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionFn));

        return IntegrationFlows.from(outboundRequests).handle(messageHandler).get();
    }
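
    // Sketch (my addition, not in the original config): the routing above sends
    // stepExecutionId % 3 as the partition id, so the request topic needs at
    // least 3 partitions. With Boot's auto-configured KafkaAdmin, a NewTopic
    // bean can enforce that. The literal topic name is an assumption taken from
    // the logs below; use ManagerConstants.REQUEST_TOPIC_NAME in real code.
    @Bean
    public NewTopic requestTopic() {
        return TopicBuilder.name("request_topic")
                .partitions(3)
                .replicas(1)
                .build();
    }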
    
    @Bean
    public Job addressManagerJob(Step addressManagerPartitionerStep) {
        return jobBuilderFactory.get("addressManagerJob").start(addressManagerPartitionerStep)
                .incrementer(new RunIdIncrementer()).build();
    }

    @Bean
    public ColumnRangePartitioner partitioner(@Qualifier("fcrmDataSource") DataSource fcrmDataSource) {
        ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
        partitioner.setColumn("addr_id");
        partitioner.setDataSource(fcrmDataSource);
        partitioner.setTable("SELECT addr_id, lookup_Code, meaning from addr ");
        return partitioner;
    }

    @Bean
    public Step addressManagerPartitionerStep(RemotePartitioningManagerStepBuilderFactory remotePartitioningManagerStepBuilderFactory,
            DirectChannel outboundRequests, DirectChannel inboundReplies, ColumnRangePartitioner partitioner) {
        return remotePartitioningManagerStepBuilderFactory.get("addressManagerPartitionerStep")
                .gridSize(3)
                .partitioner("addressWorkerStep", partitioner)
                .outputChannel(outboundRequests)
                .inputChannel(inboundReplies)
                .build();
    }

}

Worker Job Config:

@Configuration
@Profile("worker")
@EnableBatchProcessing
@EnableBatchIntegration
@PropertySource("classpath:application-worker.properties")
public class WorkerBeanConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServers;
    
    @Value("${kafka.request.topic}")
    private String requestTopicName;
    
    @Value("${kafka.response.topic}")
    private String responseTopicName;
        
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroup;
    
    @SuppressWarnings({ "rawtypes", "unused" })
    @Autowired
    private KafkaTemplate kafkaTemplate; 
    
    @Bean
    public RemotePartitioningWorkerStepBuilderFactory remotePartitioningWorkerStepBuilderFactory(JobRepository jobRepository, JobExplorer jobExplorer,
            DataSourceTransactionManager springBatchDataSourceTransactionManager) {
        RemotePartitioningWorkerStepBuilderFactory remotePartitioningWorkerStepBuilderFactory = new RemotePartitioningWorkerStepBuilderFactory(jobRepository, jobExplorer,
                springBatchDataSourceTransactionManager);
        return remotePartitioningWorkerStepBuilderFactory;
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<AddressType> reader(
            @Qualifier("fcrmDataSource") DataSource fcrmDataSource, @Value("#{stepExecutionContext[minValue]}") final Integer minVal, @Value("#{stepExecutionContext[maxValue]}") final Integer maxVal) {
        Map<String, Order> sortMap = new HashMap<>();
        sortMap.put("addr_id", Order.ASCENDING);

        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("minValue", minVal);
        paramMap.put("maxValue", maxVal);

        PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
        queryProvider.setSelectClause(" SELECT addr_id, lookup_Code, meaning  ");
        queryProvider.setFromClause(" from addr ");
        queryProvider.setWhereClause(" addr_id >= :minValue and addr_id <= :maxValue ");
        queryProvider.setSortKeys(sortMap);

        JdbcPagingItemReader<AddressType> reader = new JdbcPagingItemReader<AddressType>();
        reader.setDataSource(fcrmDataSource);
        reader.setFetchSize(100); // hint to db driver
        reader.setPageSize(1); //actual rows read   
        reader.setRowMapper(new AddressTypeRowMapper());
        reader.setParameterValues(paramMap);
        reader.setQueryProvider(queryProvider);
        return reader;
    }
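
    // Hypothetical debugging helper (my addition, not part of the original
    // setup): a listener to check whether the worker actually receives the
    // range before the reader runs; attach it to the worker step with
    // .listener(...). The "minValue exists?" lines in the trace further below
    // come from this kind of check.
    @Bean
    public StepExecutionListener rangeLoggingListener() {
        return new StepExecutionListener() {
            @Override
            public void beforeStep(StepExecution stepExecution) {
                ExecutionContext ctx = stepExecution.getExecutionContext();
                System.out.println("minValue exists? " + ctx.containsKey("minValue"));
                System.out.println("maxValue exists? " + ctx.containsKey("maxValue"));
                for (Map.Entry<String, Object> entry : ctx.entrySet()) {
                    System.out.println("key: " + entry.getKey() + "; value: " + entry.getValue());
                }
            }

            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                return null; // keep the step's own exit status
            }
        };
    }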

    @Bean
    public AddressTypeProcessor addressTypeProcessor() {
        return new AddressTypeProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<AddressRelationType> writer(
            @Qualifier("fcrmDataSource") DataSource fcrmDataSource) {
        return new JdbcBatchItemWriterBuilder<AddressRelationType>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<AddressRelationType>())
                .sql("INSERT INTO public.address_relation_type( "
                        + " address_relation_type_cd, tenant_cd, address_relation_type_desc, "
                        + " batch_id, short_name, definition, address_type_dv, custom_boolean_01, "
                        + " custom_medium_string_01, custom_small_string_01, custom_small_string_02, "
                        + " custom_small_string_03, entity_sk)\r\n"
                        + " VALUES (:addressRelationTypeCode, 'N/A', :addressRelationTypeDesc, "
                        + " :batchId, :shortName, :definition, :addressTypeDv, :customBoolean01, "
                        + " :customMediumString01, :customSmallString01, :customSmallString02, "
                        + " :customSmallString03, :entitySk)")
                .dataSource(fcrmDataSource).build();
    }
    
    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
    
    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootStrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory(props);
    }
    
    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory consumerFactory, QueueChannel inboundRequests) {
        return IntegrationFlows
                .from(Kafka.inboundChannelAdapter(consumerFactory, new ConsumerProperties(WorkerConstants.REQUEST_TOPIC_NAME)))
                .channel(inboundRequests)
                .get();
    }
 
    @Bean
    public QueueChannel outboundReplies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate kafkaTemplate, QueueChannel outboundReplies) {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(responseTopicName));

        return IntegrationFlows
                .from(outboundReplies)
                .handle(messageHandler)
                .get();
    }    

    @Bean
    public Job addressWorkerJob(Step addressWorkerStep, JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("addressWorkerJob")
                .incrementer(new RunIdIncrementer())
                .flow(addressWorkerStep)
                .end()
                .build();
    }
        
    @Bean
    public Step addressWorkerStep(RemotePartitioningWorkerStepBuilderFactory remotePartitioningWorkerStepBuilderFactory, JdbcPagingItemReader<AddressType> reader,
            AddressTypeProcessor addressTypeProcessor, JdbcBatchItemWriter<AddressRelationType> writer, QueueChannel outboundReplies, QueueChannel inboundRequests) {
        return remotePartitioningWorkerStepBuilderFactory.get("addressWorkerStep")
                .outputChannel(outboundReplies)
                .inputChannel(inboundRequests)
                .<AddressType, AddressRelationType>chunk(1)
                .reader(reader)
                .processor(addressTypeProcessor)
                .writer(writer)
                .build();
    }
    
}
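
One thing worth double-checking in such a setup (my note; everything matches here): the step name the manager registers via .partitioner("addressWorkerStep", partitioner) travels inside each StepExecutionRequest, and the worker resolves its local step bean by that name, so the two must match exactly.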

Update, in response to Mahmoud's comment "First thing to check: is partitioning metadata (column ranges) correctly persisted in the database?":

My source table has 7 rows. I can see that the partitions are created correctly; please refer to the screenshot below. Step_Execution_ids 694, 695, 696 and 697 were created by the manager job; Step_Execution_id 698 was created by the worker job.

(Screenshot: data from the Spring Batch tables)

Below is a screenshot from the Kafka request topic.

(Screenshot: messages in the Kafka request topic)

So we see that both the DB and Kafka have the right metadata. The stepExecutionIds in the two screenshots do not match; please ignore that, as the older messages in the topic had been drained by the time I took the screenshots.

Update: added the exception from the manager job startup:

    2022-11-10 18:50:19.021  WARN 20120 --- [           main] o.s.i.config.ReleaseStrategyFactoryBean  : No ReleaseStrategy annotated method found on MessageChannelPartitionHandler; falling back to SimpleSequenceSizeReleaseStrategy, target: org.springframework.batch.integration.partition.MessageChannelPartitionHandler@15214920, methodName: null
2022-11-10 18:50:19.037 DEBUG 20120 --- [           main] o.s.i.h.s.MessagingMethodInvokerHelper   : Method [public java.util.Collection org.springframework.batch.integration.partition.MessageChannelPartitionHandler.handle(org.springframework.batch.core.partition.StepExecutionSplitter,org.springframework.batch.core.StepExecution) throws java.lang.Exception] is not eligible for Message handling.

java.lang.IllegalArgumentException: org.springframework.integration.handler.support.MessagingMethodInvokerHelper$IneligibleMethodException: Found more than one parameter type candidate: [org.springframework.batch.core.partition.StepExecutionSplitter] and [org.springframework.batch.core.StepExecution].
Consider annotating one of the parameters with '@Payload'.
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.createHandlerMethod(MessagingMethodInvokerHelper.java:397) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.obtainHandlerMethodIfAny(MessagingMethodInvokerHelper.java:801) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at 

Update: I recreated the manager & worker batch jobs and created the Jackson mixin types below, but I am still getting infinite recursion from Jackson:

public abstract class JobExecutionMixin {
    @JsonManagedReference
    private Collection<StepExecution> stepExecutions;
}


public abstract class StepExecutionMixin {
    @JsonManagedReference
    private Collection<StepExecution> stepExecutions;
}
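
// My later understanding (a sketch, not verified against the linked answer):
// the StepExecutionMixin above looks like a copy-paste of JobExecutionMixin,
// since StepExecution has no stepExecutions collection. To let Jackson break
// the JobExecution <-> StepExecution cycle, the child side should mark its
// back-pointer instead:

public abstract class StepExecutionMixin {
    @JsonBackReference
    private JobExecution jobExecution;
}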

@Bean
public JobExplorer jobExplorer(DataSource dataSource, JdbcOperations jdbcOperations) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.addMixIn(org.springframework.batch.core.StepExecution.class, StepExecutionMixin.class);
    mapper.addMixIn(org.springframework.batch.core.JobExecution.class, JobExecutionMixin.class);
    
    Jackson2ExecutionContextStringSerializer jackson2ExecutionContextStringSerializer = new Jackson2ExecutionContextStringSerializer();
    jackson2ExecutionContextStringSerializer.setObjectMapper(mapper);
    
    JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
    jobExplorerFactoryBean.setDataSource(dataSource);
    jobExplorerFactoryBean.setJdbcOperations(jdbcOperations);
    jobExplorerFactoryBean.setSerializer(jackson2ExecutionContextStringSerializer);
    return jobExplorerFactoryBean.getObject();
} 
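
(A note with hindsight: as far as I can tell, this serializer only changes how the execution context is persisted to the Spring Batch tables. The failure below happens in the KafkaTemplate's JsonSerializer while it writes the StepExecution reply to the reply topic, which is configured independently of the JobExplorer.)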

Below is the stack trace:

        2022-11-16 17:22:34.812  INFO 1447 --- [   scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
    minValue exists? true
    maxValue exists? true
    size of entry set 3
    key: minValue; value: 6
    key: @class; value: java.util.HashMap
    key: maxValue; value: 10
    2022-11-16 17:22:35.029  INFO 1447 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [worker_step:partition1] executed in 138ms
org.springframework.kafka.support.serializer.JsonSerializer
    
    2022-11-16 17:22:41.082  INFO 1447 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
    2022-11-16 17:22:41.168  INFO 1447 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
    2022-11-16 17:22:41.169  INFO 1447 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
    2022-11-16 17:22:41.169  INFO 1447 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1668599561168
    2022-11-16 17:22:41.180  INFO 1447 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: y8UoN-ELRlSN2xSqU0m-cA
    2022-11-16 17:22:41.191  INFO 1447 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 3 with epoch 0
    2022-11-16 17:22:41.213  INFO 1447 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition reply_topic-0 to 0 since the associated topicId changed from null to yG01ZCsETiSbnu3SqUFKRg
    2022-11-16 17:22:42.205 ERROR 1447 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'outboundFlow.kafka:outbound-channel-adapter#0' for component 'outboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/spring/etl/worker/config/WorkerJobBeanConfig.class]'; from source: 'bean method outboundFlow']; nested exception is org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=211, version=3, name=worker_step:partition1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=] for topic [reply_topic], failedMessage=GenericMessage [payload=StepExecution: id=211, version=3, name=worker_step:partition1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=0, sequenceSize=3, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=request_topic, kafka_offset=0, acknowledgmentCallback=org.springframework.integration.kafka.inbound.KafkaMessageSource$KafkaAckCallback@50421a42, kafka_remainingRecords=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@68612476, correlationId=94:worker_step, id=3c7d2b01-0275-6707-8524-7ecd64d255a4, kafka_receivedPartitionId=1, kafka_receivedTimestamp=1668599551751, kafka_acknowledgment=org.springframework.integration.kafka.inbound.KafkaMessageSource$KafkaAckCallback@50421a42, timestamp=1668599555039}]
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
        at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:158)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$624.0000000000000000.call(Unknown Source)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$626.0000000000000000.run(Unknown Source)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$627.0000000000000000.run(Unknown Source)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$625.0000000000000000.run(Unknown Source)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:831)
    Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=211, version=3, name=worker_step:partition1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=] for topic [reply_topic]
        at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:216)
        at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:203)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:954)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
        ... 21 more
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]-

Solution

  • With the help of a little googling, I found that I needed to write a custom serializer for the worker job and a custom deserializer for the manager job.

    I followed the approach from this Stack Overflow question: Problem Serializing Spring batch Kafka ChunkRequest.
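
For reference, here is a minimal sketch of what such a pair could look like. This is my own reconstruction, with class names of my choosing, under the assumption that plain JDK serialization is acceptable: StepExecution extends Entity, which implements java.io.Serializable, and Java serialization handles the JobExecution/StepExecution cycle natively (the linked question applies the same idea to ChunkRequest).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.batch.core.StepExecution;

// Worker side: replaces JsonSerializer as the producer value serializer.
public class StepExecutionSerializer implements Serializer<StepExecution> {

    @Override
    public byte[] serialize(String topic, StepExecution stepExecution) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(stepExecution);
            oos.flush();
            return bos.toByteArray();
        }
        catch (IOException e) {
            throw new SerializationException("Could not serialize StepExecution", e);
        }
    }
}

// Manager side: replaces JsonDeserializer as the consumer value deserializer.
public class StepExecutionDeserializer implements Deserializer<StepExecution> {

    @Override
    public StepExecution deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (StepExecution) ois.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new SerializationException("Could not deserialize StepExecution", e);
        }
    }
}

The worker's producer factory would then point ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG at StepExecutionSerializer, and the manager's consumerFactory() would use StepExecutionDeserializer in place of JsonDeserializer.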