apache-kafkaspring-batchlistenerkafka-producer-apispring-batch-tasklet

How to call StepExcecutionListener in spring batch with kafka integration?


Below is the config of job in etl.xml

<batch:job id="procuerJob">

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

</batch:job>

below is the code used to send messaged to the topic.

ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

Once the kafkaTemplate.send(message)is executed , the listener is called and the job completes. I see the onSuccess(), onFailure() are called post the job is completed. How can I chnage the config of job so that listener is called after receiving the acknowledgement from kafka topic?


Solution

  • Would you like to some code example of what you suggested to block waiting for future. That might be of help.

    I did not try the following but here is the idea:

    ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
    try {
        SendResult<String, Message> sendResult = future.get();
        // inspect sendResult
        log.info("marking as SUCCESS");
        manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
    } catch (Exception e) {
         log.info("marking as FAILURE");
         manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
         // do something with e
    }