
Apache Beam ReadFromKafka vs KafkaConsume


I'm working with a simple Apache Beam pipeline that reads from an unbounded Kafka topic and prints the values. It runs on the Flink Runner, and I have two versions of it.

Version 1

    with beam.Pipeline(options=beam_options) as p:
        (p
         | "Read from Kafka topic" >> ReadFromKafka(
             consumer_config=consumer_config,
             topics=[producer_topic])
         | "log" >> beam.ParDo(LogData()))

This version imports ReadFromKafka from apache_beam.io.kafka (i.e. the default implementation that ships with Apache Beam).

Version 2

    with beam.Pipeline(options=beam_options) as p:
        (p
         | "Read from Kafka topic (KafkaConsumer)" >> KafkaConsume(
             consumer_config={
                 "topic": producer_topic,
                 "auto_offset_reset": "earliest",
                 "group_id": "transaction_classification",
                 "bootstrap_servers": servers,
             }))

This version uses beam-nuggets:

from beam_nuggets.io.kafkaio import KafkaConsume

I have configured the Kafka producer to produce an element every 1 second.

What I've observed is that when I consume with ReadFromKafka (version 1), the elements arrive batched together, with the batches around 4-6 seconds apart.

With KafkaConsume (version 2), on the other hand, I get elements as they are produced (i.e. one every second), which is exactly the behavior I expected.
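To put numbers on the difference between the two versions, one option is to record the wall-clock time at which each element reaches the logging step and then look at the gaps. The sketch below is only an illustration: the helper names, the 0.1 second burst threshold, and the sample timestamps are all made up for the example and are not measurements from either pipeline.

```python
from typing import List, Sequence


def arrival_gaps(arrival_times: Sequence[float]) -> List[float]:
    """Return the delay between each consecutive pair of arrivals."""
    return [later - earlier
            for earlier, later in zip(arrival_times, arrival_times[1:])]


def group_into_bursts(arrival_times: Sequence[float],
                      burst_threshold: float = 0.1) -> List[List[float]]:
    """Group arrivals that follow the previous one within burst_threshold seconds."""
    bursts: List[List[float]] = []
    for t in arrival_times:
        if bursts and t - bursts[-1][-1] < burst_threshold:
            bursts[-1].append(t)   # close to the previous arrival: same burst
        else:
            bursts.append([t])     # large gap: start a new burst
    return bursts


# Hypothetical arrival times in seconds (e.g. collected with time.time()).
steady = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]              # one element per second
batched = [5.0, 5.01, 5.02, 5.03, 5.04, 10.0, 10.01]  # elements arriving in clumps

print(arrival_gaps(steady))                          # [1.0, 1.0, 1.0, 1.0, 1.0]
print([len(b) for b in group_into_bursts(steady)])   # [1, 1, 1, 1, 1, 1]
print([len(b) for b in group_into_bursts(batched)])  # [5, 2]
```

Burst sizes of 1 correspond to the behavior seen with version 2, while burst sizes well above 1 separated by multi-second gaps correspond to what version 1 shows.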

I have tried making the consumer_config the same for both, but that doesn't seem to have any effect on version 1.
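One thing to keep in mind when aligning the two configs is that they are not written in the same dialect. ReadFromKafka hands its consumer_config to the Java Kafka consumer, which expects dotted property names with string values (for example bootstrap.servers), whereas the dict in version 2 uses underscore-style keys plus a "topic" entry that is not a Kafka property at all. Below is a minimal sketch of a translation helper. The function name is hypothetical, the topic and broker values are placeholders, and the simple "_" to "." rename is an assumption that holds for the three settings shown in the question but not necessarily for every option.

```python
from typing import Dict, List, Tuple


def to_java_consumer_config(
        python_config: Dict[str, object]) -> Tuple[Dict[str, str], List[str]]:
    """Split a Python-client style config into (Java-style properties, topics).

    Hypothetical helper: keys are renamed by swapping "_" for ".", which is
    an assumption that is only checked for the settings used in the question.
    """
    remaining = dict(python_config)       # copy so the caller's dict is untouched
    topic = remaining.pop("topic", None)  # "topic" is not a Kafka property
    topics = [str(topic)] if topic is not None else []

    java_config: Dict[str, str] = {}
    for key, value in remaining.items():
        if isinstance(value, bool):
            text = "true" if value else "false"     # Java-style booleans
        elif isinstance(value, (list, tuple)):
            text = ",".join(str(v) for v in value)  # e.g. several brokers
        else:
            text = str(value)
        java_config[key.replace("_", ".")] = text
    return java_config, topics


config, topics = to_java_consumer_config({
    "topic": "transactions",                  # placeholder topic name
    "auto_offset_reset": "earliest",
    "group_id": "transaction_classification",
    "bootstrap_servers": ["localhost:9092"],  # placeholder broker address
})
print(config)
print(topics)
```

The resulting dict and list could then be passed as consumer_config= and topics= to ReadFromKafka. This only makes the two configurations comparable; it is not expected to change the batching behavior by itself.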

I would like to stick with version 1 because it gives me proper metrics in the Flink UI. Version 2 delivers elements promptly, but I don't get any metrics in Flink (everything is reported as 0 bytes received / 0 records received).

Does anyone have any ideas why version 1 behaves this way?


Solution

  • The latter (KafkaConsume) uses a native Python Kafka library. Beam probably relies on the Kafka client's JMX MBeans to expose metrics, which would explain why they are all zero with a non-JVM client:

    https://github.com/mohaseeb/beam-nuggets/blob/master/beam_nuggets/io/kafkaio.py#L4

    The former (ReadFromKafka) is a wrapper around Java code, which may explain the difference in timing: the consumer poll returns a Java Iterator object, rather than handing over individual records directly through a native generator as the Python client does: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py#L103
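The timing part of this explanation can be illustrated with a toy model. It is not a description of Beam's or Flink's internals. It only contrasts a reader that hands each record downstream as soon as it exists with one that releases whatever has accumulated at fixed intervals. The function names are hypothetical, and the 5 second flush_interval is picked to resemble the 4-6 seconds seen in the question, not taken from any real setting.

```python
import math
from typing import List


def per_record_arrivals(produced_at: List[float]) -> List[float]:
    """Generator-style reader: each record is passed on when it is produced."""
    return list(produced_at)


def bundled_arrivals(produced_at: List[float],
                     flush_interval: float) -> List[float]:
    """Bundling reader: records are held until the next flush boundary."""
    return [math.ceil(t / flush_interval) * flush_interval
            for t in produced_at]


# One record per second for ten seconds, as in the question's producer.
produced = [float(second) for second in range(1, 11)]

print(per_record_arrivals(produced))
# [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]

print(bundled_arrivals(produced, flush_interval=5.0))
# [5.0, 5.0, 5.0, 5.0, 5.0, 10.0, 10.0, 10.0, 10.0, 10.0]
```

In the second case the records are still produced one second apart, but downstream they show up in two clumps of five, which matches the batched output described for version 1.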