My question is about a DAG that dynamically defines a group of parallel tasks based on counting the number of rows in a MySQL table that is deleted and reconstructed by the upstream tasks. The difficulty I'm having is that in my upstream task sherlock_join_and_export_task I TRUNCATE this table to clear it before rebuilding it. When I do this, the row count drops to zero and my dynamically generated tasks cease to be defined. When the table is restored, so is the graph's structure, but the tasks no longer execute. Instead, they show up as black boxes in the tree view:
Here's what the DAG looks like after sherlock_join_and_export_task deletes the table referenced in the line count = worker.count_online_table():
After sherlock_join_and_export_task completes, this is what the DAG looks like:
None of the tasks are queued or executed, though; the DAG just keeps running and nothing happens.
Is this a case where I would use a sub-DAG (there's a rough sketch of what I mean at the end of this post)? Any insights on how to set this up, or rewrite the existing DAG? I'm running this on AWS ECS with the LocalExecutor. Code below for reference:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

from preprocessing.marketing.minimalist.table_builder import OnlineOfflinePreprocess

BATCH_SIZE = 75000

worker = OnlineOfflinePreprocess()
def partial_process_flow(batch_size, offset):
    worker = OnlineOfflinePreprocess()
    worker.import_offline_data()
    worker.import_online_data(batch_size, offset)
    worker.merge_aurum_to_sherlock()
    worker.upload_table('aurum_to_sherlock')
def batch_worker(batch_size, offset, DAG):
    return PythonOperator(
        task_id="{0}_{1}".format(offset, batch_size),
        python_callable=partial_process_flow,
        op_args=[batch_size, offset],
        dag=DAG)
DAG = DAG(
    dag_id='minimalist_data_preproc',
    start_date=datetime(2018, 1, 7, 2, 0, 0, 0),  # EC2 time; equal to 11 pm Mexico City time
    max_active_runs=1,
    concurrency=4,
    schedule_interval='0 9 * * *',  # 4 am Mexico City time
    catchup=False
)
clear_table_task = PythonOperator(
    task_id='clear_table_task',
    python_callable=worker.clear_marketing_table,
    op_args=['aurum_to_sherlock'],
    dag=DAG
)
sherlock_join_and_export_task = PythonOperator(
    task_id='sherlock_join_and_export_task',
    python_callable=worker.join_online_and_send_to_galileo,
    dag=DAG
)
sherlock_join_and_export_task >> clear_table_task
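# NOTE: this count runs at DAG-parse time (every time the scheduler parses this file),
# not at task run time, which is why the tasks below disappear after the TRUNCATE.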
count = worker.count_online_table()
if count == 0:
    sherlock_join_and_export_task >> batch_worker(-99, -99, DAG)  # dummy task for when the left join deleted the table
else:
    format_table_task = PythonOperator(
        task_id='format_table_task',
        python_callable=worker.format_final_table,
        dag=DAG
    )
    build_attributions_task = PythonOperator(
        task_id='build_attributions_task',
        python_callable=worker.build_attribution_weightings,
        dag=DAG
    )
    update_attributions_task = PythonOperator(
        task_id='update_attributions_task',
        python_callable=worker.update_attributions,
        dag=DAG
    )

    first_task = batch_worker(BATCH_SIZE, 0, DAG)
    clear_table_task >> first_task
    for offset in range(BATCH_SIZE, count, BATCH_SIZE):
        first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> format_table_task
    format_table_task >> build_attributions_task >> update_attributions_task
Here's a simplified concept of what the DAG is doing:
...
def batch_worker(batch_size, offset, DAG):
    # A function that dynamically generates tasks based on counting the reference table
    return dag_task

worker = ClassMethodsForDAG()
count = worker.method_that_counts_reference_table()

if count == 0:
    delete_and_rebuild_reference_table_task >> batch_worker(-99, -99, DAG)
else:
    first_task = batch_worker(BATCH_SIZE, 0, DAG)
    clear_table_task >> first_task
    for offset in range(BATCH_SIZE, count, BATCH_SIZE):
        first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> downstream_task
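For what it's worth, here is roughly the sub-DAG shape I have in mind, in case that helps frame the question. This is only a sketch, not working code: it reuses partial_process_flow and worker from the DAG above, the dag id, factory, and task names are placeholders, and as far as I can tell worker.count_online_table() is still evaluated at parse time when the child DAG is built, so I'm not sure it actually avoids the problem:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

BATCH_SIZE = 75000
START_DATE = datetime(2018, 1, 7, 2, 0, 0, 0)
SCHEDULE = '0 9 * * *'
PARENT_DAG_ID = 'minimalist_data_preproc_v2'  # placeholder name


def batch_subdag_factory(parent_dag_id, child_task_id, count):
    # The child DAG id must be '<parent_dag_id>.<child_task_id>' for SubDagOperator.
    subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_task_id),
        start_date=START_DATE,
        schedule_interval=SCHEDULE,
    )
    # One task per batch window; max(count, 1) keeps at least one task defined
    # even while the reference table is empty.
    for offset in range(0, max(count, 1), BATCH_SIZE):
        PythonOperator(
            task_id='batch_{0}'.format(offset),
            python_callable=partial_process_flow,  # same callable as in the DAG above
            op_args=[BATCH_SIZE, offset],
            dag=subdag,
        )
    return subdag


main_dag = DAG(
    dag_id=PARENT_DAG_ID,
    start_date=START_DATE,
    schedule_interval=SCHEDULE,
    max_active_runs=1,
    catchup=False,
)

batch_workers_task = SubDagOperator(
    task_id='batch_workers',
    # worker.count_online_table() is still called at parse time here, which is
    # the part I'm unsure about.
    subdag=batch_subdag_factory(PARENT_DAG_ID, 'batch_workers',
                                worker.count_online_table()),
    dag=main_dag,
)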