I'm using KafkaIO in Dataflow to read messages from one topic, with the following code:
    KafkaIO.<String, String>read()
        .withReadCommitted()
        .withBootstrapServers(endPoint)
        .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
            .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
            .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000)
            .put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
            .build())
        // .commitOffsetsInFinalize()
        .withTopics(Collections.singletonList(topicNames))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata();
I run the Dataflow program locally using the direct runner, and everything runs fine. Then I run another instance of the same program in parallel, i.e. another consumer. Now I see duplicate messages being processed in the pipeline.
Since I have provided a consumer group id, another consumer started with the same group id (a different instance of the same program) shouldn't process the elements already processed by the first consumer, right?
How does this behave with the Dataflow runner?
I don't think the options you have set guarantee non-duplicate delivery of messages across pipelines.
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG: This is a setting for the Kafka consumer, not for the Beam pipeline itself. Auto-commit is periodic and best-effort, so you might still see duplicates across multiple pipelines.
withReadCommitted(): This only means that Beam will not read messages from uncommitted transactions. Again, it does not prevent duplicates across multiple pipelines.
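For reference, here is a rough sketch of the plain Kafka consumer properties that the options in the question boil down to. It uses the property names as strings instead of the ConsumerConfig constants so that it runs without Kafka on the classpath. The class and method names are made up for illustration, and the isolation.level entry reflects my understanding of what withReadCommitted() sets. None of these properties coordinates two separate pipelines.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam or Kafka.
class ConsumerSettingsSketch {

    // Builds the consumer properties the question's KafkaIO options amount to.
    static Map<String, Object> consumerSettings(String groupName) {
        Map<String, Object> config = new LinkedHashMap<>();
        // Group id: used when committing offsets back to Kafka.
        config.put("group.id", groupName);
        // Offsets are committed by the consumer periodically, in the background.
        config.put("enable.auto.commit", true);
        config.put("auto.commit.interval.ms", 8000);
        // Assumed effect of withReadCommitted(): skip records that belong to
        // transactions that have not been committed.
        config.put("isolation.level", "read_committed");
        return config;
    }

    public static void main(String[] args) {
        consumerSettings("my-group").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```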
See here for the protocol the Beam source uses to determine the starting point of the Kafka source.
To guarantee non-duplicate delivery, you probably have to read from different topics or different subscriptions.
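To make the duplicate behaviour more concrete, here is a toy model in plain Java (no Beam or Kafka involved). It rests on an assumption: as far as I know, KafkaIO assigns topic partitions to its readers itself instead of letting the consumer group's rebalancing split them, so every pipeline ends up reading every partition. The class and method names are hypothetical and the round-robin split is a simplification of what a group coordinator does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: contrasts manual partition assignment with
// group-managed assignment. Not Beam or Kafka code.
class AssignmentSketch {

    // Manual assignment: the reader takes every partition of the topic,
    // regardless of which other readers share its group id.
    static List<Integer> manualAssignment(int partitionCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            assigned.add(p);
        }
        return assigned;
    }

    // Group-managed assignment (simplified round robin): each partition
    // goes to exactly one member of the group.
    static List<Integer> groupAssignment(int partitionCount, int memberIndex, int memberCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % memberCount == memberIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    // Partitions that both readers would consume.
    static List<Integer> overlap(List<Integer> a, List<Integer> b) {
        List<Integer> both = new ArrayList<>(a);
        both.retainAll(b);
        return both;
    }

    public static void main(String[] args) {
        int partitions = 4;
        // Two pipelines that each assign themselves all partitions.
        System.out.println("manual overlap: "
            + overlap(manualAssignment(partitions), manualAssignment(partitions)));
        // Two members of one group sharing the partitions between them.
        System.out.println("group overlap: "
            + overlap(groupAssignment(partitions, 0, 2), groupAssignment(partitions, 1, 2)));
    }
}
```

Under the manual model the two readers overlap on all four partitions, which matches the duplicates you are seeing. Under the group-managed model the overlap is empty, which is the behaviour you were expecting from the shared group id.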