I have a list of tasks in a master DAG, each of which calls a different DAG. I'm using the TriggerDagRunOperator to accomplish this, but I'm facing a few issues.

  1. TriggerDagRunOperator doesn't wait for the external DAG to complete; it moves straight on to the next task. I want it to wait until the external DAG completes, and the next task should be triggered based on its status. I came across ExternalTaskSensor, but it makes the process complicated, and it appears to work only with scheduled runs: when I use it, it fails once the timeout is reached.
  2. Is there a way to trigger the different DAGs from a master DAG sequentially rather than in parallel, so that the next DAG is triggered only after the previously triggered DAG has successfully completed all of its tasks?
Jafar Sharif
  • **[1]** I don't think ExternalTaskSensor requires a schedule; for your use case, you can skip passing `external_task_id` & only pass `execution_date` or `execution_delta`. That way, it will effectively act as an `ExternalDagSensor` [ref](https://github.com/apache/airflow/blob/master/airflow/sensors/external_task_sensor.py#L70) **[2]** The other workaround that I know of is nasty: `SubDagOperator`. Check [this](https://stackoverflow.com/q/51325525/3679900) thread – y2k-shubham May 20 '20 at 13:39
  • I tried the approach you described, and the DAG sensor is still in the running state even though the DAG has run successfully. Below are the params for your reference: `sensor_run_initial = ExternalTaskSensor(task_id='dag_sensor_for_run_initial', external_dag_id='RunInitial', external_task_id=None, dag=dag )` Please tell me if anything needs to be changed in the ExternalTaskSensor. – Jafar Sharif May 20 '20 at 20:35
  • Yes, I acknowledge that it wouldn't work out of the box. You'll have to pass `execution_delta` instead of `execution_date` – y2k-shubham May 21 '20 at 01:52
  • @y2k-shubham I tried with **execution_delta**; the DAG sensor remains in the running state even though the external DAG ran successfully. `sensor_run_initial = ExternalTaskSensor(task_id='dag_sensor_for_run_initial', external_dag_id='RunInitial', external_task_id=None, execution_delta= timedelta(minutes=30), dag=dag )`. I hope the params are as you suggested – Jafar Sharif May 21 '20 at 14:02

2 Answers

Run this task after triggering your external DAG:

import time
from airflow.models import DagRun
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator

def get_external_dag_status(dag_id, **kwargs):
    """Return the state of the most recent run of dag_id, or None if it has no runs."""
    dag_runs = DagRun.find(dag_id=dag_id)
    if not dag_runs:
        return None
    # Pick the run with the latest execution date instead of relying on query order.
    latest_run = max(dag_runs, key=lambda run: run.execution_date)
    return latest_run.state

def check_status(dag_id, **kwargs):
    """Block until the external DAG succeeds; fail this task if it fails."""
    state = get_external_dag_status(dag_id)
    while state != 'success':
        if state == 'failed':
            # Raising fails this task, so downstream tasks are not triggered.
            raise AirflowException('Dag {} failed'.format(dag_id))
        time.sleep(300)  # optional: check every 5 minutes
        state = get_external_dag_status(dag_id)
    return state

status_check = PythonOperator(task_id="dag_check",
                         python_callable=check_status,
                         op_kwargs={'dag_id':'your external dag id'},
                         dag=spark_dag
                         )
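
One caveat with the loop above: if the external DAG never leaves the running state, the task polls forever. A minimal sketch of the same polling logic with an upper bound on the wait is shown here. It is written against a hypothetical `get_state` callable (for example `lambda: get_external_dag_status(dag_id)`), and the `wait_for_dag` name and its `timeout`, `poke_interval`, and `sleep` parameters are assumptions for illustration, not part of Airflow:

```python
import time


def wait_for_dag(get_state, timeout=3600, poke_interval=300, sleep=time.sleep):
    """Poll get_state() until it returns 'success'.

    get_state is a hypothetical zero-argument callable that returns the
    current state string of the external DAG run.
    """
    waited = 0
    while True:
        state = get_state()
        if state == 'success':
            return state
        if state == 'failed':
            # Inside an Airflow task you may prefer to raise AirflowException.
            raise RuntimeError('external DAG failed')
        if waited >= timeout:
            raise TimeoutError(
                'external DAG did not finish within {} seconds'.format(timeout))
        sleep(poke_interval)
        waited += poke_interval
```

Passing `sleep` as a parameter is only there so the loop can be exercised without real waiting; in the task itself the default `time.sleep` is enough.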
Jeremy Caney

After triggering a DAG using TriggerDagRunOperator, you can consider calling a DagSensor that waits for the DAG to complete and only then triggers the other DAGs. Here is how we implemented our version (not perfect, but it did the job):

import logging

from airflow.plugins_manager import AirflowPlugin
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State

logger = logging.getLogger('airflow.dag_sensor')


class DagSensor(BaseSensorOperator):
    """
    Sensor that checks whether a Dag is currently running.
    It proceeds only once the Dag has no runs in the running state.
    """

    template_fields = ['external_dag_id']
    ui_color = '#FFFFCC'

    @apply_defaults
    def __init__(self,
                 external_dag_id,
                 *args,
                 **kwargs):
        super(DagSensor, self).__init__(*args, **kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self, context, session=None):
        # Count the runs of the external Dag that are still in progress.
        # provide_session supplies the session and cleans it up, so no manual
        # commit/close is needed for this read-only query.
        count = session.query(DagRun).filter(
            DagRun.dag_id == self.external_dag_id,
            DagRun.state == State.RUNNING
        ).count()

        logger.info(f'Dag {self.external_dag_id} in running status: {count}')

        # Succeed only once no run of the external Dag is running.
        return count == 0


class DagSensorPlugin(AirflowPlugin):
    name = 'dag_sensor_plugin'
    operators = [DagSensor]

Here is how you can call it:

from airflow.operators import DagSensor
check_my_dag_completion = DagSensor(
    dag=dag,
    task_id='check_my_dag_completion',
    external_dag_id='my_dag',
    poke_interval=30,
    timeout=3600
)

This means that you can have something like this in your workflow:

call_dag_a >> check_dag_a >> call_dag_b >> check_dag_b
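
If the master DAG calls many DAGs, the chain can be built in a loop instead of being written out by hand. This is a rough sketch: `chain_sequentially` is a hypothetical helper, and the DAG ids and task ids in the commented usage are placeholders. It relies only on the `>>` operator that Airflow operators already support.

```python
def chain_sequentially(tasks):
    """Link tasks so each one is downstream of the one before it."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream
    return tasks


# Hypothetical usage inside the master DAG file (placeholder ids):
#
# steps = []
# for dag_id in ['dag_a', 'dag_b']:
#     steps.append(TriggerDagRunOperator(
#         task_id='call_' + dag_id, trigger_dag_id=dag_id, dag=dag))
#     steps.append(DagSensor(
#         task_id='check_' + dag_id, external_dag_id=dag_id,
#         poke_interval=30, timeout=3600, dag=dag))
# chain_sequentially(steps)
```

With the default trigger rule, each task starts only after its upstream task has succeeded, so the DAGs are called one after another rather than in parallel.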
nicor88
  • Thanks! Is DagSensor available in Airflow's sensors? How can I import it into my existing code? – Jafar Sharif May 20 '20 at 10:34
  • DagSensor is something that we implemented. You need to save the code into plugins/dag_sensor.py. Your airflow.cfg has a plugins_folder setting; you just need to make sure it points to the right folder. – nicor88 May 20 '20 at 11:20
  • Instead of adding our own plugins, is there a way to achieve the above functionality with the features already available in Airflow? – Jafar Sharif May 20 '20 at 12:57
  • As far as I know, no. But that's the beauty of Airflow: you can extend it. The task sensor alone is not suitable. – nicor88 May 20 '20 at 13:00