python, airflow, google-cloud-composer, google-cloud-data-fusion

Cloud Composer / Airflow: start a new task only when the Cloud Data Fusion task has really finished


I have the following task in Airflow (Cloud Composer) that triggers a Cloud Data Fusion pipeline.

The problem is:
Airflow already considers this task a success as soon as (within Data Fusion) the Dataproc cluster has been provisioned and the actual job has entered the RUNNING state.

But I only want it to be considered a success once it is COMPLETED.

from airflow.providers.google.cloud.operators.datafusion import \
    CloudDataFusionStartPipelineOperator

my_task = CloudDataFusionStartPipelineOperator(
    location='europe-west1',
    pipeline_name="my_datafusion_pipeline_name",
    instance_name="my_datafusion_instance_name", 
    task_id="my_task_name",
)

Solution

  • I had to dig into the source code, but the following states are the default success_states:
    [PipelineStates.COMPLETED] + [PipelineStates.RUNNING]

    So you have to limit success_states to just [PipelineStates.COMPLETED], by passing the success_states keyword argument like so:

    from airflow.providers.google.cloud.operators.datafusion import \
        CloudDataFusionStartPipelineOperator
    from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
    
    my_task = CloudDataFusionStartPipelineOperator(
        location='europe-west1',
        pipeline_name="my_datafusion_pipeline_name",
        instance_name="my_datafusion_instance_name", 
        task_id="my_task_name",
        success_states=[PipelineStates.COMPLETED], # override the default success_states
        pipeline_timeout=3600, # in seconds, default is currently 300 seconds
    )
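
    For completeness, here is a minimal sketch (with a hypothetical downstream task my_next_task) showing that a follow-up task now only starts once the Data Fusion pipeline has reached COMPLETED. EmptyOperator requires Airflow 2.3+; on older versions DummyOperator can be used instead:

    from airflow.operators.empty import EmptyOperator

    # hypothetical downstream task that must wait for the pipeline to really finish
    my_next_task = EmptyOperator(task_id="my_next_task")

    # with success_states=[PipelineStates.COMPLETED], my_task only succeeds once
    # the pipeline is COMPLETED, so my_next_task is scheduled only after that
    my_task >> my_next_task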
    

    See also:
    Airflow documentation on the CloudDataFusionStartPipelineOperator

    Airflow source code used for the success states of the CloudDataFusionStartPipelineOperator