
The original code related to this question can be found here.

I'm confused by how both the bitshift operators and the set_upstream/set_downstream methods are working within a task loop that I've defined in my DAG. When the main execution loop of the DAG is configured as follows:

for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))

or

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

The graph looks like this (the alphanumeric sequences are the user IDs, which also define the task IDs):

[Tree View screenshot of the resulting DAG]

when I configure the main execution loop of the DAG like this:

for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))

or

for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables

the graph looks like this:

[Tree View screenshot of the resulting DAG]

The second graph is what I want, and what I would have expected the first two snippets of code to produce based on my reading of the docs. If I want clear_tables to execute first, before triggering my batch of data-parsing tasks for the different user IDs, should I indicate this as clear_tables >> id_worker(uid)?

EDIT -- Here's the complete code, which has been updated since I posted the last few questions, for reference:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  'start_date': datetime.now(),
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    # One preprocessing task per user ID; the uid doubles as the task_id.
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    clear_tables << preproc_task

After implementing @LadislavIndra's suggestion I still needed the same reversed bitshift operator in order to get the correct-looking dependency graph.

UPDATE -- @AshBerlin-Taylor's answer is what's going on here. I assumed that Graph View and Tree View were showing the same thing, but they're not. Here's what id_worker(uid) >> clear_tables looks like in Graph View:

[Graph View screenshot: clear_tables appears as the final task in the DAG]

I certainly don't want the final step in my data pre-prep routine to be to delete all data tables!
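So to have clear_tables run first, the forward form from the first snippets is what I actually need. As a minimal sketch (same helpers as above):

# clear_tables first, then one preprocessing task per user ID
for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)  # equivalent to clear_tables.set_downstream(id_worker(uid))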

aaron

2 Answers


The tree view in Airflow is "backwards" from how you (and I!) first thought about it. In your first screenshot it is showing that "clear_tables" must be run before the "AAAG5608078M2" task, and that the DAG run status depends upon each of the id worker tasks. So instead of a task order, it's a tree of the status chain, if that makes any sense at all.

(This might seem strange at first, but it's because a DAG can branch out and branch back in.)
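For example (a toy sketch, not the question's code, and assuming a dag object is already defined), a DAG like this fans out and then back in, a shape that a plain tree cannot render without repeating a node:

from airflow.operators.dummy_operator import DummyOperator

# "start" fans out to a and b, which both fan back in to "end"
start = DummyOperator(task_id='start', dag=dag)
a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> a >> end
start >> b >> end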

You might have better luck looking at the Graph view for your DAG. That one has arrows and shows the execution order in a more intuitive way. (Though I do now find the tree view useful; it's just less clear to start with.)
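If you want to sanity-check the wiring without relying on either view, a quick sketch (assuming the clear_tables operator is still in scope) is to print the relationships Airflow recorded on the operator itself:

# With clear_tables >> id_worker(uid), the downstream set should contain every
# user-ID task and the upstream set should be empty.
print(clear_tables.downstream_task_ids)
print(clear_tables.upstream_task_ids)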

Ash Berlin-Taylor
  • @AshBerlin-Taylor -- Nailed it! Posting an update with a screenshot from graph view. – aaron Jul 28 '17 at 17:40
  • This should be mentioned in [Common-Pitfalls](https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls). Even though a `DAG` can't be directly represented as a `tree` (something that many don't realize, [see this](https://stackoverflow.com/a/48410929/3679900)), it's almost *counter-intuitive* to understand (out of the box) that `Airflow`'s **Tree-View is drawn right to left** (that's just a rough statement for beginners, not entirely correct) – y2k-shubham Jul 06 '18 at 05:44

Looking through your other code, it seems get_id_creds is itself an Airflow task and you're trying to loop over it, which is creating some weird interaction.

A pattern that will work is:

clear_tables = MyOperator()

for uid in uid_list:
  my_task = MyOperator(task_id=uid)
  clear_tables >> my_task
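Translated to your DAG, that pattern would look roughly like this (a sketch only; it assumes get_id_creds() returns a plain list of IDs rather than being an Airflow task itself):

clear_tables = PythonOperator(
    task_id='clear_tables',
    python_callable=dash_workers.clear_db,
    dag=DAG)

for uid in dash_workers.get_id_creds():  # a plain list of IDs, not a task
    my_task = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)
    clear_tables >> my_task  # clear_tables runs first, then each per-user task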
Ladislav Indra
  • Thanks @LadislavIndra, but that still doesn't work. I'm going to update the question with complete code. – aaron Jul 28 '17 at 15:42