google-cloud-dataflow, apache-beam-io, apache-beam, apache-beam-kafkaio

Apache Beam KafkaIO consumers in consumer group reading same message


I'm using KafkaIO in Dataflow to read messages from one topic, using the following code:

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaIO.<String, String>read()
        .withReadCommitted()
        .withBootstrapServers(endPoint)
        .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 8000)
                .put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000)
                .build())
//      .commitOffsetsInFinalize()
        .withTopics(Collections.singletonList(topicNames))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata();

I run the Dataflow program locally using the direct runner, and everything works fine. Then I start a second instance of the same program in parallel, i.e., another consumer. Now I see the same messages being processed by both pipelines, i.e., duplicates.

Since I have provided a consumer group id, shouldn't starting another consumer with the same consumer group id (a second instance of the same program) avoid processing the elements that are already being processed by the first consumer?

How does this behave with the Dataflow runner?
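For context, this is the consumer-group behavior I expect from a plain Kafka client: two processes running the sketch below with the same group.id get disjoint partition assignments, so each record goes to only one of them. (A minimal sketch, not my actual code; endPoint, groupName, and topicNames are the same values as in the snippet above.)

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endPoint);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // subscribe() joins the consumer group; the broker splits the topic's
    // partitions between all live members that share the same group.id.
    consumer.subscribe(Collections.singletonList(topicNames));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}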


Solution

  • I don't think the options you have set guarantee non-duplicate delivery of messages across pipelines.

    See here for the protocol Beam sources use to determine the starting point of the Kafka source; a rough sketch of why this leads to duplicates follows below.

    To guarantee non-duplicate delivery, you probably have to read from different topics or different subscriptions (see the example at the end).
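
    As I understand it, the duplicates show up because KafkaIO does not use Kafka's group management at all: each pipeline enumerates the topic's partitions itself and hands all of them to its own readers via assign(), so a second pipeline reads every partition again regardless of group.id. Below is a simplified sketch of that splitting idea (my own illustration, not Beam's actual code; the real logic lives in KafkaUnboundedSource, and numReaders stands in for the runner's desired parallelism):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    static List<List<TopicPartition>> splitPartitions(
            KafkaConsumer<?, ?> consumer, String topic, int numReaders) {
        // Every pipeline instance runs this independently over ALL partitions.
        List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo info : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(topic, info.partition()));
        }
        // Round-robin the partitions across this pipeline's readers only.
        List<List<TopicPartition>> splits = new ArrayList<>();
        for (int i = 0; i < numReaders; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            splits.get(i % numReaders).add(partitions.get(i));
        }
        // Each reader then calls consumer.assign(...) on its split; assign()
        // bypasses group rebalancing, so two pipelines never coordinate.
        return splits;
    }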
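
    Concretely, the different-topics option could look like the following, with each pipeline instance pointed at its own topic (the topic names are placeholders, and your producers would have to route records accordingly):

    KafkaIO.Read<String, String> reader = KafkaIO.<String, String>read()
            .withBootstrapServers(endPoint)
            .withTopic("events-pipeline-a")  // placeholder; the second pipeline would use "events-pipeline-b"
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class);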