
Airflow task branch never runs and reports “Task instance did not exist in the DB”, even though it appears in the graph

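I have an Airflow graph (DAG) with a conditional branch defined like this:

from enum import Enum
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10 import path

class BranchFlags(Enum):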
    yes = "yes"
    no = "no"
...
for table in list_of_tables:  # list of dicts
    task_1 = BashOperator(
        task_id='task_1_%s' % table["conf1"],
        bash_command='bash script1.sh %s' % table["conf1"],
        dag=dag)

    if table["branch_flag"] == BranchFlags.yes:
        # optional short branch; referenced as task_2 when wiring dependencies below
        task_2 = BashOperator(
            task_id='task_3_%s' % table["conf2"],
            bash_command='python consolidate_parquet.py %s' % table["conf2"],
            dag=dag)

    task_3 = BashOperator(
        task_id='task_3_%s' % table["conf3"],
        bash_command='bash script3.sh %s' % table["conf3"],
        dag=dag)

    task_1 >> task_3
    if table["branch_flag"] == BranchFlags.yes:
        task_1 >> task_2

Here is the graph in the Airflow UI from my actual code: [image: graph view of the DAG]

Notice that even though the longer parts of the graph run fine, the lone branch is not being run for the one sequence that was supposed to branch. When viewing the logs for the task, I see

*** Task instance did not exist in the DB

This seems odd to me, since the scheduler DB evidently knows about the task: it appears in the web UI graph. Other changes I make to the DAG .py file do show up in the graph and are executed by the scheduler when the DAG runs. Attempting to view the task's Task Instance Details throws this error:

Task [dagname.task_3_qwerty] doesn't seem to exist at the moment

Running airflow resetdb (as I've seen in other posts) does nothing for the problem.

Note that the intention is for the short branch to run concurrently with the longer branch (not as an either/or choice).

Anyone know why this would be happening or have some debugging tips?
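
One thing that may be worth ruling out (an assumption, not a confirmed cause): in the snippet above, the conditional task and task_3 both build their task_id from the 'task_3_%s' prefix, so two operators end up with the same id whenever conf2 and conf3 produce the same string. Below is a minimal plain-Python sketch of that check. It does not import Airflow; `task_ids_for`, `duplicate_task_ids`, and the sample data are hypothetical names and values for illustration only.

```python
from collections import Counter
from enum import Enum


class BranchFlags(Enum):
    yes = "yes"
    no = "no"


def task_ids_for(table):
    """Task ids the loop in the question would create for one table entry."""
    ids = ["task_1_%s" % table["conf1"], "task_3_%s" % table["conf3"]]
    if table["branch_flag"] == BranchFlags.yes:
        # The conditional task also uses the 'task_3_' prefix in the question.
        ids.append("task_3_%s" % table["conf2"])
    return ids


def duplicate_task_ids(tables):
    """Return the sorted task ids that would be generated more than once."""
    counts = Counter(tid for table in tables for tid in task_ids_for(table))
    return sorted(tid for tid, n in counts.items() if n > 1)


# Hypothetical sample data, not taken from the real DAG.
sample_tables = [
    {"conf1": "a", "conf2": "qwerty", "conf3": "qwerty",
     "branch_flag": BranchFlags.yes},
    {"conf1": "b", "conf2": "x", "conf3": "y",
     "branch_flag": BranchFlags.no},
]

print(duplicate_task_ids(sample_tables))
```

An empty result from the real `list_of_tables` would rule out id collisions, leaving the dependency wiring as the next thing to inspect.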

lampShadesDrifter
  • New to airflow, but wondering if the shorter branch needs to connect back to the longer path downstream. Leaving as a debugging note here to try and report back. – lampShadesDrifter Oct 24 '19 at 02:10
  • Not totally sure why, but this seems to be related to the fact that the affected branch task is named using a dynamic value. Will continue debugging. – lampShadesDrifter Oct 25 '19 at 01:12
  • Doing two things seemed to work: 1) not naming the task_id after a value that is evaluated dynamically before the DAG is created (really weird) and 2) connecting the short leg back to the longer one downstream. Only after doing both do the "prep_file..." and "consolidate" branches both run (referring to the image in the post). Still trying to figure out why before writing a full answer. If anyone knows what could be going on, please let me know. – lampShadesDrifter Oct 25 '19 at 01:50
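
The two changes described in the last comment can be modelled in plain Python as a list of dependency edges. This is only a sketch under assumptions: it does not import Airflow, the function `plan_edges`, the `"branch"` key, and the intermediate `task_mid_*` step are hypothetical, and deriving ids from the loop index is just one reading of "not naming the task_id after a dynamically evaluated value". In an actual DAG each pair would correspond to an `upstream >> downstream` statement.

```python
def plan_edges(tables):
    """Model the DAG wiring as (upstream_id, downstream_id) pairs."""
    edges = []
    for index, table in enumerate(tables):
        # Ids come from the loop position rather than from config values.
        first = "task_1_%d" % index
        mid = "task_mid_%d" % index   # stands in for the longer path
        last = "task_3_%d" % index
        edges.append((first, mid))
        edges.append((mid, last))
        if table["branch"]:
            side = "task_2_%d" % index
            edges.append((first, side))  # short leg starts at the same task
            edges.append((side, last))   # and rejoins the long path downstream
    return edges


# Hypothetical input: the first table takes the extra branch, the second does not.
for upstream, downstream in plan_edges([{"branch": True}, {"branch": False}]):
    print(upstream, ">>", downstream)
```

In this model the short leg and the intermediate step share the same upstream task and do not depend on each other, so they can run concurrently, which matches the intention stated in the question.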

0 Answers