I'm using the KafkaIO unbounded source in an Apache Beam pipeline running on Dataflow. The following configuration works for me:
Map<String, Object> kafkaConsumerConfig = new HashMap<String, Object>() {{
    put("auto.offset.reset", "earliest");
    put("group.id", "my.group.id");
}};
p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())
 // do something
Now that I have a protobuf definition for the messages in my topic, I would like to use it to convert the Kafka records into Java objects.
The following configuration doesn't work and requires a Coder:
p.apply(KafkaIO.<String, Bytes>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(BytesDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())
Unfortunately, I can't figure out the right combination of value deserializer and Coder, and I can't find similar examples in the documentation. Do you have any working examples of using Protobuf with the Kafka source in Apache Beam?
If you want to deserialize protobuf messages, you need a custom class that implements the Deserializer
interface from the Apache Kafka client library. This deserialization is not specific to Apache Beam; the same class works with any other Kafka consumer.
To get the Apache Kafka client library, add org.apache.kafka:kafka-clients
as a dependency.
Then create the custom deserializer class, which could look something like this:
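import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.serialization.Deserializer;

// YourProtoClass is the message class generated by protoc from your .proto file.
// With kafka-clients 2.1+, configure() and close() have default implementations.
public class ProtoDeserializer implements Deserializer<YourProtoClass> {
    @Override
    public YourProtoClass deserialize(String topic, byte[] data) {
        if (data == null) return null; // null payload, e.g. a tombstone record
        try {
            // parseFrom(byte[]) is generated for every protobuf message class
            return YourProtoClass.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }
}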
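If you have several message types, a reusable variant can pass each generated class's Parser to a shared base class, so the parsing logic is written once. This is a minimal sketch, not a library API: ParserDeserializer, StringValueDeserializer and the ProtoDeserializers holder class are hypothetical names, and Protobuf's well-known StringValue type stands in for your own generated class. It assumes kafka-clients 2.1 or newer (for the default configure() and close()) and protobuf-java 3.x, and it throws Kafka's SerializationException instead of a bare RuntimeException:

```java
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import com.google.protobuf.StringValue;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public final class ProtoDeserializers {

    private ProtoDeserializers() {}

    /** Hypothetical base class: parses any protobuf message with the given Parser. */
    public abstract static class ParserDeserializer<T extends MessageLite>
            implements Deserializer<T> {
        private final Parser<T> parser;

        protected ParserDeserializer(Parser<T> parser) {
            this.parser = parser;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // null payload, e.g. a tombstone record
            }
            try {
                return parser.parseFrom(data);
            } catch (InvalidProtocolBufferException e) {
                throw new SerializationException(
                        "Could not parse protobuf message from topic " + topic, e);
            }
        }
    }

    /**
     * Example concrete subclass; StringValue stands in for your generated class.
     * It needs a public no-arg constructor because the deserializer is supplied
     * as a Class and instantiated reflectively.
     */
    public static final class StringValueDeserializer
            extends ParserDeserializer<StringValue> {
        public StringValueDeserializer() {
            super(StringValue.parser());
        }
    }
}
```

In a pipeline you would then pass, for example, ProtoDeserializers.StringValueDeserializer.class to withValueDeserializer, with one small subclass per message type.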
Then use that class in your pipeline:
p.apply(KafkaIO.<String, YourProtoClass>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ProtoDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())
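Whether that read works as-is depends on Beam being able to infer a coder for YourProtoClass from the deserializer's type. If you still get a coder error, one option is to supply the coder explicitly with withValueDeserializerAndCoder and Beam's ProtoCoder. The sketch below wraps the same read in a hypothetical helper, readProtos; YourProtoClass and ProtoDeserializer are the placeholders from above, and ProtoCoder assumes the org.apache.beam:beam-sdks-java-extensions-protobuf module is on the classpath:

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

class ProtoKafkaRead {
    // Hypothetical helper. YourProtoClass and ProtoDeserializer are the
    // placeholder message class and deserializer from the answer above.
    static PCollection<KV<String, YourProtoClass>> readProtos(
            Pipeline p, Map<String, Object> kafkaConsumerConfig) {
        return p.apply(KafkaIO.<String, YourProtoClass>read()
                .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
                .withConsumerConfigUpdates(kafkaConsumerConfig)
                .withTopic("my.topic")
                .withKeyDeserializer(StringDeserializer.class)
                // Pass the value coder explicitly instead of relying on inference.
                .withValueDeserializerAndCoder(
                        ProtoDeserializer.class, ProtoCoder.of(YourProtoClass.class))
                .withMaxNumRecords(10)
                .withoutMetadata());
    }
}
```

Downstream transforms then receive KV<String, YourProtoClass> elements and can read the message fields directly.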