Tags: python, apache-kafka, apache-flink, pyflink

Unable to consume data using the latest PyFlink Kafka connector


I am trying to read data from a Kafka topic. Kafka itself is set up and working. With the PyFlink code below, I get the same error whether or not I add the connector jars.

from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common import SimpleStringSchema, Configuration


class SourceData(object):
    def __init__(self, env):
        self.env = env
        self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        self.env.set_parallelism(1)
        self.config = Configuration()
        self.config.set_string("pipeline.jars", "file:///../jars/flink-sql-connector-kafka-1.17.1.jar")
        self.env.configure(self.config)


    def get_data(self):
        source = KafkaSource.builder() \
            .set_bootstrap_servers("localhost:9092") \
            .set_topics("test-topic") \
            .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
            .set_value_only_deserializer(SimpleStringSchema()) \
            .build()
        self.env \
            .add_source(source) \
            .print()
        self.env.execute("source")


SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()

Environment:

  1. Flink 1.17.1
  2. Java 11
  3. Kafka client (latest version)
  4. Python 3.10.11

Error:

TypeError: Could not found the Java class 'org.apache.flink.connector.kafka.source.KafkaSource.builder'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'  

I also tried dropping the config option and using env.add_jars instead, but the error remains the same. Do I need to configure anything else?

The second option I tried was copying the jar into the pyflink/lib directory inside the site-packages of my virtual environment. After doing this, I get the error below:

py4j.protocol.Py4JError: An error occurred while calling o12.addSource. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method addSource([class org.apache.flink.connector.kafka.source.KafkaSource, class java.lang.String, null]) does not exist

Solution

  • You can try the snippet below:

    from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
    from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
    from pyflink.common import SimpleStringSchema, WatermarkStrategy
    
    
    class SourceData(object):
        def __init__(self, env):
            self.env = env
            self.env.add_jars("file:///Users/.../Downloads/flink-sql-connector-kafka-1.17.1.jar")
            self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
            self.env.set_parallelism(1)
    
        def get_data(self):
            source = KafkaSource.builder() \
                .set_bootstrap_servers("localhost:9092") \
                .set_topics("test-topic1") \
                .set_group_id("my-group") \
                .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
                .set_value_only_deserializer(SimpleStringSchema()) \
                .build()
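            # KafkaSource uses the new Source API, so attach it with from_source, not add_source.
            self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source").print()
            self.env.execute("source")


    # Create the environment and run the job, as in the question.
    SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()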

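    The first error was probably caused by the jar path. A file:// URL is always absolute, so a relative path such as file:///../jars/... is not resolved against your working directory; use the full path to the jar instead. The second error comes from passing a KafkaSource to add_source, which only accepts the older SourceFunction-based sources; a KafkaSource has to be attached with from_source, as in the snippet above. Also, after reading your comments, I can say that you only need the one jar mentioned in the documentation (flink-sql-connector-kafka). You don't need a separate kafka-clients jar as of now.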
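If the jar path is the suspect, it may help to build the URL from a normal filesystem path and to confirm the jar really contains the class named in the TypeError. The sketch below uses only the standard library. The helper names jar_uri and jar_has_class are made up for illustration and are not part of PyFlink, and the class entry is derived from the error message in the question.

```python
import zipfile
from pathlib import Path

# Class named in the TypeError from the question, written as a path inside the jar.
KAFKA_SOURCE_ENTRY = "org/apache/flink/connector/kafka/source/KafkaSource.class"


def jar_uri(jar_path):
    """Return an absolute file:// URI for jar_path (hypothetical helper).

    Raises FileNotFoundError early if the jar does not exist, instead of
    letting the problem surface later as a missing Java class.
    """
    path = Path(jar_path).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"Connector jar not found: {path}")
    return path.as_uri()


def jar_has_class(jar_path, entry=KAFKA_SOURCE_ENTRY):
    """Return True if the jar (a zip archive) contains the given class file."""
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Assuming the jar sits in a jars directory relative to where the script is started (an assumption, adjust it to your layout), the result could then be passed along as env.add_jars(jar_uri("jars/flink-sql-connector-kafka-1.17.1.jar")).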

    I am late to the party, but maybe it will help someone someday!