Tags: google-cloud-platform, airflow, spark-submit, dataproc

Trigger spark-submit jobs from Airflow on a Dataproc cluster without SSH


Currently, I am executing my spark-submit commands from Airflow over SSH, using a BashOperator with a bash command, but our client does not allow SSH access into the cluster. Is it possible to execute the spark-submit command from Airflow without SSHing into the cluster?
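
For context, the current approach looks roughly like this (a minimal sketch; the cluster name, zone, and script URI are placeholders, not the real setup):

    from airflow.operators.bash import BashOperator

    # Current approach: SSH into the Dataproc master node and run spark-submit there.
    # "example-cluster-m", "us-central1-a", and the script URI are placeholders.
    spark_submit_via_ssh = BashOperator(
        task_id="spark_submit_via_ssh",
        bash_command=(
            "gcloud compute ssh example-cluster-m --zone=us-central1-a "
            "--command='spark-submit gs://example-bucket/jobs/my_job.py'"
        ),
    )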


Solution

  • You can use DataprocSubmitJobOperator to submit jobs to Dataproc from Airflow without SSHing into the cluster. Just make sure to pass the correct parameters to the operator. Note that the job parameter is a dictionary based on the Dataproc Job resource, so you can use this operator to submit different job types such as PySpark, Pig, Hive, etc. (a Hive variant is sketched after the DAG below).

    The code below submits a PySpark job:

    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
    
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    PROJECT_ID = "my-project"
    CLUSTER_NAME = "airflow-cluster" # name of created dataproc cluster
    PYSPARK_URI = "gs://dataproc-examples/pyspark/hello-world/hello-world.py" # public sample script
    REGION = "us-central1"
    
    PYSPARK_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
    }
    
    default_args = {
        'owner': 'Composer Example',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': YESTERDAY,
    }
    
    with models.DAG(
            'submit_dataproc_spark',
            catchup=False,
            default_args=default_args,
            schedule_interval=datetime.timedelta(days=1)) as dag:
    
        submit_dataproc_job = DataprocSubmitJobOperator(
                task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
                )
    
        submit_dataproc_job  # single task, so there are no dependencies to chain
    
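    The same operator can run the other job types mentioned above simply by changing the job dictionary. As a minimal sketch, here is a Hive variant reusing the constants from the DAG above (the query is an arbitrary example):

    # Same pattern as above, but with a "hive_job" payload instead of "pyspark_job".
    HIVE_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
    }

    submit_hive_job = DataprocSubmitJobOperator(
        task_id="hive_task", job=HIVE_JOB, region=REGION, project_id=PROJECT_ID
    )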

    Airflow run:

    [screenshot]

    Airflow logs:

    [screenshot]

    Dataproc job:

    [screenshot]