How to correctly catch exception when sending to Kafka via KafkaIO?
KafkaIO.<String, String>write()
.withBootstrapServers(kafkaBroker)
.withTopic(topic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
.withProducerConfigUpdates(kafkaProperties)
I have TupleCollection with Failure objects for storing exceptions, but I need to do retry sending message if it wasn't sent(or write this message to any other source - db, file, etc). I just need to catch error, but how can I do it with KafkaIO.<String, String>write()?
Currently, there is no way to capture the exceptions thrown when writing to Kafka via KafkaIO in Beam. I am currently working on the design to support this functionality, however it will not be available in the short term.
The Dataflow runner will automatically retry the failed message (4 times in batch mode, unlimited times in streaming). (https://cloud.google.com/dataflow/docs/guides/common-errors)