I have an on-premise CDAP (Data Fusion) instance with multiple namespaces. How can I trigger a pipeline using Airflow operators? I have explored the available Airflow operators and this page, but they were not very helpful: https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline
1 Answer
Assuming you have already deployed the pipeline and you know the location, instance name, and pipeline name of the pipeline you want to run, see CloudDataFusionStartPipelineOperator() for the parameters it accepts.
Using the quickstart pipeline, I triggered it with CloudDataFusionStartPipelineOperator(). See the operator usage below:
import datetime

import airflow
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'trigger_df',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    start_pipeline = CloudDataFusionStartPipelineOperator(
        location='us-central1',
        pipeline_name='DataFusionQuickstart',
        instance_name='test',
        task_id='start_pipeline',
    )

    start_pipeline
(Screenshots: successful run shown in the Airflow "Graph View", plus the task logs.)
– Ricco D
Wonderful example for Cloud Data Fusion, thanks. But I am looking for CDAP, the on-premise instance of Data Fusion; a similar example would be very helpful. By "on premise" I mean there is a host, port, and user authentication required to connect. – Anonymous Jan 06 '22 at 15:49
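CloudDataFusionStartPipelineOperator resolves the instance through the Google Cloud API, so for a purely on-premise CDAP instance one option is to call CDAP's own REST endpoint for starting a batch pipeline, which is what the page linked in the question documents. A minimal sketch of that approach — the host, port, namespace, app name, and token below are placeholder assumptions, not values from the question:

```python
# Sketch for an on-premise CDAP instance, based on the CDAP REST reference
# linked in the question. Host, port, namespace, app name, and token are
# placeholders -- substitute your own.

def cdap_start_url(host, port, namespace, app,
                   program_type="workflows", program="DataPipelineWorkflow"):
    """Build the CDAP 'start a batch pipeline' endpoint URL."""
    return (f"http://{host}:{port}/v3/namespaces/{namespace}"
            f"/apps/{app}/{program_type}/{program}/start")

def auth_header(token):
    """Secured CDAP instances expect a Bearer access token."""
    return {"Authorization": f"Bearer {token}"}

url = cdap_start_url("cdap.example.internal", 11015, "my_namespace", "MyPipeline")
print(url)

# To actually trigger the run, POST to that URL, e.g. with requests:
#   import requests
#   resp = requests.post(url, headers=auth_header(my_token), json=runtime_args)
#   resp.raise_for_status()
```

From Airflow, the same HTTP call can be made with SimpleHttpOperator or a PythonOperator wrapping the snippet above, with the namespace in the URL selecting which of your multiple namespaces to target.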