
My question is about a DAG that dynamically defines a group of parallel tasks based on counting the number of rows in a MySQL table that is deleted and reconstructed by the upstream tasks. The difficulty I am having is that one of my upstream tasks, sherlock_join_and_export_task, TRUNCATEs this table to clear it before rebuilding it. When that happens the row count drops to zero and my dynamically generated tasks cease to be defined. When the table is restored, the graph's structure is restored as well, but the tasks no longer execute. Instead, they show up as black boxes in the tree view:

[screenshot: tree view showing the dynamically generated tasks as black boxes]

Here's what the DAG looks like after sherlock_join_and_export_task deletes the table referenced in the line count = worker.count_online_table():

[screenshot: graph view with the dynamically generated batch tasks missing]

After sherlock_join_and_export_task completes this is what the DAG looks like:

[screenshot: restored graph view with the batch tasks defined again]

None of the tasks are queued or executed, though. The DAG just keeps running and nothing happens.

Is this a case where I would use a SubDAG? Any insights on how to set this up, or how to rewrite the existing DAG? I'm running this on AWS ECS with the LocalExecutor. Code below for reference:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

BATCH_SIZE = 75000

from preprocessing.marketing.minimalist.table_builder import OnlineOfflinePreprocess

worker = OnlineOfflinePreprocess()

def partial_process_flow(batch_size, offset):
    worker = OnlineOfflinePreprocess()
    worker.import_offline_data()
    worker.import_online_data(batch_size, offset)
    worker.merge_aurum_to_sherlock()
    worker.upload_table('aurum_to_sherlock')

def batch_worker(batch_size, offset, DAG):
    return PythonOperator(
        task_id="{0}_{1}".format(offset, batch_size),
        python_callable=partial_process_flow,
        op_args=[batch_size, offset],
        dag=DAG)

DAG = DAG(
    dag_id='minimalist_data_preproc',
    start_date=datetime(2018, 1, 7, 2, 0, 0, 0), #..EC2 time. Equal to 11pm Mexico time
    max_active_runs=1,
    concurrency=4,
    schedule_interval='0 9 * * *', #..4am Mexico time
    catchup=False
)

clear_table_task = PythonOperator(
    task_id='clear_table_task',
    python_callable=worker.clear_marketing_table,
    op_args=['aurum_to_sherlock'],
    dag=DAG
)

sherlock_join_and_export_task = PythonOperator(
    task_id='sherlock_join_and_export_task',
    python_callable=worker.join_online_and_send_to_galileo,
    dag=DAG
)

sherlock_join_and_export_task >> clear_table_task

count = worker.count_online_table()
if count == 0:
    sherlock_join_and_export_task >> batch_worker(-99, -99, DAG) #..dummy task for when the left-join table has been deleted
else:
    format_table_task = PythonOperator(
        task_id='format_table_task',
        python_callable=worker.format_final_table,
        dag=DAG
    )

    build_attributions_task = PythonOperator(
        task_id='build_attributions_task',
        python_callable=worker.build_attribution_weightings,
        dag=DAG
    )

    update_attributions_task = PythonOperator(
        task_id='update_attributions_task',
        python_callable=worker.update_attributions,
        dag=DAG
    )

    first_task = batch_worker(BATCH_SIZE, 0, DAG)
    clear_table_task >> first_task
    for offset in range(BATCH_SIZE, count, BATCH_SIZE):
        first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> format_table_task

    format_table_task >> build_attributions_task >> update_attributions_task

Here's a simplified concept of what the DAG is doing:

...

def batch_worker(batch_size, offset, DAG):
    #..A function that dynamically generates tasks based on counting the reference table
    return dag_task

worker = ClassMethodsForDAG()
count = worker.method_that_counts_reference_table()

if count == 0:
    delete_and_rebuild_reference_table_task >> batch_worker(-99, -99, DAG) 
else:
    first_task = batch_worker(BATCH_SIZE, 0, DAG)
    clear_table_task >> first_task
    for offset in range(BATCH_SIZE, count, BATCH_SIZE):
        first_task >> batch_worker(BATCH_SIZE, offset, DAG) >> downstream_task
aaron
  • Does the table you truncate happen to be one of the airflow metadata tables? Otherwise there is a lot going on in this script so maybe it will be easier to fix if you reduce to a minimum DAG that reproduces the error? – Alexander Tronchin-James Mar 15 '18 at 04:21
  • Hi @AlexanderTronchin-James, nope, I'm not playing around with the meta DB, but good thought. Here's what I think I will actually do: run two separate DAGs at different times in the day without explicitly declaring the task dependency. Not ideal, but it will be good enough for now. – aaron Mar 15 '18 at 15:35
  • @AlexanderTronchin-James Added a reduced (non-functional) example – aaron Mar 15 '18 at 16:10

2 Answers

1

I fought this use case for a long time. In short, a dag that’s built based on the state of a changing resource, especially a db table, doesn’t fly so well in airflow.

My solution was to write a small custom operator that's a subclass of TriggerDagRunOperator; it does the query and then triggers DAG runs for each of the subprocesses.
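
For illustration, here is a minimal sketch of that idea (it subclasses BaseOperator and calls the experimental trigger_dag helper directly rather than TriggerDagRunOperator; the operator name, the subprocess dag_id, and the trigger_dag import path/signature are assumptions and vary by Airflow version):

import json

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from preprocessing.marketing.minimalist.table_builder import OnlineOfflinePreprocess


class TriggerBatchDagRunOperator(BaseOperator):
    """Count the rebuilt table at execution time and fire one DagRun of a
    small 'subprocess' DAG per batch, so the parent DAG's shape never
    depends on the row count at parse time."""

    @apply_defaults
    def __init__(self, trigger_dag_id, batch_size, *args, **kwargs):
        super(TriggerBatchDagRunOperator, self).__init__(*args, **kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.batch_size = batch_size

    def execute(self, context):
        # The count happens here, after the upstream rebuild has finished,
        # not at DAG-parse time as in the original script.
        count = OnlineOfflinePreprocess().count_online_table()
        for offset in range(0, count, self.batch_size):
            # Depending on your Airflow version you may need distinct
            # execution_dates to avoid collisions when triggering many
            # runs within the same second.
            trigger_dag(
                dag_id=self.trigger_dag_id,
                run_id='batch_{0}_{1}'.format(context['ds'], offset),
                conf=json.dumps({'batch_size': self.batch_size,
                                 'offset': offset}),
            )

The triggered "subprocess" DAG would then be a one-task DAG whose PythonOperator reads batch_size and offset from context['dag_run'].conf and calls something like the question's partial_process_flow.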

It makes "joining" the process back together downstream more interesting, but in my use case I was able to work around it with another DAG that polls and short-circuits if all the subprocesses for a given day have completed. In other cases partition sensors can do the trick.
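
A rough sketch of that polling "join", assuming the subprocess runs were triggered under a made-up dag_id 'sherlock_batch_subprocess' and that DagRun.find is available in your Airflow version:

from datetime import datetime

from airflow.models import DAG, DagRun
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.state import State


def all_batches_done():
    # Only let downstream "join" tasks run once every subprocess DagRun
    # triggered today (UTC) has finished successfully.
    runs = DagRun.find(dag_id='sherlock_batch_subprocess')
    today = datetime.utcnow().date()
    todays = [r for r in runs if r.execution_date.date() == today]
    return bool(todays) and all(r.state == State.SUCCESS for r in todays)


join_dag = DAG(
    dag_id='sherlock_batch_join',
    start_date=datetime(2018, 1, 7),
    schedule_interval='0 10 * * *',
    catchup=False,
)

check_batches = ShortCircuitOperator(
    task_id='check_batches',
    python_callable=all_batches_done,
    dag=join_dag,
)

Scheduled more frequently than daily, this effectively polls; on runs where the batches aren't finished yet, the downstream join tasks are simply skipped.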

I have several use cases like this (iterative DAG triggering based on a dynamic source), and after a lot of fighting to make dynamic SubDAGs work, I switched to this "trigger subprocess" strategy and have been doing well since.

Note: this may create a large number of DAG runs for one target (the triggered DAG). That makes the UI challenging in some places, but it's workable (and I've started querying the db directly because I'm not ready to write a plugin that does UI stuff).

  • Thanks for this. Any links or code examples? What you did is still a bit abstract for me. For now I'm hacking this by splitting the DAGs, eliminating the explicit dependency, and running them 12 hours apart. Not ideal, but I'm choosing to take on a bit of technical debt now in hopes of solving this later... – aaron Mar 15 '18 at 16:16
  • (and did you mean trigger_dag_operator?) – aaron Mar 15 '18 at 20:26
1

Looking over your DAG, I think you've implemented a non-idempotent process that Airflow is not really designed for. Instead of truncating and rebuilding the table that your tasks depend on, you should probably leave the tasks defined and update only their start_date/end_date to enable and disable them for scheduling at the task level, or even run all of them on every iteration and have your script check the table so that a disabled batch just runs a hello-world no-op.
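
As a rough sketch of the second option, reusing partial_process_flow, clear_table_task, format_table_task, BATCH_SIZE, and DAG from the question's file (MAX_BATCHES is an assumed upper bound, and the early return is the "hello world" case), the count-based loop could be replaced with a fixed set of tasks:

MAX_BATCHES = 40  # assumed upper bound on expected rows / BATCH_SIZE


def maybe_process_batch(batch_size, offset):
    # Decide at run time whether this batch has any work to do.
    worker = OnlineOfflinePreprocess()
    if offset >= worker.count_online_table():
        print('No rows at offset {0}; nothing to do.'.format(offset))
        return  # the "hello world" no-op for disabled batches
    partial_process_flow(batch_size, offset)


for i in range(MAX_BATCHES):
    batch_task = PythonOperator(
        task_id='batch_{0}'.format(i),
        python_callable=maybe_process_batch,
        op_args=[BATCH_SIZE, i * BATCH_SIZE],
        dag=DAG,
    )
    clear_table_task >> batch_task >> format_table_task

This assumes format_table_task and its downstream tasks are defined unconditionally, since the DAG's shape no longer depends on the row count at parse time.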

  • Makes sense, I think. To confirm that I'm understanding you, are you saying that I should _not_ have dynamically-generated tasks, and instead have a fixed group of tasks where I control whether they are fired or not based on a certain condition? The challenge there is that I do need the task load to scale with the DB.... Right now I just split the DAG into two different DAGs, each run 12 hrs apart – aaron Mar 17 '18 at 20:57
  • Dynamic generation is fine, but you need to make sure any generated tasks are _persistent_, otherwise it's unclear how to handle old tasks that are no longer defined. – Alexander Tronchin-James Mar 18 '18 at 22:22