apache-flink, amazon-emr, flink-streaming, pyflink

Flink Job Execution Fails with `NoClassDefFoundError` on AWS EMR with Python


I am trying to run a Flink job on an AWS EMR cluster (v7.3.0) using Python 3.9 and Apache Flink via PyFlink. The job reads from an AWS Kinesis stream and prints the stream data to the console. However, when I execute the job on EMR (after SSHing onto the master node), it fails with the error below:

# test.py

import argparse
import json
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.functions import MapFunction


class ProcessImage(MapFunction):
    def open(self, runtime_context):
        print("open() called ...")

    def map(self, value):
        try:
            if not value:
                print("Empty Kinesis record received.")
                return None

            # Parse the JSON-formatted record
            record = json.loads(value)
            print(f"Processed record: {record}")

            return value
        except Exception as e:
            print(f"Error processing record: {e}")
            raise


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="StreamingApp")

    parser.add_argument(
        "--stream_name",
        type=str,
        help="The name of Kinesis Stream to connect Flink with.",
        required=True,
    )
    parser.add_argument(
        "--region",
        type=str,
        help="The region name of the streaming application.",
        choices=["us-east-1", "us-west-2"],
        required=True,
    )

    args = parser.parse_args()
    stream_name: str = args.stream_name
    region: str = args.region

    # Set up the Flink environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    env.add_jars(
        "file:///home/hadoop/flink-connector-kinesis-4.3.0-1.18.jar",
        "file:///home/hadoop/joda-time-2.12.5.jar",
    )

    # Kinesis Consumer properties
    kinesis_consumer_config = {
        "aws.region": region,
        "stream.initial.position": "LATEST",
        "aws.credentials.provider": "AUTO",
    }

    # Set up the Kinesis consumer to read from the Kinesis stream
    kinesis_source = FlinkKinesisConsumer(
        stream_name,
        SimpleStringSchema(),
        kinesis_consumer_config,
    )

    # Define the stream pipeline
    stream = env.add_source(kinesis_source)

    # Process record
    processed_stream = stream.map(ProcessImage(), output_type=Types.STRING())
    processed_stream.print()

    # Execute the Flink job
    env.execute()


sh-5.2$ python3 /home/hadoop/test.py --stream_name TestStream --region us-west-2
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Traceback (most recent call last):
  File "/home/hadoop/test.py", line 67, in <module>
    main()
  File "/home/hadoop/test.py", line 63, in main
    env.execute("Flink Kinesis Processing Job")
  File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/datastream/stream_execution_environment.py", line 824, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/ssm-user/.local/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
....
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:280)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:412)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:366)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:541)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)

The main part of the error seems to be:

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)

I have tried adding the required JAR files, including flink-connector-kinesis-4.3.0-1.18.jar, via env.add_jars() in my Python script, and I have verified that I have the correct IAM permissions, yet the job consistently fails.

  1. Has anyone else experienced this issue, and is it likely due to an incompatibility between the AWS SDK and the Flink Kinesis connector version?
  2. Are there any additional dependencies or configurations needed for the Kinesis connector to work on PyFlink in an EMR setup?
  3. Could this issue be related to the default class loader configuration in Flink on EMR?

Thanks.


Solution

  • The flink-connector-kinesis-4.3.0-1.18 jar does not bundle all of its dependencies, so you need to provide the missing transitive dependencies alongside it. There is a convenience uber/fat jar for exactly this purpose; grab it from https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/4.3.0-1.18

    It is called a sql-connector because it is intended for the Flink SQL client, but it is not exclusive to SQL: it also bundles the DataStream and Table connectors, so it can be reused from Python apps.
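
    The swap can be sketched like this (the Maven Central download URL is derived from the coordinates above, and the /home/hadoop path follows the question's setup — adjust both if your environment differs):

    ```python
    # Maven coordinates of the uber jar, taken from the answer above.
    GROUP_PATH = "org/apache/flink"
    ARTIFACT = "flink-sql-connector-kinesis"
    VERSION = "4.3.0-1.18"

    # Standard Maven Central repository layout for the artifact.
    download_url = (
        f"https://repo1.maven.org/maven2/{GROUP_PATH}/{ARTIFACT}/{VERSION}/"
        f"{ARTIFACT}-{VERSION}.jar"
    )
    # Fetch it on the master node, e.g.:  curl -fLO <download_url>

    # Local URI to pass to env.add_jars() in place of the two separate jars
    # (note: file:// plus an absolute path, i.e. three slashes total).
    jar_uri = f"file:///home/hadoop/{ARTIFACT}-{VERSION}.jar"
    # In the job script:  env.add_jars(jar_uri)
    ```

    With the uber jar on the classpath, joda-time and the shaded AWS SDK classes (including the ClientConfiguration class from the stack trace) are already included, so the separate joda-time jar is no longer needed.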