
My use-case is as follows:

  • Task A generates a dataset from some raw input data
  • Task B runs some code that takes the dataset as input
  • Task C runs some other code that takes the dataset as input

The three tasks are scheduled to run daily, and Task B and Task C are scheduled far enough after Task A that they simply fail if the input dataset has not been generated for some reason.

As a first improvement I added an ExternalTaskSensor to both Task B and Task C, but this only prevents them from running while Task A has not yet finished or has failed.

However, ExternalTaskSensor does not seem to work well with backfilling: it is pretty fragile because it matches on execution date only, and if Task A is run again, Task B and Task C won't know about it.
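
For reference, this is roughly how I wired the sensor into Task B's DAG (a minimal sketch; the dag and task ids here are made up):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag_b = DAG('dag_b', schedule_interval='@daily',
            start_date=datetime(2019, 6, 20))

# The sensor pokes for task_a in dag_a with the *same* execution date.
# This is what makes it fragile: a backfill or a manual re-run of Task A
# under a different execution date is simply not seen.
wait_for_task_a = ExternalTaskSensor(
    task_id='wait_for_task_a',
    external_dag_id='dag_a',
    external_task_id='task_a',
    timeout=3600,
    dag=dag_b)

task_b = DummyOperator(task_id='task_b', dag=dag_b)

wait_for_task_a >> task_b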

Solution 1 (not applicable): I have seen this SO question: In airflow, is there a good way to call another dag's task?

This is not ideal for me because I'd like to keep Task A unaware of its dependent tasks, and handle the logic in Task B and Task C (or externally). The reason is that other tasks consuming the output of Task A will be added in the future (by different teams across the organization), and it's not desirable to update Task A each time.

Summary: I'd like to trigger Task B and Task C if and only if Task A has completed successfully (regardless of whether it was scheduled or triggered manually), without modifying Task A to achieve that.

Alessandro S.
  • Can you please elaborate on what you mean by *keep Task A unaware of the dependent tasks*? I can't understand why task B and task C can't be downstream tasks depending on task A. Or why task B and task C can't be moved to a separate DAG triggered from the first DAG with only task A. – nightgaunt Jun 20 '19 at 07:06
  • The point is that I'd like to add many more tasks depending on `Task A` in the future, but I'd like to have them *register* somehow, rather than update `Task A` all the time. The rationale is that `Task A` and the consuming tasks are likely to be handled by different teams, so the effort should be on the consumer side if possible. For `Task B` and `Task C` to be downstream of `Task A`, they have to be inside the same DAG, right? I'd avoid that too, if possible, for the same reason. – Alessandro S. Jun 20 '19 at 09:51
  • 1
    I see what you mean. You will be developing `task A` which returns some results. Then other developers will be using output from your task to perform their own tasks (Lets say `task B` and `task C`). You don't want to modify the task A code every time someone wants to use `task A`. – nightgaunt Jun 20 '19 at 10:06
  • I will update the question to make it more explicit, thanks for your comment! – Alessandro S. Jun 20 '19 at 10:07

1 Answer


To suit your scenario, the only concept I can think of is SubDags. (Refer to the WARNING below before you implement this.)

SubDagOperator allows you to attach a set of tasks downstream of your task A. Refer to the code below.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

from load_subdag import load_subdag  # the factory defined below; the module name is illustrative

dag = DAG('parent_dag', description='Parent',
          schedule_interval='@daily',
          start_date=datetime(2019, 6, 20))  # use a static start_date; datetime.now() makes scheduling unreliable

task_a = DummyOperator(dag=dag, task_id='task_a')

# task_id must match the child dag name passed to load_subdag(), because
# Airflow requires the subdag's dag_id to be '<parent_dag_id>.<task_id>'.
subdag_task = SubDagOperator(
    task_id='load_tasks',
    subdag=load_subdag('parent_dag', 'load_tasks'),
    dag=dag)

task_a >> subdag_task

Now in a separate file you define your load_subdag function.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def load_subdag(parent_dag_name, child_dag_name):
    # SubDagOperator expects the subdag's dag_id to follow the
    # '<parent_dag_id>.<child_name>' convention.
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_name, child_dag_name),
        schedule_interval='@daily',
        start_date=datetime(2019, 6, 20),  # keep in sync with the parent DAG
    )
    with dag_subdag:
        task_b = DummyOperator(
            task_id='load_subdag_task_b',
            dag=dag_subdag)

        task_c = DummyOperator(
            task_id='load_subdag_task_c',
            dag=dag_subdag)

    return dag_subdag
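
With this layout, a new consumer only has to add its task inside load_subdag; task_a itself (and the parent DAG, beyond the single SubDagOperator) never changes, which gives you the register-on-the-consumer-side behaviour you asked about.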

WARNING (in red and bold): SubDag tasks occupy slots in your worker like maggots. Please understand the caveats completely before you jump into this. AIRFLOW-74 gives a picture of how bad it can be. It has been outright rejected by many developers for the same reason.

nightgaunt