I have the following task in Airflow (Cloud Composer) that triggers a Cloud DataFusion pipeline.
The problem is:
Airflow already considers this task a success as soon as (within DataFusion) the Dataproc cluster has been provisioned and the actual job has entered the RUNNING state.
But I only want it to be considered a success when it is COMPLETED.
from airflow.providers.google.cloud.operators.datafusion import \
    CloudDataFusionStartPipelineOperator

my_task = CloudDataFusionStartPipelineOperator(
    location='europe-west1',
    pipeline_name="my_datafusion_pipeline_name",
    instance_name="my_datafusion_instance_name",
    task_id="my_task_name",
)
I had to look in the source code, but the following states are the default success_states:
[PipelineStates.COMPLETED] + [PipelineStates.RUNNING]
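This is why the task flips to success so early: the operator waits only until the pipeline reaches any state listed in success_states, and RUNNING is in that list by default. A conceptual sketch of that polling behaviour is below (this is an illustration, not the provider's actual code; wait_for_pipeline_state and get_state are hypothetical names):

# Conceptual illustration only -- NOT the provider's actual implementation.
# With the default success_states (COMPLETED + RUNNING), the loop returns as
# soon as the pipeline starts RUNNING, so Airflow marks the task successful.
import time


def wait_for_pipeline_state(get_state, success_states, failure_states, timeout):
    """Hypothetical polling loop; get_state would query Data Fusion for the pipeline state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in failure_states:
            raise RuntimeError(f"Pipeline ended in failure state {state}")
        if state in success_states:  # RUNNING matches here with the defaults
            return state             # -> task is considered a success
        time.sleep(30)
    raise TimeoutError("Pipeline did not reach a success state within the timeout")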
So you have to limit success_states to only [PipelineStates.COMPLETED], by passing the success_states keyword like so:
from airflow.providers.google.cloud.operators.datafusion import \
    CloudDataFusionStartPipelineOperator
from airflow.providers.google.cloud.hooks.datafusion import PipelineStates

my_task = CloudDataFusionStartPipelineOperator(
    location='europe-west1',
    pipeline_name="my_datafusion_pipeline_name",
    instance_name="my_datafusion_instance_name",
    task_id="my_task_name",
    success_states=[PipelineStates.COMPLETED],  # overwrite default success_states
    pipeline_timeout=3600,  # in seconds, default is currently 300 seconds
)
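Note that once you wait for COMPLETED instead of RUNNING, the default pipeline_timeout of 300 seconds will usually be far too short for a full pipeline run, which is why it is raised to 3600 seconds in the example above. Set it to something comfortably above your pipeline's normal runtime.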
See also:
Airflow documentation on the CloudDataFusionStartPipelineOperator
Airflow source code used for the success states of CloudDataFusionStartPipelineOperator