Tags: apache-kafka, quarkus, idempotent, smallrye-reactive-messaging

Quarkus Kafka Producer Idempotence


I have two microservices

The DB-Interface makes all the calls to the database, fetches message events, bundles them into the appropriate object for consumption (called Payload), and publishes it to the Kafka topic 'message-queue'.

Then the Message-Interface sends out the message and publishes a response back to a topic called 'status-queue', signifying whether or not the message was delivered.

The status-queue is then consumed by the Query-Interface, which updates the records in the DB.

My issue is: I have configured, or at least tried to configure, my producer so that it maintains idempotence to avoid sending duplicates, but I still seem to be missing something.

These are my configurations so far.

DB-Interface

kafka.bootstrap.servers=http://HOST_IP:9094
mp.messaging.outgoing.message-out.topic=message-queue
mp.messaging.outgoing.message-out.value.serializer=com.etzgh.QueryService.util.kafka.MessageSerializer
mp.messaging.outgoing.message-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.message-out.enable.idempotence=true
mp.messaging.outgoing.message-out.acks=all



mp.messaging.incoming.update-in.topic=status-queue
mp.messaging.incoming.update-in.auto.offset.reset=earliest
mp.messaging.incoming.update-in.value.deserializer=com.etzgh.QueryService.util.kafka.MessageStatusDeserializer
mp.messaging.incoming.update-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Message-Interface

kafka.bootstrap.servers=http://HOST_:9094
mp.messaging.outgoing.update-out.topic=status-queue
mp.messaging.outgoing.update-out.value.serializer=com.etzgh.MessageService.util.kafka.MessageStatusSerializer
mp.messaging.outgoing.update-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer


mp.messaging.incoming.message-in.topic=message-queue
mp.messaging.incoming.message-in.auto.offset.reset=earliest
mp.messaging.incoming.message-in.value.deserializer=com.etzgh.MessageService.util.kafka.MessageDeserializer
mp.messaging.incoming.message-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Then this is my implementation of publishing the message to the message-queue:

@Inject
@Channel("message-out")
Emitter<Record<String, Message>> emitter;

public void publishToMessageQueue(Message message) {
    String key = String.format("%s-%s", message.messageId(), message.triesCounter());
    Record<String, Message> payload = Record.of(key, message);
    emitter.send(payload);
}

This is how I consume it in the Message-Interface and return the response on the other topic:

@Incoming("message-in")
@Outgoing("update-status")
public Uni<MessageStatus> sink(Message message) {
    LOGGER.info(message);
    String[] array = {"P", "N", "Y"};
    Random random = new Random();
    int r = random.nextInt(3);

    return Uni.createFrom().item(new MessageStatus(message.messageId(), array[r]));
}

Could someone please help me with the idempotence? I can still send duplicate messages to the message-queue even though I am using a key that will always be unique. Any help will be much appreciated.


Solution

  • You may need to add a transactional.id to the config section of the outgoing channel, as stated below:

    Kafka transactional producers require configuring acks=all client property, and a unique id for transactional.id, which implies enable.idempotence=true. When Quarkus detects the use of KafkaTransactions for an outgoing channel it configures these properties on the channel, providing a default value of "${quarkus.application.name}-${channelName}" for transactional.id property.
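Note that enable.idempotence on its own only protects against duplicates caused by the producer's internal retries within a session; it does not deduplicate separate emitter.send() calls, so a unique key per send will not prevent application-level duplicates. Based on the quoted documentation, a sketch of what the outgoing channel config from the question might look like with transactions enabled (the transactional.id value here is illustrative, not prescribed):

```properties
# Hypothetical sketch: the transactional.id value is an assumption,
# mirroring the "${quarkus.application.name}-${channelName}" default.
mp.messaging.outgoing.message-out.topic=message-queue
mp.messaging.outgoing.message-out.transactional.id=db-interface-message-out
mp.messaging.outgoing.message-out.acks=all
mp.messaging.outgoing.message-out.enable.idempotence=true
```

On the application side, SmallRye Reactive Messaging exposes Kafka transactions through the KafkaTransactions emitter. A minimal sketch, assuming the Message type and the message-out channel from the question (this is framework code and is not runnable standalone):

```java
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;

@ApplicationScoped
public class TransactionalPublisher {

    // Replaces the plain Emitter; injecting KafkaTransactions is what
    // makes Quarkus configure transactional.id (and thus
    // enable.idempotence=true) for the channel.
    @Inject
    @Channel("message-out")
    KafkaTransactions<Message> transactionalProducer;

    public Uni<Void> publishToMessageQueue(Message message) {
        String key = String.format("%s-%s", message.messageId(), message.triesCounter());
        // Everything sent inside withTransaction is committed atomically;
        // the transaction is aborted if the returned Uni fails.
        return transactionalProducer.withTransaction(emitter -> {
            emitter.send(Record.of(key, message));
            return Uni.createFrom().voidItem();
        });
    }
}
```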