pythonkafka-consumer-apikafka-python

kafka-python - How do I commit a partition?


Using kafka-python-1.0.2.

If I have a topic with 10 partitions, how do I go about committing a particular partition, while looping through the various partitions and messages. I just cant seem find an example of this anywhere, in the docs or otherwise

From the docs, I want to use:

consumer.commit(offset=offsets)

Specifically, how do I create the partition and OffsetAndMetadata dictionary required for offsets (dict, optional) – {TopicPartition: OffsetAndMetadata}.

I was hoping the function call would just be something like:

consumer.commit(partition, offset)

but this does not seem to be the case.

Thanks in advance.


Solution

  • So it looks like I may have figured it out, funny how that happens when you write down your questions. This seems to work:

    meta = consumer.partitions_for_topic(topic)
    options = {}
    options[partition] = OffsetAndMetadata(message.offset + 1, meta)
    consumer.commit(options)
    

    More testing is needed, but will update if anything changes.