I am trying to log in to server 100.18.10.182 and trigger my spark-submit job on server 100.18.10.36 from the .182 server in Apache Airflow. I have used a BashOperator (a shell script to SSH into the 100.18.10.182 server) and, for the spark-submit job, a SparkSubmitOperator downstream of the BashOperator. The BashOperator executes successfully, but the SparkSubmitOperator fails with: Cannot execute: Spark submit
I think this is because I am unable to carry the SSH session (to the .182 server) over into the next SparkSubmitOperator, or it may be some other issue related to --jars or --packages; I am not sure.
I was thinking of using xcom_push to push some data from my BashOperator and xcom_pull to read it in the SparkSubmitOperator, but I am not sure how to pass it so that the server stays logged in and my SparkSubmitOperator gets triggered from that box itself.
Airflow DAG code:
t2 = BashOperator(
    task_id='test_bash_operator',
    bash_command="/Users/hardikgoel/Downloads/Work/airflow_dir/shell_files/airflow_prod_ssh_script.sh ",
    dag=dag)
t3_config = {
    'conf': {
        "spark.yarn.maxAppAttempts": "1",
        "spark.yarn.executor.memoryOverhead": "8"
    },
    'conn_id': 'spark_default',
    'packages': 'com.sparkjobs.SparkJobsApplication',
    'jars': '/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar firstJob',
    'driver_memory': '1g',
    'total_executor_cores': '21',
    'executor_cores': 7,
    'executor_memory': '48g'
}
t3 = SparkSubmitOperator(
    task_id='t3',
    **t3_config)

t2 >> t3
Shell Script code:
#!/bin/bash
USERNAME=hardikgoel
HOSTS="100.18.10.182"
SCRIPT="pwd; ls"
ssh -l ${USERNAME} ${HOSTS} "${SCRIPT}"
# check the exit status immediately after ssh, before any other command overwrites it
if [ ${PIPESTATUS[0]} -eq 0 ]; then
    echo "SSHed successfully"
fi
An SSH session opened in one task does not carry over to the next: each Airflow task runs in its own process, so the SparkSubmitOperator executes on the Airflow worker, not on the box you SSHed into. The simplest fix is to combine the SSH and spark-submit commands within the same BashOperator:
t2 = BashOperator(
    task_id='ssh_and_spark_submit',
    # USERNAME, HOSTS, JARS, PACKAGES and SPARK_SUBMIT_ARGS are placeholders;
    # substitute real values or supply them via the operator's env argument
    bash_command="ssh -tt ${USERNAME}@${HOSTS} '/path/to/spark-submit --jars ${JARS} --packages ${PACKAGES} ${SPARK_SUBMIT_ARGS}'",
    dag=dag
)
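Because the remote command passes through two shells (the local one and the one ssh starts), quoting is the usual failure point. A minimal sketch of building the command string safely before handing it to bash_command; the spark-submit path and host here are assumptions, not taken from your setup:

```python
import shlex

# Hypothetical values -- substitute your own user, host, and paths.
username = "hardikgoel"
host = "100.18.10.36"
spark_submit = "/var/spark/bin/spark-submit"  # assumed install path
jar = "/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar"

# Command that runs on the remote box; 'firstJob' is the application argument.
remote_cmd = f"{spark_submit} --driver-memory 1g {jar} firstJob"

# shlex.quote wraps the remote command as a single ssh argument,
# so its spaces survive both shell hops intact.
bash_command = f"ssh -tt {username}@{host} {shlex.quote(remote_cmd)}"
```

The resulting string can be passed directly as the BashOperator's bash_command.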
You can also use XCom to gate the spark-submit on the SSH task's result:
t2 = BashOperator(
    task_id='ssh_and_push_success',
    bash_command="ssh -tt ${USERNAME}@${HOSTS} 'pwd; ls' && echo 'success'",
    xcom_push=True,  # pushes the last line of stdout ('success') as the XCom value
    dag=dag
)
t3 = SparkSubmitOperator(
    task_id='spark_submit_if_ssh_success',
    trigger_rule='one_success',
    **t3_config
)
# helper to gate downstream work on the pushed XCom value
def trigger_spark_if_ssh_success(context):
    return context['ti'].xcom_pull(task_ids='ssh_and_push_success') == 'success'
t3.set_upstream(t2)
t3.set_downstream(
    TriggerDagRunOperator(
        task_id='trigger_downstream_dag',  # task_id is required
        trigger_dag_id="downstream_dag_id",
        dag=dag
    )
)
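To see what the helper above actually evaluates, here is a self-contained sketch; the FakeTI class is purely illustrative, standing in for Airflow's real TaskInstance:

```python
def trigger_spark_if_ssh_success(context):
    # A BashOperator with xcom_push=True stores the last line of stdout,
    # so a successful SSH task pushes the string 'success'.
    return context['ti'].xcom_pull(task_ids='ssh_and_push_success') == 'success'

class FakeTI:
    """Illustrative stub for Airflow's TaskInstance (not a real Airflow class)."""
    def xcom_pull(self, task_ids):
        return 'success'

print(trigger_spark_if_ssh_success({'ti': FakeTI()}))  # -> True
```

The check compares against the exact string echoed by the upstream task, so any change to the echo must be mirrored here.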