
I need to have several identical top-level DAGs (differing only in arguments) that can also be triggered together, with the following constraints / assumptions:

  • Individual top-level DAGs will have schedule_interval=None as they will only need occasional manual triggering
  • The series of DAGs, however, needs to run daily
  • Order and number of DAGs in series is fixed (known ahead of writing code) and changes rarely (once in a few months)
  • Irrespective of whether a DAG fails or succeeds, the chain of triggering must not break
  • Currently they must be run together in series; in future they may require parallel triggering

So I created one file for each DAG in my dags directory and now I must wire them up for sequential execution. I have identified two ways this could be done:

  1. SubDagOperator

  2. TriggerDagRunOperator

  • Works in my demo but runs the DAGs in parallel (not sequentially), as it doesn't wait for the triggered DAG to finish before moving on to the next one

  • ExternalTaskSensor might help overcome the above limitation, but it would make things very messy
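
For concreteness, here's roughly the wiring I'm considering: a chain of TriggerDagRunOperator + ExternalTaskSensor pairs inside one "controller" DAG. This is only a sketch (DAG / task names are illustrative; import paths are from the Airflow 1.9 era), and it already hints at the messiness:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor


def always_trigger(context, dag_run_obj):
    # Airflow 1.x: returning the dag_run_obj tells the operator to fire
    return dag_run_obj


dag = DAG('importer_parent', schedule_interval='@daily',
          start_date=datetime(2018, 1, 1))

prev = None
for db in ['db_1', 'db_2', 'db_3']:
    trigger = TriggerDagRunOperator(
        task_id='trigger_importer_v1_{}'.format(db),
        trigger_dag_id='importer_child_v1_{}'.format(db),
        python_callable=always_trigger,
        dag=dag)
    # wait for the final task of the triggered DAG before moving on;
    # this is where specifying the right execution_date gets tricky
    wait = ExternalTaskSensor(
        task_id='wait_importer_v1_{}'.format(db),
        external_dag_id='importer_child_v1_{}'.format(db),
        external_task_id='last_task',
        dag=dag)
    trigger >> wait
    if prev is not None:
        prev >> trigger
    prev = wait
```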

My questions are

  • How to overcome the limitation that a SubDag's dag_id must be prefixed with its parent's dag_id?
  • How to force TriggerDagRunOperators to await completion of the triggered DAG?
  • Any alternate / better way to wire up independent (top-level) DAGs together?
  • Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

I'm using puckel/docker-airflow with

  • Airflow 1.9.0-4
  • Python 3.6-slim
  • CeleryExecutor with redis:3.2.7

EDIT-1

Clarifying @Viraj Parekh's queries

Can you give some more detail on what you mean by awaiting completion of the DAG before getting triggered?

When I trigger the import_parent_v1 DAG, all 3 external DAGs that it is supposed to fire using TriggerDagRunOperator start running in parallel even though I chain them sequentially. Actually, the logs indicate that while they are fired one after another, the execution moves on to the next TriggerDagRunOperator before the previous DAG has finished. NOTE: In this example, the top-level DAGs are named importer_child_v1_db_X and their corresponding task_ids (for TriggerDagRunOperator) are named importer_v1_db_X

Would it be possible to just have the TriggerDagRunOperator be the last task in a DAG?

I have to chain several similar DAGs (differing only in arguments) together in a workflow that triggers them one by one. So there isn't just one TriggerDagRunOperator that I could put last; there are many (here 3, but up to 15 in production)

y2k-shubham
  • As [reported by **@Freedom**](https://stackoverflow.com/questions/42256675/airflow-how-to-extend-subdagoperator#comment85267925_42256675), `SubDagOperator` must not be extended until `Airflow v2.0` – y2k-shubham Sep 07 '18 at 05:33
  • Here are some links related in *some* way to this problem [link1](https://stackoverflow.com/questions/50934266) [link2](https://stackoverflow.com/questions/41493605) [link3](https://stackoverflow.com/questions/38022323) [link4](https://medium.com/@hafizbadrie/airflow-when-your-dag-is-far-behind-the-schedule-ea11bf02e44c) – y2k-shubham Dec 13 '18 at 08:49
  • [link5](https://stackoverflow.com/questions/48164160/airflow-dag-success-callback) – y2k-shubham Dec 13 '18 at 09:00
  • Here's the [link](https://lists.apache.org/thread.html/a092fd4ff5bf58190310abdce58bbb50e95b615d572bb90cd88ecc45@%3Cdev.airflow.apache.org%3E) to my query on `Airflow` [`dev`-mailing-list](https://lists.apache.org/list.html?dev@airflow.apache.org) – y2k-shubham Dec 13 '18 at 10:57

4 Answers


Taking hints from @Viraj Parekh's answer, I was able to make TriggerDagRunOperator work in the intended fashion. I'm hereby posting my (partial) answer; will update as and when things become clear.


How to overcome limitation of parent_id prefix in dag_id of SubDags?

As @Viraj said, there's no straightforward way of achieving this. Extending SubDagOperator to remove this check might work, but I decided to steer clear of it


How to force TriggerDagRunOperators to await completion of DAG?

  • Looking at the implementation, it becomes clear that the job of TriggerDagRunOperator is just to trigger the external DAG, and that's about it. By default, it is not supposed to wait for completion of that DAG. Therefore the behaviour I'm observing is understandable.

  • ExternalTaskSensor is the obvious way out. However, while learning the basics of Airflow I was relying on manual triggering of DAGs (schedule_interval=None). In that case, ExternalTaskSensor makes it difficult to accurately specify the execution_date for the external task (whose completion is being awaited), failing which the sensor gets stuck.

  • So, taking a hint from the implementation, I made a minor adjustment to the behaviour of ExternalTaskSensor by awaiting completion of all task_instances of the concerned task having

    execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

    This achieves the desired result: external DAGs run one-after-other in sequence.
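
In (pseudo-)code, the adjusted sensing condition boils down to something like this (an illustrative sketch with dict-shaped task-instances, not the actual Airflow TaskInstance model or sensor internals):

```python
from datetime import datetime, timedelta


# Succeed only once every task-instance of the external task whose
# execution_date is at or after (trigger's execution_date + execution_delta)
# has finished successfully; if no such instance exists yet, keep poking.
def external_task_complete(task_instances, trigger_execution_date,
                           execution_delta=timedelta(0)):
    cutoff = trigger_execution_date + execution_delta
    relevant = [ti for ti in task_instances
                if ti['execution_date'] >= cutoff]
    return bool(relevant) and all(ti['state'] == 'success'
                                  for ti in relevant)
```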


Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

Again, as @Viraj suggested, this can be done by assigning DAGs to the global scope using globals()[dag_id] = DAG(..)
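
The trick in miniature (DAG construction is stubbed out here with a plain dict; in the real file it would be airflow.DAG(dag_id, ...), as in @Viraj's answer below):

```python
# One file, many DAGs: plant each DAG object into the module's global
# namespace so Airflow's DagBag discovers all of them from this single file.
def build_dag(dag_id):
    return {'dag_id': dag_id}  # stand-in for DAG(dag_id, ...)


for db in ['db_1', 'db_2', 'db_3']:
    dag_id = 'importer_v1_{}'.format(db)
    globals()[dag_id] = build_dag(dag_id)
```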


EDIT-1

Maybe I was referring to an incorrect resource (the link above is already dead), but ExternalTaskSensor already includes the params execution_delta & execution_date_fn to easily restrict the execution_date(s) of the task being sensed.
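
To illustrate what those two params do (a sketch mirroring the documented semantics, not code lifted from Airflow's source): with execution_delta the sensor pokes for the external task at its own execution_date minus the delta, while execution_date_fn computes the target date arbitrarily from the sensor's own execution_date.

```python
from datetime import datetime, timedelta


# Which execution_date does ExternalTaskSensor look for in the external DAG?
def sensed_execution_date(own_execution_date,
                          execution_delta=None, execution_date_fn=None):
    if execution_delta is not None:
        return own_execution_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(own_execution_date)
    # default: same execution_date as the sensing task
    return own_execution_date
```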

y2k-shubham
  • Although I haven't really used `ExternalTaskSensor` in any *real* `DAG`s, it still appears to be [tricky](https://stackoverflow.com/q/54128117/3679900) to put to action – y2k-shubham Jan 11 '19 at 05:15
  • Also read about [this idea](https://stackoverflow.com/a/54229812/3679900) that struck me and do care to report any *pitfalls* / reasons why it wouldn't work – y2k-shubham Jan 23 '19 at 21:56
  • Hi @y2k-shubham. in your final example, why did you choose 10 minutes in: `execution_delta=timedelta(minutes=10),`? How did you know 10 minutes? why not 5 or 15? Thanks. – arcee123 Jan 06 '20 at 19:05
  • 1
    *In such case, `ExternalTaskSensor` makes it difficult to accurately specify execution_date for the external task*: The `execution_date` field for `TriggerDagRunOperator()` is *templated*; use the same `execution_date` value as the parent DAG: `TriggerDagRunOperator(task_id="...", trigger_dag_id="some_dag_id", execution_date="{{ts}}", ...)`, at which point the `ExternalTaskSensor()` will match exactly as it always uses the same `execution_date`, no delta required. – Martijn Pieters Jul 28 '20 at 12:58
  • 1
    2 years late. For your question on "How to force `TriggerDagRunOperator` to await completion of DAG?", there's a parameter called `wait_for_completion` that will only mark the Operator as `success` when the triggered DAG completes. Docs [here](https://airflow.apache.org/docs/apache-airflow/2.4.2/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator). – Jialer Chew Apr 29 '23 at 17:05
  • Can you give some more detail on what you mean by awaiting completion of the DAG before getting triggered? Would it be possible to just have the TriggerDagRunOperator be the last task in a DAG?

  • For creating similar DAGs, you can dynamically generate the DAGs from one Python file. You could do something like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)

    return dag


# build a DAG for each number in range(1, 10)
for n in range(1, 10):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)}

    schedule = '@daily'

    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)

You can read more about that approach here. If most of the DAGs you are producing are pretty similar, you might want to consider storing the config in an Airflow Variable.

You probably won't be able to overcome the prefix limitation of the SubDag Operator - I'd suggest removing SubDags from your workflows entirely and just running them as separate DAGs - it'll make it much easier to go back and re-run older DagRuns if you ever need to.

Viraj Parekh
  • I don't understand the parameter `dag_number`. AFAIK, there's no such param in [`PythonOperator`](https://airflow.apache.org/code.html#airflow.operators.PythonOperator) or even [`BaseOperator`](https://airflow.apache.org/code.html#baseoperator) – y2k-shubham Jul 15 '18 at 15:24

This worked for me when I was using schedule_interval=None.


# imports assumed for Airflow 1.x-era code
from airflow.models import DagRun
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor

trigger_dag = TriggerDagRunOperator(
    task_id='dag_id-trigger',
    trigger_dag_id='dag_id',
    python_callable=set_args,  # set_args: user-defined callable (not shown here)
    dag=dag,
)


def get_most_recent_dag_run(execution_date, **kwargs):
    # ignore the sensor's own execution_date; poke for the latest run instead
    return DagRun.find(dag_id='dag_id').pop().execution_date


sensor = ExternalTaskSensor(
    task_id='dag_id-sensor',
    external_dag_id='dag_id',
    execution_date_fn=get_most_recent_dag_run,
    dag=dag,
    poke_interval=5,
    external_task_id='last_task_id',  # this task must exist in your external DAG
)

jmcgrath207

If you are looking for a way to wait for the triggered DAG to complete, in Airflow 2.0 this became much easier than before.
There is a new version of the TriggerDagRunOperator that allows you to do that. No need to use the ExternalTaskSensor anymore.
I've made a 10-minute tutorial about it: https://youtu.be/8uKW0mPWmCk
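
A minimal sketch of the usage (task / dag ids are placeholders; see the linked docs in the comments above for the exact parameters):

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id='trigger_child_dag',
    trigger_dag_id='child_dag',
    wait_for_completion=True,  # block until the triggered DagRun finishes
    poke_interval=30,          # seconds between status checks
)
```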

Enjoy

Marc Lamberti
  • FWIW (Not affiliated) Marc's videos are great! I watched them before stumbling here. But for completeness of this answer, you are looking for the new argument (added in Airflow 2.0) `TriggerDagRunOperator(...,wait_for_completion=True, poke_interval=30)` – muon Jun 22 '22 at 13:27