Currently, I am executing my spark-submit commands from Airflow over SSH, using BashOperator
and a bash command.
However, our client does not allow us to SSH into the cluster. Is it possible to execute the spark-submit
command from Airflow without SSH-ing into the cluster?
You can use DataprocSubmitJobOperator to submit jobs from Airflow. It submits the job through the Dataproc API, so no SSH access to the cluster is needed. Just make sure to pass the correct parameters to the operator. Note that the job
parameter is a dictionary based on the Dataproc Job resource, so you can use this operator to submit different job types such as PySpark, Pig, Hive, etc.
The code below submits a PySpark job:
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

PROJECT_ID = "my-project"
CLUSTER_NAME = "airflow-cluster"  # name of created dataproc cluster
PYSPARK_URI = "gs://dataproc-examples/pyspark/hello-world/hello-world.py"  # public sample script
REGION = "us-central1"

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'submit_dataproc_spark',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    submit_dataproc_job = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
    )

    submit_dataproc_job
[Screenshots omitted: Airflow run, Airflow logs, Dataproc job]
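Since the question is about spark-submit, here is a rough sketch of how the same job dictionary might look for a JVM Spark job instead of PySpark. The `build_spark_job` helper is hypothetical (it is not part of Airflow or Dataproc), and the jar path, main class, argument and property values are placeholder assumptions. The keys under `spark_job` (`main_class`, `jar_file_uris`, `args`, `properties`) follow the Dataproc SparkJob resource and roughly correspond to the `--class`, `--jars`, application arguments and `--conf` options of spark-submit.

```python
from typing import Dict, List, Optional


def build_spark_job(
    project_id: str,
    cluster_name: str,
    main_class: str,
    jar_file_uris: List[str],
    args: Optional[List[str]] = None,
    properties: Optional[Dict[str, str]] = None,
) -> dict:
    """Hypothetical helper: build a Dataproc Job dict for a JVM Spark job."""
    spark_job = {
        "main_class": main_class,        # like spark-submit --class
        "jar_file_uris": jar_file_uris,  # jars on the classpath, including the one holding main_class
    }
    if args:
        spark_job["args"] = args              # application arguments
    if properties:
        spark_job["properties"] = properties  # like spark-submit --conf key=value
    return {
        "reference": {"project_id": project_id},
        "placement": {"cluster_name": cluster_name},
        "spark_job": spark_job,
    }


# Placeholder values (assumptions): adjust to your project, cluster and jar.
SPARK_JOB = build_spark_job(
    project_id="my-project",
    cluster_name="airflow-cluster",
    main_class="org.apache.spark.examples.SparkPi",
    jar_file_uris=["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
    args=["1000"],
    properties={"spark.executor.memory": "2g"},
)
```

You would then pass `job=SPARK_JOB` to DataprocSubmitJobOperator in the DAG above, in place of `PYSPARK_JOB`. The rest of the DAG stays the same.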