5

I have an airflow workflow that I'd like to modify (see illustration at the bottom).
However, I couldn't find a way to do that in the docs.

I've looked at subdags, branching and xcoms without luck.

There doesn't seem to be a way to specify how many tasks should run in parallel in a subdag based on a return from an operator.
To add to the problem, each task in the subdag receives a different parameter (an element from the list returned by the previous operator)

This is an illustration of what I'm trying to do : airflow description

PhilipGarnero
  • 2,399
  • 4
  • 17
  • 24

1 Answers1

3

I've run into this as well and haven't really found a clean way to address it. If you know all the different possible parameters you would pass to each subdag...then what you can do is hardcode that into the DAG file and just always create the DAG with every possible subdag. Then you have an operator (similar your "get every n") which fetches the list of subdags you want to run and have it mark any downstream subdag not in the list as skipped. Something like this:

SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}   

def _select_subdags(**context):
    names = fetch_list()  # returns ["a", "c", "d"]
    tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]

    session = Session()
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == context['dag'].dag_id, 
        TaskInstance.execution_date == context['ti'].execution_date,
        TaskInstance.task_id.in_(tasks_to_skip),
    )
    for ti in tis:
        now = datetime.utcnow()
        ti.state = State.SKIPPED
        ti.start_date = now
        ti.end_date = now
        session.merge(ti)
    session.commit()
    session.close()

select_subdags = PythonOperator(
    task_id='select_subdags',
    dag=dag,
    provide_context=True,
    python_callable=_select_subdags,
)

for name, params in SUBDAGS.iteritems():
    child_dag_id = 'my_subdag_' + name
    subdag_op = SubDagOperator(
        task_id=child_dag_id,
        dag=dag,
        subdag=my_subdag(dag.dag_id, child_dag_id, params),
    )
    select_subdags >> subdag_op

Obviously not ideal, especially when you end up wanting to just run one subdag out of hundreds. We've also run into some performance issues with thousands of subdags in a single DAG, as it can lead to tons of task instances, majority of which are simply skipped.

Daniel Huang
  • 6,238
  • 34
  • 33
  • I didn't know you could interact with airflow db like that. Is it "safe" to do it though ? Wouldn't there be any side effects ? If we assume it is, we could code something that push new task instances in the db (based on a placeholder subdag) with xcoms providing us with the list of sub dags we want. Don't you think so ? – PhilipGarnero Jan 18 '18 at 10:18
  • I believe it's safe. I would be more hesitant in pushing new task instances than just updating states of existing task instances. I think changing state of downstream tasks poses a smaller risk in clashing with the scheduler as the scheduler shouldn't be processing those tasks yet (at least after it's created those task instances when it first processes the dag run). – Daniel Huang Jan 19 '18 at 00:58