1

I have a list that I loop over to create the tasks. The list has a fixed size.

        for counter, account_id in enumerate(ACCOUNT_LIST):
            task_id = f"bash_task_{counter}"
            if account_id:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
            else:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
                trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
            trigger_task

I tried this manually but cannot make the task skipped:

        start = DummyOperator(task_id='start')
        task1 = DummyOperator(task_id='task_1')
        task2 = DummyOperator(task_id='task_2')
        task3 = DummyOperator(task_id='task_3')
        task4 = DummyOperator(task_id='task_4')

        start >> task1
        start >> task2

        try:
            start >> task3
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task3')
            
        try:
            start >> task4
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task4')
Oli
alltej
  • **@alltej** you are confusing things; `AirflowSkipException` has to be raised from within your operator's code (and not in your `DAG` definition code as you are doing here). What you are trying to do here is not clear, but it is also impossible (you can't mark the state of a task during DAG definition, since it hasn't run yet). To skip tasks in a pre-defined fashion, you can either **[1]** *(much easier)* create the tasks and wire them together conditionally, or **[2]** use `BranchPythonOperator` / `ShortCircuitOperator` – y2k-shubham Jul 17 '20 at 21:52
  • Do you have code example? – alltej Jul 18 '20 at 02:57
  • I am trying to avoid the BranchOperator coz it looks too overkill – alltej Jul 18 '20 at 04:05

2 Answers

10

Yes, you need to raise `AirflowSkipException`:

    from airflow.exceptions import AirflowSkipException

    raise AirflowSkipException

For more information see the source code
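
As a minimal sketch of where that `raise` has to live: it belongs inside the function the task runs, not in the DAG file's top-level code. The function name `run_for_account` and its return value are hypothetical, and the `ImportError` fallback is only a stand-in so the snippet can be tried without Airflow installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in (assumption) so the sketch runs without Airflow installed.
    class AirflowSkipException(Exception):
        pass


def run_for_account(account_id):
    """Hypothetical task callable: runs at task execution time, not at DAG parse time."""
    if not account_id:
        # Raised while the task instance is running, so Airflow can mark it as skipped.
        raise AirflowSkipException("no account id, skipping")
    return f"processed {account_id}"
```

A function like this would typically be handed to a `PythonOperator` through `python_callable`, with the account id passed in `op_args`.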

Mike Taylor
  • this did not work. see updated question with the manually added tasks – alltej Jul 17 '20 at 20:17
  • Hi alltej... the code that you are using won't get executed by Airflow because you are putting it at the DAG level. The `raise AirflowSkipException` has to be part of the code that runs while a `task_instance` is executing. As such, it should be within the Operator's execute method of the Task that you intend to skip. – Mike Taylor Jul 19 '20 at 16:15
0

Have a fixed number of tasks per DAG. This is perfectly fine, and it also makes you plan the maximum number of parallel tasks your system should handle without degrading downstream systems. A fixed number of tasks also keeps them visible in the web UI, which indicates whether each one was executed or skipped.

In the code below, I initialize the list with `None` items and then update it with values returned from the DB. The `python_callable` function checks whether the `account_id` is empty (`None`); if so it raises an `AirflowSkipException`, otherwise it does its work. In the UI, the tasks are visible and show whether they were executed or skipped (meaning there was no account_id).

    from airflow.exceptions import AirflowSkipException
    # Airflow 2.x import path; on Airflow 1.10 use airflow.operators.python_operator
    from airflow.operators.python import PythonOperator


    def execute(account_id):
        # Runs at task execution time, so the skip is recorded on the task instance
        if account_id:
            print(f'************Executing task for account_id:{account_id}')
        else:
            raise AirflowSkipException

    def create_task(task_id, account_id):
        # Assumes this is called inside a `with DAG(...)` block, since no dag is passed
        return PythonOperator(task_id=task_id,
                              python_callable=execute,
                              op_args=[account_id])


    list_from_dbhook = [1, 2, 3] # dummy list. Get records using DB Hook

    # Use a fixed size: it caps the resources / number of tasks to allocate.
    # A fixed number of tasks keeps them visible in the UI instead of being purely dynamic
    record_size_limit = 5

    # Pad with None so that unused slots become skipped tasks
    ACCOUNT_LIST = [None] * record_size_limit
    for index, account_id_val in enumerate(list_from_dbhook):
        ACCOUNT_LIST[index] = account_id_val

    for idx, acct_id in enumerate(ACCOUNT_LIST):
        create_task(f"task_{idx}", acct_id)
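
The padding step above can also be pulled into a small helper. This is only a sketch: the name `pad_to_fixed_size` is hypothetical, and truncating extra records is an assumption on my part (the loop above would raise an `IndexError` if the DB returned more rows than `record_size_limit`).

```python
def pad_to_fixed_size(records, size_limit):
    """Return a list of exactly size_limit items: records first, then None fillers."""
    if len(records) > size_limit:
        # Assumption: truncate rather than fail when there are too many records.
        records = records[:size_limit]
    return list(records) + [None] * (size_limit - len(records))


account_list = pad_to_fixed_size([1, 2, 3], 5)
print(account_list)  # [1, 2, 3, None, None]
```

Each `None` slot then becomes a task that raises `AirflowSkipException` when it runs, so the DAG always shows the same number of tasks.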


alltej