Tags: amazon-web-services, apache-spark, pyspark, airflow, amazon-emr

EMR Spark Job Step can't find mysql connector


I'm trying to use Airflow to create an EMR cluster, run a Spark job, and terminate the cluster. I was able to create the cluster with the Airflow operators, but when I run a job that reads data from MySQL, Spark can't find the connector JAR. It appears to be looking for the connector on a path that is not the "default" one. Here's the error message:

: java.io.FileNotFoundException: File file:/mnt/var/lib/hadoop/steps/[STEP-ID]/mysql-connector-j-8.0.33.jar does not exist

I'm using a bootstrap action to copy the JARs from an S3 folder. I changed some sensitive data (the S3 paths and connection details) for this post, but my code has the correct values.

# Spark Configurations
JOB_FLOW_OVERRIDES = {
    "Name": "EMR spark test",
    "ReleaseLabel": "emr-7.0.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2,
            },
            
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
        "Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
    },
    "BootstrapActions": [
        {
            "Name": "import custom Jars",
            "ScriptBootstrapAction": {
                "Path": "s3://PATH/copy_jars.sh",
                "Args": []
            }
        }
    ],
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole_V2",
    "LogUri": "s3://PATH/",
    
}

And the steps look like this:

# Steps to run
SPARK_STEPS = [
    {
        "Name": "workflow_extraction",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master",
                "yarn",
                "--deploy-mode",
                "client",
                "--jars",
                "mysql-connector-j-8.0.33.jar",
                "--driver-class-path",
                "mysql-connector-j-8.0.33.jar",
                "--conf",
                "spark.executor.extraClassPath=mysql-connector-j-8.0.33.jar",
                #"s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
                "s3://PATH/spark_test.py",
                
                
            ],
        },
    }
]

And here's the python code:

from pyspark.sql import SparkSession

bucket = "s3://PATH/"

#.config("driver", "mysql-connector-j-8.0.33.jar") \

spark = SparkSession.builder \
    .appName("SparkWorkflow") \
    .getOrCreate()

url = "jdbc:mysql://host:3306/database"

mysql_properties = {
    "user": "user",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"  # Connector/J 8.x class name; com.mysql.jdbc.Driver is deprecated
}

table_name = "table"

df = spark.read.jdbc(url, table_name, properties=mysql_properties)

df.write.parquet(bucket,mode="overwrite")

And the dag:

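# Imports (module paths assume Airflow 2.x with the Amazon provider installed)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Set default arguments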
default_args = {
    "owner": "airflow",
    "start_date": datetime(2022, 3, 5),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# DAG
with DAG(
    "emr_and_airflow_integration",
    default_args=default_args,
    schedule_interval="0 1 * * *",
    max_active_runs=1,
    catchup=False
) as dag:

    # Only display - start_dag
    start_dag = DummyOperator(task_id="start_dag")

    # Create the EMR cluster
    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        region_name='us-east-1'
    )

    # Add the steps to the EMR cluster
    add_steps = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        steps=SPARK_STEPS
    )
    last_step = len(SPARK_STEPS) - 1

    # Wait executions of all steps
    check_execution_steps = EmrStepSensor(
        task_id="check_execution_steps",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
        + str(last_step)
        + "] }}",
        aws_conn_id="aws_default",
    )

    # Terminate the EMR cluster
    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
    )

    # Only display - end_dag
    end_dag = DummyOperator(task_id="end_dag")

    # Data pipeline flow
    start_dag >> create_emr_cluster >> add_steps
    add_steps >> check_execution_steps >> terminate_emr_cluster >> end_dag

What am I doing wrong?


Solution

  • https://stackoverflow.com/a/51435038/3238085

    If you run spark-submit --help, it shows:

    --jars JARS                 Comma-separated list of jars to include on the driver
                                  and executor classpaths.
    
    --packages                  Comma-separated list of maven coordinates of jars to include
                                  on the driver and executor classpaths. Will search the local
                                  maven repo, then maven central and any additional remote
                                  repositories given by --repositories. The format for the
                                  coordinates should be groupId:artifactId:version.
    

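    With --jars, Spark does not contact Maven. It looks for the specified jar on the local file system; the hdfs, http, https, and ftp URL schemes are also supported. A bare file name such as mysql-connector-j-8.0.33.jar is resolved relative to the step's working directory, which is why the error points to /mnt/var/lib/hadoop/steps/[STEP-ID]/.

    With --packages, Spark searches for the package in the local Maven repo, then in Maven Central and any repositories given by --repositories, and downloads it.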
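To make the path behaviour of --jars concrete, here is a minimal sketch of how a jar entry without a URL scheme ends up as a file: URI under the working directory. This is an illustration only, not Spark's actual code: `resolve_jar` is a hypothetical helper, `s-EXAMPLE` stands in for the real step ID, and `/home/hadoop/` is an assumed location that a bootstrap script might copy the jar to.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def resolve_jar(jar: str, working_dir: str) -> str:
    """Illustrative only: mimic how a --jars entry becomes a URI."""
    if urlparse(jar).scheme:
        # Already has a scheme (hdfs://, https://, file:/...), keep as-is.
        return jar
    # No scheme: treat it as a local path. A relative path is anchored
    # at the working directory; an absolute path stays where it is.
    return "file:" + str(PurePosixPath(working_dir) / jar)


# Hypothetical step directory, standing in for [STEP-ID] in the error.
step_dir = "/mnt/var/lib/hadoop/steps/s-EXAMPLE"

# Bare file name, as in the question: lands in the step directory.
print(resolve_jar("mysql-connector-j-8.0.33.jar", step_dir))

# Absolute path (assumed bootstrap target): independent of the step directory.
print(resolve_jar("/home/hadoop/mysql-connector-j-8.0.33.jar", step_dir))
```

The first call reproduces the shape of the path in the error message, which suggests that passing an absolute path (or a URL with a scheme) to --jars would avoid the lookup in the step directory.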

    If you want Spark to download the jar from a Maven repository, use the --packages option. Keep in mind that you must use the full Maven coordinates, i.e. com.mysql:mysql-connector-j:8.0.33 rather than just mysql-connector-j:8.0.33.
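Applied to the step definition from the question, the --packages approach might look like the sketch below. It reuses the question's placeholder script path (`s3://PATH/spark_test.py`) and drops the --jars, --driver-class-path, and spark.executor.extraClassPath arguments. `CONNECTOR_COORDINATES` is a name introduced here for illustration, and the sketch assumes the cluster nodes can reach Maven Central.

```python
# Sketch of the step from the question, switched from --jars to --packages.
# CONNECTOR_COORDINATES is a hypothetical name; the value is the full
# groupId:artifactId:version coordinate of the MySQL connector.
CONNECTOR_COORDINATES = "com.mysql:mysql-connector-j:8.0.33"

SPARK_STEPS = [
    {
        "Name": "workflow_extraction",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master", "yarn",
                "--deploy-mode", "client",
                # Let Spark download the connector instead of looking
                # for a jar file on the local file system.
                "--packages", CONNECTOR_COORDINATES,
                "s3://PATH/spark_test.py",
            ],
        },
    }
]
```

With this variant the bootstrap action that copies the jars is no longer needed for the connector itself, at the cost of a download from the repository on each run.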

    Alternatively, you can set this option within your Python script, as shown below. Then you don't have to pass the jars in the arguments to spark-submit.

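    from pyspark.sql import SparkSession, SQLContext

    # Ask Spark to fetch the MySQL connector by its Maven coordinates
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.0.33") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)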