I am trying Kafka Transnational producer in Java.
like
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(rec, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
});
producer.commitTransaction();
}catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
producer.close();
}catch(KafkaException e) {
producer.abortTransaction();
}catch (Exception x){}
producer.close();
It is not throwing any error. And send
is also pushing message in Kafka it is available.
And the broker's logs I can see are like:
[2017-10-30 19:30:56,574] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __transaction_state-11. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-10-30 19:31:19,379] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.28508215642368573189913137 with producerId 11 and producer epoch 0 on partition __transaction_state-11 (kafka.coordinator.transaction.TransactionCoordinator)
And after 5 mins I found this broker log. [2017-10-30 19:36:44,123] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
In this I can only see that the transaction is initialized
but no further logs for commit or something else is coming.
In producer configs I am appending
transactional.id=<some random transaction ID>
enable.idempotence=true
as mentioned Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is empty, which means transactions cannot be used.
I found one statement Producer: Send an OffsetCommitRequest to commit the input state associated with the end of that transaction
in Kafka Documentation
Does this means that I have to tell which Offset
I want to commit?
I am not sure what is happening with producer
This are my producer DEBUG logs:
1180 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.6069296543148491816257436, transactionTimeoutMs=60000) to node 127.0.0.1:9090 (id: 1001 rack: null)
1317 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] ProducerId set to 13 with epoch 0
1317 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state INITIALIZING to READY
1317 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state READY to IN_TRANSACTION
1323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=topic) to node -1
1337 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [127.0.0.1:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
1362 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Begin adding new partition topic-0 to transaction
1386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0])
1387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0]) to node 127.0.0.1:9090 (id: 1001 rack: null)
1389 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
1392 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT)
1437 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Successfully added partitions [topic-0] to transaction
1439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning sequence number 0 from producer (producerId=13, epoch=0) to dequeued batch from partition topic-0 bound for 127.0.0.1:9090 (id: 1001 rack: null).
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.records-per-batch
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.bytes
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.compression-rate
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.record-retries
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic.record-errors
1453 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Incremented sequence number for topic-partition topic-0 to 1
The offset of the record we just sent is: 13
1455 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT) to node 127.0.0.1:9090 (id: 1001 rack: null)
1457 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state COMMITTING_TRANSACTION to READY
I think so I am missing something before committing transaction. And in consumer I am also not able to consume if I set READ_COMMITTED
. If not than it is working normally and even I am getting messages also which I am producing using transactional producer.
My consumer code for reading transactional messages I have
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
configProperties.put("group.id","new-group-id");
configProperties.put("enable.auto.commit", "true");
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
and the consumer is subscribing to topic
topic and
my consumer debug console logs are:
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_COMMITTED fetch request for partition topic-0 at offset 1 to node 127.0.0.1:9090 (id: 1001 rack: null)
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_COMMITTED fetch for partitions [topic-0] to broker 127.0.0.1:9090 (id: 1001 rack: null)
126551 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_COMMITTED at offset 1 for partition topic-0 returned fetch data (error=NONE, highWaterMark=17, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
In this it is repeating same 3 line continuously and I have 13 as a highest offset value. And in the consumer I am not able to consume messages.
I have a setup of 1 node cluster I tried on 3 node also it is showing same result.
Any help is appreciated.
Finally for me it started working.
I can't say exactly what the problem was. But from my observations it was something to due with WINDOWS OS
.
If our broker is on windows machine it is not working as expected. And if broker is on Linux machine it is working fine.
__transaction_state
in windows machineDumping 00000000000000000000.index
offset: 0 position: 0
Dumping 00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 .... transactionalId=TXN_ID:1 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510143189059,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:1 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189232,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:1 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189393,txnTimeoutMs=60000
Dumping 00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1510143189059
Index timestamp: 0, log timestamp: 1510143189059
Found out of order timestamp in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, Previously indexed timestamp: 0
In the above logs you can easily see transaction state as Empty
, Ongoing
and PrepareCommit
. But Completion of commit/transaction is not mentioned.
But in my console logs for producer it is represented as transaction is completed so definitely there is some problem.
__transaction_state
in Linux machineoffset: 0 position: 0 .... transactionalId=TXN_ID:2 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510145904630,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:2 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904763,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:2 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904931,txnTimeoutMs=60000
offset: 3 position: 383 .... transactionalId=TXN_ID:2 ... ,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1510145904938,txnTimeoutMs=60000
But here we can easily find that total 4 different states are mentioned.
Empty
, Ongoing
, PrepareCommit
and CompleteCommit
. Which actually makes transaction complete.
And we can therefor re-use that commit ID also.
So, as I can conclude use Kafka brokers on Linux instead of windows for now if you want to use transactions.