kafka-python

How to add a failure callback for kafka-python kafka.KafkaProducer#send()?


I would like to set a callback that fires if a produced record fails. Initially, I just want to log the failed record.

The Confluent Kafka python library provides a mechanism for adding a callback:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

How can I achieve similar behaviour with kafka-python's kafka.KafkaProducer#send(), without falling back to the deprecated SimpleClient and kafka.SimpleClient#send_produce_request()?


Solution

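  • Although it isn't documented, this is relatively straightforward. Every call to send() immediately returns a Future, and you can attach callbacks and errbacks to it. The callback is invoked with the result of the send as its positional argument, the errback with the exception; any extra arguments you pass are bound in advance: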

    future = producer.send(topic=topic, value=message, key=key)
    future.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method)
    future.add_errback(errback, message=message, **kwargs_to_pass_to_errback_method)

    Relevant source code here: https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64

    We really should document this; I filed https://github.com/dpkp/kafka-python/issues/1256 to track it.
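
To make the snippet above concrete, here is a minimal sketch of what the two handlers might look like for the "just log the failed record" case from the question. The names on_send_success, on_send_error and send_logged are hypothetical helpers, not part of kafka-python; the sketch assumes only that the object returned by send() supports add_callback and add_errback as described above, and that a successful result exposes topic, partition and offset attributes.

```python
import logging

log = logging.getLogger("producer")


def on_send_success(record_metadata, message=None):
    # The result of the send arrives as the positional argument;
    # `message` is the keyword bound via add_callback below.
    log.info(
        "delivered %r to %s[%d] at offset %d",
        message,
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset,
    )


def on_send_error(exc, message=None):
    # The errback receives the exception as the positional argument.
    log.error("failed to deliver %r: %r", message, exc)


def send_logged(producer, topic, message, key=None):
    """Send one record and log its outcome (hypothetical helper)."""
    future = producer.send(topic, value=message, key=key)
    future.add_callback(on_send_success, message=message)
    future.add_errback(on_send_error, message=message)
    return future
```

With a real producer this would be called as send_logged(producer, "my-topic", b"payload"), where producer is a kafka.KafkaProducer and the topic name is a placeholder. Binding the message as a keyword argument is what lets the errback log the failed record itself rather than only the exception.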