I'm using Python 3.9.16 and kafka-python 2.0.2 on a MacBook Pro running macOS 11.6.5.
I'm still getting my feet wet with Kafka so it's entirely possible I'm doing things the wrong way.
What I'm trying to do is test seeking to offsets with my consumer in case something doesn't get processed and I have to go back and re-read a message.
Anyway, I keep running into the error below. I'm not sure why it happens: sometimes I can process the offset and it works fine, and other times I get this message:
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10bb669f0>', hit error: unpack requires a buffer of 4 bytes
When it's working, I can see this in pdb, which kinda proves that the values are present in the topic for me to consume:
(Pdb)
> /Users/username/kafka/tkCons.py(41)<module>()
-> print ("{}, {}".format(blah.offset, blah.value))
(Pdb)
10, b'{"number": 10}'
> /Users/username/kafka/tkCons.py(40)<module>()
-> for blah in consumer:
(Pdb)
I wish I could narrow down what I'm doing during testing, but I can't pin down which of the lines I added or commented out make it work and which make it raise the above error. Since I'm not 100% sure what's happening under the hood, is my seeking around somehow affecting something in ZooKeeper? What do I need to do to keep the under-the-hood stuff happy? Here's my code in case it matters.
from kafka import KafkaConsumer, TopicPartition
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(#'my-topic333', 'my-topic222', 'my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
myTP = TopicPartition('my-topic333', 0)
import pdb
pdb.set_trace()
consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
print ("before this is my position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 50)
#consumer.seek_to_beginning()
print ("after seeking this is my position: {} ".format(consumer.position(myTP)))
for blah in consumer:
    print ("{}, {}".format(blah.offset, blah.value))
Firstly, blah.value could be None, but in that case it should print None rather than raise a ValueError related to deserialization. You need to show your full stack trace and print the offset on a separate line from the value, so that you can see where the error actually happens, or look at any logs that include the previous successful offset.
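As a rough sketch of that debugging suggestion (not a fix for the error itself), the loop could print the offset and the value on separate lines and dump the full traceback when iteration fails. The helper name dump_records and its out parameter are made up for illustration; consumer is assumed to be any iterable of records exposing .offset and .value, such as a kafka-python KafkaConsumer.

```python
import traceback


def dump_records(consumer, out=print):
    """Print each record's offset and value on separate lines.

    `consumer` is assumed to be an iterable of objects with `.offset`
    and `.value` attributes (for example a kafka-python KafkaConsumer).
    Returns the last offset that was read without an error.
    """
    last_ok = None
    try:
        for record in consumer:
            # Offset first, so it is visible even if formatting the value fails.
            out("offset: {}".format(record.offset))
            out("value: {!r}".format(record.value))
            last_ok = record.offset
    except ValueError:
        # The error is raised while fetching/decoding the next record,
        # so report the last offset that was read successfully.
        out("failed after offset: {}".format(last_ok))
        traceback.print_exc()  # full stack trace on stderr
        raise
    return last_ok
```

Calling dump_records(consumer) in place of the plain for loop should show the last offset that was read cleanly right before the traceback.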
"in case something doesn't get processed and I have to go back and re-read a message"
I wouldn't suggest using seek for this.
Instead, have the processor raise a fatal exception on failure and stop your Python process. Disable auto offset commits and commit offsets manually, only for data that was processed successfully (or for a batch of offsets for which you're willing to tolerate duplicates, assuming you process them idempotently). Then, when the consumer group restarts, the app automatically picks back up after the last successfully processed offset, and no manual seeking is required.
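A minimal sketch of that pattern, assuming kafka-python and a subscribed consumer with auto commit turned off. The names build_consumer, consume_until_failure and process are hypothetical; KafkaConsumer, enable_auto_commit, auto_offset_reset and commit() are the library's own. Calling commit() with no arguments commits the currently consumed position, so this commits after every record; committing per batch would work the same way if duplicates are acceptable.

```python
def build_consumer(topic, group_id, servers):
    """Create a consumer that never commits on its own (hypothetical helper)."""
    # Imported lazily so consume_until_failure can be tried without a broker.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        group_id=group_id,
        bootstrap_servers=servers,
        enable_auto_commit=False,      # offsets are committed manually below
        auto_offset_reset="earliest",  # where to start if the group has no commit yet
    )


def consume_until_failure(consumer, process):
    """Process records in order, committing only after each success.

    `process` is a hypothetical handler that raises on failure. The
    exception is not caught here: it stops the loop (and normally the
    process), and the failed record stays uncommitted, so a restart of
    the same group resumes from it.
    """
    for record in consumer:
        process(record)    # raises if the record cannot be handled
        consumer.commit()  # commit the position just past this record
```

Usage would look something like consume_until_failure(build_consumer('my-topic333', 'my-group', ['localhost:9092']), handle), where handle is your own processing function.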
"is me seeking around somehow affecting something in zookeeper?"
The assign API does not use consumer group management, so no, not unless you also use the subscribe and commit functions.