I am trying to run a Flink job on an AWS EMR cluster (v7.3.0) using Python 3.9 and PyFlink. The job reads from an AWS Kinesis stream and prints the records to the console. However, when I execute the job on EMR (after SSHing onto the master node), it fails. Here is the script, followed by the run and the full error:
# test.py
import argparse
import json

from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.functions import MapFunction


class ProcessImage(MapFunction):
    def open(self, runtime_context):
        print("Constructor launched ...")

    def map(self, value):
        try:
            if not value:
                print("Empty Kinesis record received.")
                return None
            # Parse the JSON-formatted record
            record = json.loads(value)
            print(f"Processed record: {record}")
            return value
        except Exception as e:
            print(f"Error processing record: {e}")
            raise


def main():
    parser = argparse.ArgumentParser(description="StreamingApp")
    parser.add_argument(
        "--stream_name",
        type=str,
        help="The name of the Kinesis stream to connect Flink to.",
        required=True,
    )
    parser.add_argument(
        "--region",
        type=str,
        help="The region name of the streaming application.",
        choices=["us-east-1", "us-west-2"],
        required=True,
    )
    args = parser.parse_args()
    stream_name: str = args.stream_name
    region: str = args.region

    # Set up the Flink environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.add_jars(
        "file:///home/hadoop/flink-connector-kinesis-4.3.0-1.18.jar",
        "file:///home/hadoop/joda-time-2.12.5.jar",
    )

    # Kinesis consumer properties
    kinesis_consumer_config = {
        "aws.region": region,
        "stream.initial.position": "LATEST",
        "aws.credentials.provider": "AUTO",
    }

    # Set up the Kinesis consumer to read from the Kinesis stream
    kinesis_source = FlinkKinesisConsumer(
        stream_name,
        SimpleStringSchema(),
        kinesis_consumer_config,
    )

    # Define the stream pipeline
    stream = env.add_source(kinesis_source)

    # Process records
    processed_stream = stream.map(ProcessImage(), output_type=Types.STRING())
    processed_stream.print()

    # Execute the Flink job
    env.execute("Flink Kinesis Processing Job")


if __name__ == "__main__":
    main()
sh-5.2$ python3 /home/hadoop/test.py --stream_name TestStream --region us-west-2
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Traceback (most recent call last):
File "/home/hadoop/test.py", line 67, in <module>
main()
File "/home/hadoop/test.py", line 63, in main
env.execute("Flink Kinesis Processing Job")
File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/datastream/stream_execution_environment.py", line 824, in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
....
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:280)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:412)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:366)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:541)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
The main part of the error seems to be:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
I have tried adding the required JAR files, including flink-connector-kinesis-4.3.0-1.18.jar, via env.add_jars() in my Python script, and I have verified that I have the correct IAM permissions; however, the job still fails consistently.
Thanks.
The flink-connector-kinesis-4.3.0-1.18 jar does not bundle all of its dependencies; you need to provide the required transitive dependencies alongside it, which is why the shaded ClientConfiguration class fails to initialize at runtime. There is a convenience uber/fat jar you can use instead: flink-sql-connector-kinesis, available from https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/4.3.0-1.18
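If you want to confirm what a given connector jar actually bundles, a jar is just a zip archive, so Python's zipfile module can list its entries. Here is a small helper of my own (not part of Flink; the jar path is whatever you downloaded) that greps for the class named in your stack trace:

# check_jar.py -- quick sanity check: list entries in a connector jar that
# match the class from the NoClassDefFoundError. A jar is a plain zip archive.
import sys
import zipfile

jar_path = sys.argv[1]  # e.g. /home/hadoop/flink-sql-connector-kinesis-4.3.0-1.18.jar
with zipfile.ZipFile(jar_path) as jar:
    matches = [name for name in jar.namelist() if "ClientConfiguration" in name]
    print("\n".join(matches) if matches else "No matching entries found in this jar.")

Running it against the slim connector jar versus the uber jar should make the difference obvious.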
It is called sql-connector because it is intended for use with the Flink SQL Client. However, it is not exclusive to SQL and is perfectly reusable from Python apps; it contains the DataStream and Table API connectors too.
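Concretely, the only change to the script above should be the env.add_jars() call. A minimal sketch, assuming you downloaded the uber jar to /home/hadoop on the master node:

env.add_jars(
    # Assumed location on the master node. The uber jar bundles the shaded
    # AWS SDK and its transitive dependencies, so the slim connector jar and
    # the separate joda-time jar should no longer be needed.
    "file:///home/hadoop/flink-sql-connector-kinesis-4.3.0-1.18.jar",
)

Since the uber jar shades its dependencies, no other code changes should be required.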