I'm trying to use Airflow to create an EMR cluster, run a Spark job, and terminate the cluster. I can create the cluster with the Airflow operators, but when I run a step that reads data from MySQL, Spark can't find the connector JAR; it appears to be looking for it on a path other than the one I expect. Here's the error message:
: java.io.FileNotFoundException: File file:/mnt/var/lib/hadoop/steps/[STEP-ID]/mysql-connector-j-8.0.33.jar does not exist
I use a bootstrap action to copy the JARs from an S3 folder onto the cluster. I've replaced some sensitive data (S3 paths and connections) to post here, but my actual code has the correct values.
# Spark Configurations
JOB_FLOW_OVERRIDES = {
    "Name": "EMR spark test",
    "ReleaseLabel": "emr-7.0.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
        "Ec2SubnetId": "{{ var.value.ec2_subnet_id }}",
    },
    "BootstrapActions": [
        {
            "Name": "import custom Jars",
            "ScriptBootstrapAction": {
                "Path": "s3://PATH/copy_jars.sh",
                "Args": [],
            },
        }
    ],
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole_V2",
    "LogUri": "s3://PATH/",
}
And the steps look like this:
# Steps to run
SPARK_STEPS = [
    {
        "Name": "workflow_extraction",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master",
                "yarn",
                "--deploy-mode",
                "client",
                "--jars",
                "mysql-connector-j-8.0.33.jar",
                "--driver-class-path",
                "mysql-connector-j-8.0.33.jar",
                "--conf",
                "spark.executor.extraClassPath=mysql-connector-j-8.0.33.jar",
                # "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
                "s3://PATH/spark_test.py",
            ],
        },
    }
]
And here's the python code:
from pyspark.sql import SparkSession

bucket = "s3://PATH/"

# .config("driver", "mysql-connector-j-8.0.33.jar") \
spark = SparkSession.builder \
    .appName("SparkWorkflow") \
    .getOrCreate()

url = "jdbc:mysql://host:3306/database"
mysql_properties = {
    "user": "user",
    "password": "password",
    "driver": "com.mysql.jdbc.Driver",
}
table_name = "table"

df = spark.read.jdbc(url, table_name, properties=mysql_properties)
df.write.parquet(bucket, mode="overwrite")
And the dag:
# Airflow 2.x imports (amazon provider)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Set default arguments
default_args = {
    "owner": "airflow",
    "start_date": datetime(2022, 3, 5),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# DAG
with DAG(
    "emr_and_airflow_integration",
    default_args=default_args,
    schedule_interval="0 1 * * *",
    max_active_runs=1,
    catchup=False,
) as dag:
    # Only display - start_dag
    start_dag = DummyOperator(task_id="start_dag")

    # Create the EMR cluster
    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
        region_name="us-east-1",
    )

    # Add the steps to the EMR cluster
    add_steps = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        steps=SPARK_STEPS,
    )

    last_step = len(SPARK_STEPS) - 1

    # Wait for execution of all steps
    check_execution_steps = EmrStepSensor(
        task_id="check_execution_steps",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')["
        + str(last_step)
        + "] }}",
        aws_conn_id="aws_default",
    )

    # Terminate the EMR cluster
    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
    )

    # Only display - end_dag
    end_dag = DummyOperator(task_id="end_dag")

    # Data pipeline flow
    start_dag >> create_emr_cluster >> add_steps
    add_steps >> check_execution_steps >> terminate_emr_cluster >> end_dag
What am I doing wrong?
See also: https://stackoverflow.com/a/51435038/3238085
If you run spark-submit --help, it will show:
--jars JARS                 Comma-separated list of jars to include on the driver
                            and executor classpaths.
--packages                  Comma-separated list of maven coordinates of jars to include
                            on the driver and executor classpaths. Will search the local
                            maven repo, then maven central and any additional remote
                            repositories given by --repositories. The format for the
                            coordinates should be groupId:artifactId:version.
With --jars, Spark doesn't go to Maven; it looks for the specified JAR on the local file system (the hdfs/http/https/ftp URL schemes are also supported). With --packages, Spark searches the local Maven repo, then Maven Central, then any additional repositories given by --repositories, and downloads the artifact.
In your step, the bare file name mysql-connector-j-8.0.33.jar is therefore resolved against the step's working directory (/mnt/var/lib/hadoop/steps/[STEP-ID]/), which is exactly the FileNotFoundException you're seeing. If you want to keep the bootstrap-copy approach, point --jars at wherever copy_jars.sh actually puts the JAR, as sketched below.
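A minimal sketch of the step, assuming (hypothetically) that the bootstrap script copies the connector to /home/hadoop/ — adjust the path to whatever your copy_jars.sh really uses:
SPARK_STEPS = [
    {
        "Name": "workflow_extraction",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master", "yarn",
                "--deploy-mode", "client",
                # assumed location: wherever copy_jars.sh drops the JAR on every node
                "--jars", "/home/hadoop/mysql-connector-j-8.0.33.jar",
                "--driver-class-path", "/home/hadoop/mysql-connector-j-8.0.33.jar",
                "--conf", "spark.executor.extraClassPath=/home/hadoop/mysql-connector-j-8.0.33.jar",
                "s3://PATH/spark_test.py",
            ],
        },
    }
]
This works because the bootstrap action runs on every node, so the JAR exists at the same path on the driver and on all executors.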
If you want Spark to download the JAR from a Maven repository instead, you need the --packages option. Keep in mind that you have to use the full Maven coordinates, i.e. com.mysql:mysql-connector-j:8.0.33, as opposed to just mysql-connector-j:8.0.33. Applied to your step, that looks something like the sketch below.
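A sketch of the same step using --packages (your other arguments unchanged); note that this requires the cluster nodes to be able to reach Maven Central, or a repository you pass via --repositories:
SPARK_STEPS = [
    {
        "Name": "workflow_extraction",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master", "yarn",
                "--deploy-mode", "client",
                # full Maven coordinates: groupId:artifactId:version
                "--packages", "com.mysql:mysql-connector-j:8.0.33",
                "s3://PATH/spark_test.py",
            ],
        },
    }
]
With --packages, Spark distributes the downloaded JAR to the driver and executors itself, so the separate --jars, --driver-class-path and spark.executor.extraClassPath arguments are no longer needed.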
Alternatively, you can set this option inside your Python script, as shown below; then you don't have to pass the JAR on the spark-submit command line at all.
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.0.33") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)  # optional; only needed if you still use the legacy SQLContext API
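Then the read/write logic from your question works unchanged against that session. A minimal sketch, reusing the placeholder host/credentials/table from your question; with Connector/J 8.x the preferred driver class is com.mysql.cj.jdbc.Driver (the old com.mysql.jdbc.Driver name still works but logs a deprecation warning):
# placeholders below are the ones from the question, not real values
url = "jdbc:mysql://host:3306/database"
mysql_properties = {
    "user": "user",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver",
}

df = spark.read.jdbc(url, "table", properties=mysql_properties)
df.write.parquet("s3://PATH/", mode="overwrite")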