I'm running confluent_kafka with 2.0.2 and test the manual offset commit code. I set up a confluentinc/cp-kafka:7.3.0
with a test-topic
and 1 partition.
I wrote a test script to test manual offset commmit with 'enable.auto.offset.store': False
and 'enable.auto.commit': False
, but the consumer seems to commit the offset after every poll()
.
The test code is as follows:
from confluent_kafka import Consumer, TopicPartition
LOCAL_TEST_TOPIC = "test-topic2"
LOCAL_TEST_PRODUCER_CONFIG = {
'bootstrap.servers': '0.0.0.0:9092',
'group.id': "tail-grouptest",
'enable.auto.offset.store': False,
'enable.auto.commit': False,
}
consumer = Consumer(LOCAL_TEST_PRODUCER_CONFIG)
topics = [LOCAL_TEST_TOPIC, ]
def msg_process(msg):
if not msg:
print("EMPTY MESSAGE")
return
print(msg.value().decode('utf-8'))
def basic_consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
topic = consumer.list_topics(topic='test-topic2')
partitions = [TopicPartition('test-topic2', partition)
for partition in list(topic.topics['test-topic2'].partitions.keys())]
# msg1
msg1 = consumer.poll()
msg_process(msg1)
consumer.store_offsets(message=msg1)
consumer.commit(asynchronous=False)
print(consumer.position(partitions))
# msg2
msg2 = consumer.poll()
msg_process(msg2)
consumer.store_offsets(message=msg2)
consumer.commit(asynchronous=False)
print(consumer.position(partitions))
# msg3
msg3 = consumer.poll()
msg_process(msg3)
# msg4
msg4 = consumer.poll()
msg_process(msg4)
# msg5
msg5 = consumer.poll()
msg_process(msg5)
print(consumer.position(partitions))
# msg6
msg6 = consumer.poll()
msg_process(msg6)
print(consumer.position(partitions))
print("set back to msg3")
# back to msg3
consumer.store_offsets(message=msg3)
print("commit again")
consumer.commit(message=msg3, asynchronous=False)
print(consumer.position(partitions))
msg_new = consumer.poll()
msg_process(msg_new)
finally:
# Close down consumer to commit final offsets.
consumer.close()
def main():
basic_consume_loop(consumer, topics)
if __name__ == '__main__':
main()
If I run console producer with message
1
2
3
4
5
6
7
The script gives the following result:
1
[TopicPartition{topic=test-topic2,partition=0,offset=1,error=None}]
2
[TopicPartition{topic=test-topic2,partition=0,offset=2,error=None}]
3
4
5
[TopicPartition{topic=test-topic2,partition=0,offset=5,error=None}]
6
[TopicPartition{topic=test-topic2,partition=0,offset=6,error=None}]
set back to msg3
commit again
[TopicPartition{topic=test-topic2,partition=0,offset=6,error=None}]
7
I expect msg3 - msg6 are the same message(msg3), but it seems that the offset keeps adding up and the consumer continues to poll new messages. How did that happen? Anything wrong in my code?
Committing only stores the offset in the __consumer_offsets
topic.
You are still required to seek
if you want to re-consume old messages.