I have three pipelines in Data Fusion, say A, B, and C. I want Pipeline C to be triggered only after both Pipeline A and Pipeline B have completed. Pipeline triggers only let me set a dependency on a single pipeline. Can this be implemented in Data Fusion?
You can do it using Google Cloud Composer [1]. To set this up, first create a new environment in Google Cloud Composer. Once it is ready, install a new Python package in your environment [2]; the package you need is "apache-airflow-backport-providers-google" [3].
With this package installed you will be able to use the Data Fusion operators it provides. The one you need is CloudDataFusionStartPipelineOperator ("Start a DataFusion pipeline"), which lets you start a Data Fusion pipeline from Airflow.
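If you prefer the command line over the Composer UI, the package can also be installed with gcloud. A minimal sketch, where ENVIRONMENT_NAME and the location are placeholders for your own values (pin a specific version with ==VERSION if gcloud requires an explicit version specifier):

gcloud composer environments update ENVIRONMENT_NAME \
    --location us-west1 \
    --update-pypi-package apache-airflow-backport-providers-google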
An example of the Python code would be as follows:
import airflow
import datetime
from datetime import timedelta

from airflow import models
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)
default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
with models.DAG(
        'composer_DF',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_args) as dag:
    # One task per Data Fusion pipeline.
    A = CloudDataFusionStartPipelineOperator(
        location="us-west1", pipeline_name="A",
        instance_name="instance_name", task_id="start_pipelineA",
    )
    B = CloudDataFusionStartPipelineOperator(
        location="us-west1", pipeline_name="B",
        instance_name="instance_name", task_id="start_pipelineB",
    )
    C = CloudDataFusionStartPipelineOperator(
        location="us-west1", pipeline_name="C",
        instance_name="instance_name", task_id="start_pipelineC",
    )
    # Run A and B in parallel; C starts only after both have completed successfully.
    [A, B] >> C
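The list on the left-hand side of >> is what fans both tasks into C: with Airflow's default trigger rule (all_success), C is scheduled only once A and B have both succeeded. If you prefer explicit method calls over the bit-shift syntax, this line is equivalent:

C.set_upstream([A, B])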
You can adjust the schedule_interval, and other timing options such as start_date and retries, by checking the Airflow documentation.
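For instance, schedule_interval also accepts a standard cron expression. A minimal sketch of the DAG declaration above, with an illustrative daily 06:00 UTC schedule in place of the timedelta:

with models.DAG(
        'composer_DF',
        schedule_interval='0 6 * * *',  # every day at 06:00 UTC
        default_args=default_args) as dag:
    ...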
Once you have this code saved as a .py file, upload it to the Google Cloud Storage DAGs folder of your environment.
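For example, assuming the file is saved as composer_DF.py, it can be uploaded with gcloud (environment name and location are placeholders for your own values):

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location us-west1 \
    --source composer_DF.py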
When the DAG runs, it will start tasks A and B in parallel, and task C will be scheduled only once both A and B have finished successfully.
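One caveat worth checking: "finished" here means the Airflow task succeeded. Depending on the version of the provider package you install, CloudDataFusionStartPipelineOperator may return as soon as the pipeline has been started rather than when it completes; newer versions expose an asynchronous parameter that controls whether the task waits for a terminal pipeline state. Check the operator documentation for your installed version to make sure C really runs after pipelines A and B have completed, not just after they have started.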
[1] https://cloud.google.com/composer
[2] https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies
[3] https://pypi.org/project/apache-airflow-backport-providers-google/