I have three pipelines in Data Fusion, say A, B and C. I want Pipeline C to be triggered only after both Pipeline A and Pipeline B have completed. Pipeline triggers only let me set a dependency on a single pipeline. Can this be implemented in Data Fusion?
3 Answers
You can do it using Google Cloud Composer [1]. To do this, first create a new environment in Google Cloud Composer, then install an additional Python package in that environment [2]; the package you need is "apache-airflow-backport-providers-google" [3].
With this package installed you get access to the Data Fusion operators; the one you need here is CloudDataFusionStartPipelineOperator ("Start a DataFusion pipeline"), which lets you start a Data Fusion pipeline from Airflow.
An example of the Python code would be as follows:
import airflow
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with models.DAG(
        'composer_DF',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_args) as dag:

    # One task per Data Fusion pipeline; each task starts the named
    # pipeline on the given Data Fusion instance.
    A = CloudDataFusionStartPipelineOperator(
        location="us-west1", pipeline_name="A",
        instance_name="instance_name", task_id="start_pipelineA",
    )
    B = CloudDataFusionStartPipelineOperator(
        location="us-west1", pipeline_name="B",
        instance_name="instance_name", task_id="start_pipelineB",
    )
    C = CloudDataFusionStartPipelineOperator(
        location="us-west1", pipeline_name="C",
        instance_name="instance_name", task_id="start_pipelineC",
    )

    # First A, then B, then C
    A >> B >> C
You can adjust the schedule interval and the other timing settings by checking the Airflow documentation.
Once you have this code saved as a .py file, upload it to the Cloud Storage DAGs folder of your environment.
When the DAG runs, it will execute task A; when that finishes it will execute task B, and so on.
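If what you need is specifically the dependency from the question, i.e. C starting only after both A and B have finished, a minimal sketch of the change, assuming the same three operators defined above and a Composer environment able to run tasks in parallel, is to fan the dependencies in instead of chaining them:
# Sketch only: goes inside the with models.DAG(...) block above,
# replacing the line A >> B >> C. A and B have no dependency on each
# other, so Airflow can schedule them in parallel (subject to the
# environment's parallelism settings), and with the default
# trigger_rule ("all_success") C starts only after both succeed.
[A, B] >> C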
[1] https://cloud.google.com/composer
[2] https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies
[3] https://pypi.org/project/apache-airflow-backport-providers-google/

Hi Gonzalo, that's very cool. Thanks for the awesome solution. Can we run Pipeline A and B in parallel, and how are we going to schedule this script? – SUDHIR GARG Aug 28 '20 at 11:12
To do so is a bit more complicated, as Cloud Composer executes the tasks sequentially, but you can check this answer if you want to investigate it further [1]. If you think I answered your main question, please accept the answer so it can help the rest of the community. [1] https://stackoverflow.com/questions/52741536/running-airflow-tasks-dags-in-parallel – Gonzalo Pérez Fernández Aug 28 '20 at 11:30
There is no direct way that I can think of, but here are two workarounds.
Workaround 1: Merge pipeline A and pipeline B into a single pipeline AB, then trigger pipeline C (AB > C).
Pipeline A - (GCS Copy > Decompress), Pipeline B - (GCS2 > thrashsad)
Add a BigQueryExecute plugin to mitigate the error "Invalid DAG. There is an island made up of stages...".
In BigQueryExecute, use a valid dummy query.
Merging the two pipelines into one can make pipeline testing harder. To overcome this, you can add a dummy condition so that you can run a single branch at a time:
- In BigQueryExecute, change the query to 'Select ${flag}' and pass the value of flag as a runtime argument, or use 'Select 1 as flag' and set "Row As Arguments" to true.
- Add a Condition plugin after BigQueryExecute and set the condition runtime['flag'] = 1.
- The Condition plugin has two outlets; connect them to pipeline A and pipeline B.
Workaround 2: Store a completion flag for each pipeline (A & B) in a BigQuery table, and create two triggers, A > C and B > C, to trigger pipeline C. This triggers pipeline C twice, but with BigQueryExecute and the Condition plugin the rest of pipeline C runs only when both flags are present in the BigQuery table.
How?
- In pipelines A & B, write an output row to the BigQuery table 'Pipeline_Run'.
- In pipeline C, add a BigQueryExecute plugin with the query 'select count(*) as Cnt from ds.Pipeline_Run' and set "Row As Arguments" to true.
- In pipeline C, add a Condition plugin that checks whether the value of cnt is 2 (runtime['cnt'] = 2) and connect the rest of the pipeline's plugins to its "Yes" outlet.

You can explore "schedules" set through CDAP REST APIs. That allows parallel execution of pipelines and there is no dependency on cloud composer (except for file based trigger of first pipeline in workflow. For that you would need cloud function or may be cloud composer file sensor)

Hi Adideas, with REST API schedules I also cannot control the execution so that pipeline C is triggered only when both pipeline A and B have completed. Please give an example if this can be done. – SUDHIR GARG Sep 04 '20 at 08:09