
I have an on-premise CDAP (Data Fusion) instance with multiple namespaces. How can I trigger a pipeline using Airflow operators? I have explored the available Airflow operators and this page, but neither was very helpful: https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline

Anonymous

1 Answer


Assuming you have already deployed the pipeline and you know the location, instance name, and name of the pipeline you want to run, see CloudDataFusionStartPipelineOperator() for the parameters it accepts.

Using the quickstart pipeline, I triggered it with CloudDataFusionStartPipelineOperator(). See the operator usage below:

import datetime

import airflow
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'trigger_df',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    start_pipeline = CloudDataFusionStartPipelineOperator(
        location='us-central1',
        pipeline_name='DataFusionQuickstart',
        instance_name='test',
        task_id="start_pipeline",
    )

    start_pipeline

On a successful run, the task shows green in the Airflow "Graph View", and the task logs confirm the pipeline was started (screenshots omitted).

Ricco D
  • Wonderful example for Cloud Data Fusion, thanks. But I am looking for CDAP, the on-premise instance of Data Fusion; a similar example would be very helpful. When I say on-premise, there will be a host, port, and user authentication required to connect. – Anonymous Jan 06 '22 at 15:49
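For a self-hosted CDAP instance, CloudDataFusionStartPipelineOperator does not apply, but you can call the CDAP REST API directly (the "Start a batch pipeline" endpoint from the page linked in the question) from, say, a PythonOperator. The sketch below is not verified against any particular deployment: the host, port, namespace, pipeline name, and bearer-token handling are placeholder assumptions you would adapt to your authentication setup.

```python
import json
import urllib.request


def cdap_start_url(host: str, port: int, namespace: str, pipeline: str) -> str:
    """Build the CDAP REST URL that starts a deployed batch pipeline.

    Batch pipelines are started via the app's DataPipelineWorkflow program.
    """
    return (
        f"http://{host}:{port}/v3/namespaces/{namespace}"
        f"/apps/{pipeline}/workflows/DataPipelineWorkflow/start"
    )


def start_cdap_pipeline(host, port, namespace, pipeline,
                        token=None, runtime_args=None):
    """POST to the start endpoint; runtime_args become pipeline runtime arguments.

    `token` is a placeholder for whatever auth your instance requires
    (e.g. a CDAP access token obtained from the authentication server).
    """
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    req = urllib.request.Request(
        cdap_start_url(host, port, namespace, pipeline),
        data=json.dumps(runtime_args or {}).encode(),
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:  # raises on HTTP errors
        return resp.status


# In a DAG you could wrap this in a PythonOperator, e.g. (hypothetical values):
# PythonOperator(
#     task_id="start_pipeline",
#     python_callable=start_cdap_pipeline,
#     op_kwargs={"host": "cdap.internal", "port": 11015,
#                "namespace": "default", "pipeline": "MyPipeline"},
# )
```

Checking the run status afterwards would use the corresponding `runs` endpoint of the same workflow; the polling that CloudDataFusionStartPipelineOperator does for you has to be added by hand here.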