The original code related to this question can be found here.
I'm confused by how both the bitshift operators and the set_upstream/set_downstream methods are working within a task loop that I've defined in my DAG. When the main execution loop of the DAG is configured as follows:
for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))
or
for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)
The graph looks like this (the alpha-numeric sequences are the user IDs, which also define the task IDs):
When I configure the main execution loop of the DAG like this:
for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))
or
for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables
the graph looks like this:
The second graph is what I want, and what I would have expected the first two snippets of code to produce based on my reading of the docs. If I want clear_tables
to execute first, before triggering my batch of data-parsing tasks for the different user IDs, should I indicate this as clear_tables >> id_worker(uid)?
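For reference, this is my understanding of how the two styles map onto each other, written out as a minimal stand-alone sketch (the DAG and the DummyOperator tasks here are placeholders, not part of my actual code):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

sketch_dag = DAG(
    dag_id='bitshift_sketch',
    default_args={'start_date': datetime.now()}
)

a = DummyOperator(task_id='a', dag=sketch_dag)
b = DummyOperator(task_id='b', dag=sketch_dag)
c = DummyOperator(task_id='c', dag=sketch_dag)
d = DummyOperator(task_id='d', dag=sketch_dag)

# These three lines all declare the same dependency: a runs before b.
# (Declaring it more than once is harmless; the repetition is only to
# illustrate the equivalence.)
a >> b
a.set_downstream(b)
b.set_upstream(a)

# On a second pair of tasks, the reverse direction: d runs before c.
c << d
c.set_upstream(d)
d.set_downstream(c)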
EDIT -- Here's the complete code, which has been updated since I posted the last few questions, for reference:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

ENV = os.environ

default_args = {
    'start_date': datetime.now(),
}

dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

clear_tables = PythonOperator(
    task_id='clear_tables',
    python_callable=dash_workers.clear_db,
    dag=dag)


def id_worker(uid):
    # Each user ID doubles as the task ID, so it must be a unique string.
    return PythonOperator(
        task_id=uid,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=dag)


for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    # Equivalent to preproc_task >> clear_tables,
    # i.e. clear_tables runs after every per-user worker.
    clear_tables << preproc_task
After implementing @LadislavIndra's suggestion, I still need the same reversed use of the bitshift operator in order to get the correct dependency graph.
UPDATE: @AshBerlin-Taylor's answer is what's going on here. I assumed that Graph View and Tree View were showing the same thing, but they're not. Here's what id_worker(uid) >> clear_tables
looks like in Graph View:
I certainly don't want the final step in my data pre-prep routine to be to delete all data tables!
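In light of that, the loop that should give me the graph I actually want, with clear_tables running first and then each per-user task, uses the bitshift in its original direction (a sketch using the same id_worker helper as above):

for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    # clear_tables is upstream: it runs once before every per-user worker task.
    clear_tables >> preproc_task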