javaapache-kafkaproducer-consumertransactionalproducer

How to commit producer properly and consume transactional message in Kafka 0.11?


I am trying Kafka Transnational producer in Java.

like

    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(rec, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
        });
        producer.commitTransaction();
    }catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
        producer.close();
    }catch(KafkaException e) {
        producer.abortTransaction();
    }catch (Exception x){}
    producer.close();

It is not throwing any error. And send is also pushing message in Kafka it is available.

And the broker's logs I can see are like:

[2017-10-30 19:30:56,574] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __transaction_state-11. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-10-30 19:31:19,379] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.28508215642368573189913137 with producerId 11 and producer epoch 0 on partition __transaction_state-11 (kafka.coordinator.transaction.TransactionCoordinator)

And after 5 mins I found this broker log. [2017-10-30 19:36:44,123] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

In this I can only see that the transaction is initialized but no further logs for commit or something else is coming.

In producer configs I am appending

transactional.id=<some random transaction ID>
enable.idempotence=true

as mentioned Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is empty, which means transactions cannot be used.

I found one statement Producer: Send an OffsetCommitRequest to commit the input state associated with the end of that transaction in Kafka Documentation

Does this means that I have to tell which Offset I want to commit?

I am not sure what is happening with producer

This are my producer DEBUG logs:

1180 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.6069296543148491816257436, transactionTimeoutMs=60000) to node 127.0.0.1:9090 (id: 1001 rack: null)
1317 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] ProducerId set to 13 with epoch 0
1317 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state INITIALIZING to READY
1317 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state READY to IN_TRANSACTION
1323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic) to node -1
1337 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [127.0.0.1:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
1362 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Begin adding new partition topic-0 to transaction
1386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0])
1387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0]) to node 127.0.0.1:9090 (id: 1001 rack: null)
1389 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
1392 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT)
1437 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Successfully added partitions [topic-0] to transaction
1439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 0 from producer (producerId=13, epoch=0) to dequeued batch from partition topic-0 bound for 127.0.0.1:9090 (id: 1001 rack: null).
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.records-per-batch
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.bytes
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.compression-rate
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-retries
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-errors
1453 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-0 to 1
The offset of the record we just sent is: 13
1455 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT) to node 127.0.0.1:9090 (id: 1001 rack: null)
1457 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state COMMITTING_TRANSACTION to READY

I think so I am missing something before committing transaction. And in consumer I am also not able to consume if I set READ_COMMITTED. If not than it is working normally and even I am getting messages also which I am producing using transactional producer.

My consumer code for reading transactional messages I have

configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
configProperties.put("group.id","new-group-id");
configProperties.put("enable.auto.commit", "true");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

and the consumer is subscribing to topic topic and

my consumer debug console logs are:

126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-0 at offset 1 to node 127.0.0.1:9090 (id: 1001 rack: null)
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-0] to broker 127.0.0.1:9090 (id: 1001 rack: null)
126551 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 1 for partition topic-0 returned fetch data (error=NONE, highWaterMark=17, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)

In this it is repeating same 3 line continuously and I have 13 as a highest offset value. And in the consumer I am not able to consume messages.

I have a setup of 1 node cluster I tried on 3 node also it is showing same result.

Any help is appreciated.


Solution

  • Finally for me it started working.

    I can't say exactly what the problem was. But from my observations it was something to due with WINDOWS OS.

    If our broker is on windows machine it is not working as expected. And if broker is on Linux machine it is working fine.

    My Observations:

    Log dump segment for __transaction_state in windows machine

    Dumping 00000000000000000000.index
    offset: 0 position: 0
    Dumping 00000000000000000000.log
    Starting offset: 0
    offset: 0 position: 0 .... transactionalId=TXN_ID:1 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510143189059,txnTimeoutMs=60000
    offset: 1 position: 117 .... transactionalId=TXN_ID:1 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189232,txnTimeoutMs=60000
    offset: 2 position: 250 .... transactionalId=TXN_ID:1 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189393,txnTimeoutMs=60000
    Dumping 00000000000000000000.timeindex
    timestamp: 0 offset: 0
    Found timestamp mismatch in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
    Index timestamp: 0, log timestamp: 1510143189059
    Index timestamp: 0, log timestamp: 1510143189059
    Found out of order timestamp in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
    Index timestamp: 0, Previously indexed timestamp: 0
    

    In the above logs you can easily see transaction state as Empty, Ongoing and PrepareCommit. But Completion of commit/transaction is not mentioned.

    But in my console logs for producer it is represented as transaction is completed so definitely there is some problem.

    Log dump segment for __transaction_state in Linux machine

    offset: 0 position: 0 .... transactionalId=TXN_ID:2 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510145904630,txnTimeoutMs=60000
    offset: 1 position: 117 .... transactionalId=TXN_ID:2 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904763,txnTimeoutMs=60000
    offset: 2 position: 250 .... transactionalId=TXN_ID:2 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904931,txnTimeoutMs=60000
    offset: 3 position: 383 .... transactionalId=TXN_ID:2 ... ,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1510145904938,txnTimeoutMs=60000
    

    But here we can easily find that total 4 different states are mentioned.

    Empty, Ongoing, PrepareCommit and CompleteCommit. Which actually makes transaction complete.

    And we can therefor re-use that commit ID also.

    So, as I can conclude use Kafka brokers on Linux instead of windows for now if you want to use transactions.