
I have a simple python operator, defined like so:

loop_records = PythonOperator(
    task_id='loop_records',
    provide_context=True,
    python_callable=loop_topic_records,
    dag=dag
)

This python operator calls loop_topic_records, defined like so:

def loop_topic_records(**context):
    parent_dag = context['dag']
    for i in range(3):
        op = DummyOperator(
            task_id="child_" + str(i),
            dag=parent_dag
        )
        logging.info('Child operator ' + str(i))
        loop_records >> op

I see that the code does not raise any errors. It even prints Child operator 0..2 in the log. However, in the DAG Graph view I do not see the child operators; I only see the loop_records node, as if my DAG consists of a single operator. What is wrong here, and how can I fix it?

Jacobian

  • I've just created an operator which must fail (I deliberately put such logic into it). However, when I run the whole DAG, it runs successfully. So it means that nested child operators created this way never run – Jacobian Mar 07 '20 at 18:55

2 Answers


You can't do what you want. Each DAG, once loaded by Airflow, is static, and can't be altered from a running task. Any alterations you make to the DAG from inside a task are ignored.

What you can do, is start other DAGs, using the Multi DAG run operator provided by the airflow_multi_dagrun plugin; create a DAG of DAGs, so to speak:

import logging

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator

def gen_topic_records(**context):
    for i in range(3):
        # generate `DagRunOrder` objects to pass a payload (configuration)
        # to the new DAG runs.
        yield DagRunOrder(payload={"child_id": i})
        logging.info('Triggering topic_record_dag #%d', i)

loop_topic_record_dags = TriggerMultiDagRunOperator(
    task_id='loop_topic_record_dags',
    dag=dag,
    trigger_dag_id='topic_record_dag',
    python_callable=gen_topic_records,
)

The above will trigger the DAG named topic_record_dag 3 times. Inside operators in that DAG, you can access whatever was set as the payload via the dag_run.conf object (in templates) or the context['dag_run'].conf reference (in PythonOperator() code, with provide_context=True set).
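For instance, a task in topic_record_dag could read the payload like this. This is a minimal sketch: process_topic_record is an illustrative callable name, and in the real DAG you'd wire it in with a PythonOperator with provide_context=True set:

```python
import logging

def process_topic_record(**context):
    # The payload set via DagRunOrder in the parent DAG arrives on the
    # triggering DagRun's conf dictionary.
    child_id = context['dag_run'].conf['child_id']
    logging.info('Processing child %s', child_id)
    return child_id
```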

If you need to do additional work once those 3 DAG runs are done, all you need to do is add a sensor to the above DAG. Sensors are operators that wait until a specific piece of outside information is available. Use one here that fires when all the child DAGs are done. The same plugin provides a MultiDagRunSensor that's exactly what you need: it triggers when all DAGs started by a TriggerMultiDagRunOperator task have finished (succeeded or failed):

from airflow.operators.multi_dagrun import MultiDagRunSensor

wait_for_topic_record_dags = MultiDagRunSensor(
    task_id='wait_for_topic_record_dags',
    dag=dag
)

loop_topic_record_dags >> wait_for_topic_record_dags

then put further operators after that sensor.

Martijn Pieters

I'm not sure what your use case is for creating child operators from within the python_callable function, but if that's not a strict requirement, you can simply loop over the creation of the PythonOperator, like so:

op = DummyOperator(
    task_id="dummy",
    dag=dag
)

for i in range(3):
    loop_records = PythonOperator(
        task_id=f'loop_records_{i}',
        provide_context=True,
        python_callable=loop_topic_records,
        dag=dag
    )
    loop_records >> op

bricca
  • Well, the thing is, I need to do it inside `python_callable`, because in the real case the number of loops depends on `xcom`. In `loop_topic_records` from my question I loop over a static range, however, in reality it is dynamic and is based on `xcom` – Jacobian Mar 09 '20 at 17:57
  • Basically, Airflow can't do it; tasks are static and are generated before execution. But you can find a workaround like this one: https://stackoverflow.com/questions/39133376/airflow-dynamic-dag-and-task-ids – Ilya Bystrov Mar 11 '20 at 14:26
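The XCom-driven variant mentioned in these comments fits the accepted answer's approach: the generator passed to TriggerMultiDagRunOperator can pull the record list from XCom at run time. A sketch, with two stated assumptions: 'collect_topic_records' is a hypothetical upstream task id that pushed the records, and plain dicts stand in for the DagRunOrder objects you'd yield in the real DAG:

```python
import logging

def gen_topic_records(**context):
    # Pull the dynamic record list from XCom; 'collect_topic_records' is a
    # hypothetical upstream task id that pushed the records to process.
    records = context['ti'].xcom_pull(task_ids='collect_topic_records') or []
    for i, record in enumerate(records):
        # In the real DAG, yield DagRunOrder(payload={...}) here instead of
        # the plain dict used as a stand-in.
        yield {'child_id': i, 'record': record}
        logging.info('Triggering topic_record_dag #%d', i)
```

The number of triggered DAG runs then tracks whatever the upstream task pushed, with no change to the parent DAG's static task structure.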