Airflow task branch never runs and reports “Task instance did not exist in the DB”, even though it appears in the graph
I have an Airflow DAG with a conditional branch defined like this:
    class BranchFlags(Enum):
        yes = "yes"
        no = "no"
    ...
    for table in list_of_tables:  # list of dicts
        task_1 = BashOperator(
            task_id='task_1_%s' % table["conf1"],
            bash_command='bash script1.sh %s' % table["conf1"],
            dag=dag)
        if table["branch_flag"] == BranchFlags.yes:
            # short branch, only created for flagged tables
            task_2 = BashOperator(
                task_id='task_3_%s' % table["conf2"],
                bash_command='python %s/consolidate_parquet.py %s' % table["conf2"],
                dag=dag)
        task_3 = BashOperator(
            task_id='task_3_%s' % table["conf3"],
            bash_command='bash script3.sh %s' % table["conf3"],
            dag=dag)
        task_1 >> task_3
        if table["branch_flag"] == BranchFlags.yes:
            task_1 >> task_2
Here is the graph in the Airflow UI from my actual code:
Notice that even though the longer parts of the graph run fine, the lone branch is not run for the one sequence that was supposed to branch. When viewing the logs for that task, I see:
*** Task instance did not exist in the DB
This is weird to me, since the scheduler DB ostensibly knows about the task, given that it appears in the web UI graph. I'm not sure what is going on here: other changes I make to the DAG .py file do show up in the graph and are executed by the scheduler when the DAG runs. Attempting to view the task's Task Instance Details throws the error:
Task [dagname.task_3_qwerty] doesn't seem to exist at the moment
Running airflow resetdb (as I've seen suggested in other posts) does nothing for the problem.
Note that the intention is for the short branch to run concurrently with the longer branch (not as an either/or choice).
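To make the intended shape concrete, here is a minimal plain-Python sketch (no Airflow involved) of the dependency edges I expect. The helper `build_edges` and the sample config are hypothetical and only for illustration; the task IDs mirror the format strings used in my DAG code.

```python
from enum import Enum


class BranchFlags(Enum):
    yes = "yes"
    no = "no"


def build_edges(tables):
    """Return (upstream, downstream) task-id pairs for the intended graph."""
    edges = []
    for table in tables:
        first = "task_1_%s" % table["conf1"]
        # The long branch always follows task_1.
        edges.append((first, "task_3_%s" % table["conf3"]))
        # The short branch is an extra downstream of task_1, not an alternative.
        if table["branch_flag"] == BranchFlags.yes:
            edges.append((first, "task_3_%s" % table["conf2"]))
    return edges


# Hypothetical sample config, for illustration only.
sample = [
    {"conf1": "a", "conf2": "b", "conf3": "c", "branch_flag": BranchFlags.yes},
    {"conf1": "x", "conf2": "y", "conf3": "z", "branch_flag": BranchFlags.no},
]
edges = build_edges(sample)
```

So a flagged table should give `task_1` two downstream tasks that are both expected to run, while an unflagged table gives it only one.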
Does anyone know why this would be happening, or have any debugging tips?