I am trying to read data from a Kafka topic. Kafka is set up fine. I wrote the code below using PyFlink, and no matter whether I add the jars or not, the error remains the same.
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common import SimpleStringSchema, Configuration


class SourceData(object):
    def __init__(self, env):
        self.env = env
        self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        self.env.set_parallelism(1)
        self.config = Configuration()
        self.config.set_string("pipeline.jars", "file:///../jars/flink-sql-connector-kafka-1.17.1.jar")
        self.env.configure(self.config)

    def get_data(self):
        source = KafkaSource.builder() \
            .set_bootstrap_servers("localhost:9092") \
            .set_topics("test-topic") \
            .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
            .set_value_only_deserializer(SimpleStringSchema()) \
            .build()
        self.env \
            .add_source(source) \
            .print()
        self.env.execute("source")


SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()
Error:
TypeError: Could not found the Java class 'org.apache.flink.connector.kafka.source.KafkaSource.builder'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
I also tried without the config option and used env.add_jars instead, but the error remains the same. Do I need to configure anything else?
The second option I tried was copying the jar into pyflink/lib inside the site-packages of my virtual environment. After doing this, I get the error below:
py4j.protocol.Py4JError: An error occurred while calling o12.addSource. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method addSource([class org.apache.flink.connector.kafka.source.KafkaSource, class java.lang.String, null]) does not exist
You can try the snippet below:
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common import SimpleStringSchema, WatermarkStrategy


class SourceData(object):
    def __init__(self, env):
        self.env = env
        # Register the connector jar with an absolute file:// URL.
        self.env.add_jars("file:///Users/.../Downloads/flink-sql-connector-kafka-1.17.1.jar")
        self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        self.env.set_parallelism(1)

    def get_data(self):
        source = KafkaSource.builder() \
            .set_bootstrap_servers("localhost:9092") \
            .set_topics("test-topic1") \
            .set_group_id("my-group") \
            .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
            .set_value_only_deserializer(SimpleStringSchema()) \
            .build()
        # The new KafkaSource is attached with from_source (not add_source).
        self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source").print()
        self.env.execute("source")
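To run the snippet end to end, instantiate it the same way as in your original code (a minimal usage sketch; the topic test-topic1 must already exist and contain messages for print() to show anything):

# Build the environment and start consuming from Kafka.
SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()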
The issue you were facing was probably with the path of the jar: pipeline.jars and add_jars expect an absolute file:// URL, and a relative reference like file:///../jars/... is unlikely to resolve to the actual file. Your second error comes from how the source is attached: the new KafkaSource has to be wired in with from_source() and a WatermarkStrategy, not add_source(), which only accepts the legacy SourceFunction. Also, after reading your comments, you just need the one jar mentioned in the documentation; you don't need a kafka-clients jar as of now, since flink-sql-connector-kafka already bundles the Kafka client classes.
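One way to avoid path problems is to build the file:// URL from an absolute path at runtime instead of hard-coding it. A minimal sketch, assuming the jar sits in a jars/ folder next to the script (that folder layout is an assumption; adjust it to your setup):

from pathlib import Path
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Resolve the connector jar relative to this script and turn it into an absolute
# file:// URL, so it works no matter which working directory you launch from.
# The jars/ folder next to the script is an assumption; adjust it to your layout.
jar = Path(__file__).parent / "jars" / "flink-sql-connector-kafka-1.17.1.jar"
env.add_jars(jar.resolve().as_uri())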
I am late to the party, but maybe it will help someone someday!