I've run into this as well and haven't really found a clean way to address it. If you know all the different possible parameters you might pass to each subdag, then you can hardcode them into the DAG file and always create the DAG with every possible subdag. You then add an operator (similar to your "get every n") which fetches the list of subdags you actually want to run and marks any downstream subdag not in that list as skipped. Something like this:
from datetime import datetime

from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils.state import State

# Every subdag that could ever run, with its parameters, hardcoded up front.
SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}

def _select_subdags(**context):
    names = fetch_list()  # e.g. returns ["a", "c", "d"]
    tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]

    # Mark the unwanted SubDagOperator task instances as SKIPPED directly in the
    # metadata database, so the scheduler never runs them for this execution date.
    session = Session()
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == context['dag'].dag_id,
        TaskInstance.execution_date == context['ti'].execution_date,
        TaskInstance.task_id.in_(tasks_to_skip),
    )
    for ti in tis:
        now = datetime.utcnow()
        ti.state = State.SKIPPED
        ti.start_date = now
        ti.end_date = now
        session.merge(ti)
    session.commit()
    session.close()

select_subdags = PythonOperator(
    task_id='select_subdags',
    dag=dag,
    provide_context=True,
    python_callable=_select_subdags,
)

# Always create every possible subdag; the ones not selected above simply get skipped.
for name, params in SUBDAGS.items():
    child_dag_id = 'my_subdag_' + name
    subdag_op = SubDagOperator(
        task_id=child_dag_id,
        dag=dag,
        subdag=my_subdag(dag.dag_id, child_dag_id, params),
    )
    select_subdags >> subdag_op
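Here `fetch_list` and `my_subdag` are whatever you already have: `fetch_list` returns the names of the subdags you actually want to run, and `my_subdag` is your usual subdag factory. For illustration only, a minimal sketch of what that factory might look like, assuming the standard naming convention (the child's dag_id must be '<parent dag_id>.<task_id>') and a placeholder inner task:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

def my_subdag(parent_dag_id, child_dag_id, params):
    # SubDagOperator expects the child's dag_id to be '<parent dag_id>.<task_id>'.
    subdag = DAG(
        dag_id='{}.{}'.format(parent_dag_id, child_dag_id),
        start_date=dag.start_date,                  # reuse the parent DAG's settings
        schedule_interval=dag.schedule_interval,
    )
    # Placeholder task; the real subdag would do its work based on params['id'].
    DummyOperator(task_id='work_' + params['id'], dag=subdag)
    return subdag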
Obviously not ideal, especially when you end up wanting to run just one subdag out of hundreds. We've also run into performance issues with thousands of subdags in a single DAG, since it leads to a huge number of task instances, the majority of which are simply skipped.