Let me add my take on this.
First of all, sorry for the lengthy post, but I wanted to share the complete solution that works for me.
Background
We have a script that pulls data from a very crappy and slow API.
It's slow, so we need to be selective about what we do and don't pull from it (1 request/s, with more than 750k requests to make).
Occasionally the requirements change and force us to pull the data in full, but only for one or a few endpoints, so we need something we can control.
The strict rate limit of 1 request/s, with a delay of several seconds if breached, would halt all parallel tasks.
The meaning of 'catchup': True is essentially a backfill, which is translated into a command-line option (-c).
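On the script side, that flag might be consumed with something as simple as the sketch below (hypothetical; our actual get_people_data.py is more involved):

import argparse

# hypothetical argument handling for get_people_data.py
parser = argparse.ArgumentParser()
parser.add_argument('endpoint', help='what to fetch, e.g. fetch-users')
parser.add_argument('-c', '--catchup', action='store_true',
                    help='pull the data in full instead of incrementally')
args = parser.parse_args()

if args.catchup:
    ...  # full backfill for this endpoint
else:
    ...  # incremental pull only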
There are no data dependencies between our tasks; we only need to preserve the execution order of (some of) them.
Solution
First, introducing a pre_execute callable together with the extra DAG run config takes care of properly skipping tasks by raising an AirflowSkipException.
Secondly, based on the config we can swap the original operator for a simple PythonOperator with the same task ID and a trivial definition.
This way the UI won't be confused, and the trigger history stays complete, showing the runs in which a task was skipped.
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from plugins.airflow_utils import default_args, kubernetes_pod_task


# callable for pre_execute arg
def skip_if_specified(context):
    task_id = context['task'].task_id
    conf = context['dag_run'].conf or {}
    skip_tasks = conf.get('skip_task', [])
    if task_id in skip_tasks:
        raise AirflowSkipException()


# these are necessary to make this solution work
support_task_skip_args = {'pre_execute': skip_if_specified,
                          'trigger_rule': 'all_done'}
extended_args = {**default_args, **support_task_skip_args}

dag_name = 'optional_task_skip'
dag = DAG(dag_name,
          max_active_runs=3,
          schedule_interval=None,
          catchup=False,
          default_args=extended_args)

# select endpoints and modes
# !! make sure the dict items are in the same order as the order you want them to run !!
task_options = {
    'option_name_1':
        {'param': 'fetch-users', 'enabled': True, 'catchup': False},
    'option_name_2':
        {'param': 'fetch-jobs', 'enabled': True},
    'option_name_3':
        {'param': 'fetch-schedules', 'enabled': True, 'catchup': True},
    'option_name_4':
        {'param': 'fetch-messages', 'enabled': True, 'catchup': False},
    'option_name_5':
        {'param': 'fetch-holidays', 'enabled': True, 'catchup': False},
}


def add_tasks():
    task_list_ = []
    for task_name_, task_config_ in task_options.items():
        if task_config_['enabled']:
            parameter_ = task_config_['param']
            catchup_ = '-c ' if task_config_.get('catchup') else ''
            task_list_.append(
                kubernetes_pod_task(
                    dag=dag,
                    command=f"cd people_data; python3 get_people_data.py {parameter_} {catchup_}",
                    task_id=f"{task_name_}"))
            if len(task_list_) > 1:
                task_list_[-2] >> task_list_[-1]
        else:
            # the callable that throws the skip signal
            def skip_task(): raise AirflowSkipException()

            task_list_.append(
                PythonOperator(dag=dag,
                               python_callable=skip_task,
                               task_id=f"{task_name_}",
                               )
            )
            if len(task_list_) > 1:
                task_list_[-2] >> task_list_[-1]


# populate the DAG
add_tasks()
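With this in place, you can skip any of the tasks for a single run by listing their task IDs in the run config, either through the "Trigger DAG w/ config" option in the UI or via the Airflow 2 CLI:

airflow dags trigger --conf '{"skip_task": ["option_name_2", "option_name_4"]}' optional_task_skip

The pre_execute hook raises AirflowSkipException for those task IDs, and the 'all_done' trigger rule lets the downstream tasks run anyway.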
Note:
The default_args and kubernetes_pod_task are just convenience wrappers.
The kubernetes pod task is a simple function that injects some variables and secrets and uses the KubernetesPodOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod; I won't and can't share those with you.
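If you need a starting point, a stripped-down wrapper could look roughly like this (a generic sketch only; the image, namespace and argument handling are placeholders, not our actual setup):

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'data-team',  # placeholder
    'retries': 1,
}


def kubernetes_pod_task(dag, command, task_id):
    # runs the given shell command in a pod; the real wrapper also injects
    # variables and secrets, which are omitted here
    return KubernetesPodOperator(
        dag=dag,
        task_id=task_id,
        name=task_id,
        namespace='airflow',                              # placeholder namespace
        image='registry.example.com/people-data:latest',  # placeholder image
        cmds=['bash', '-c'],
        arguments=[command],
        get_logs=True,
    )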
The solution extends the great ideas of this gentleman:
https://www.youtube.com/watch?v=abLGyapcbw0
This solution works with Kubernetes operators as well, though.
Of course, this could be improved, and you can absolutely extend or rework the code to parse the manual trigger config too (as shown in the video).
Here's what it looks like in my UI:

(The screenshot doesn't reflect the example config above, but rather the actual runs in our staging infrastructure.)