6

How can I check whether a task has failed and then run another task, like an if/else condition? For example, if task 1 fails, run task 2.

I want to run the dependent task conditionally.

If task_1 fails, how can I get its error and use it in a condition, i.e. if task_1 failed then run task_2, else run task_3? I tried SSHHook, but I am looking for a simpler solution.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

with DAG(
    'airflow',
    catchup=False,
    schedule_interval=None,
    default_args={
        'owner': 'abc',
        'start_date': datetime(2018, 4, 17),
        'depends_on_past': False,
    },
) as dag:
    task_1 = PythonOperator(
        task_id='task_1',
        python_callable=do,
    )
    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=do,
    )
    task_3 = PythonOperator(
        task_id='task_3',
        python_callable=do,
    )

    task_3.set_upstream(task_2)
    task_2.set_upstream(task_1)
griez007
  • This can also be achieved using [`trigger_rule`s](https://airflow.apache.org/docs/stable/concepts.html#trigger-rules) – y2k-shubham Sep 16 '20 at 20:21
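For reference, a minimal sketch of the trigger_rule idea from the comment above; it reuses task_1, do, and dag from the question, so those names are assumptions. task_2 runs only when task_1 has failed, task_3 only when it has succeeded.

from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

task_2 = PythonOperator(
    task_id='task_2',
    python_callable=do,                      # `do` as in the question
    trigger_rule=TriggerRule.ALL_FAILED,     # run only when every upstream task failed
    dag=dag,
)
task_3 = PythonOperator(
    task_id='task_3',
    python_callable=do,
    trigger_rule=TriggerRule.ALL_SUCCESS,    # the default: run only when upstream succeeded
    dag=dag,
)

task_1 >> [task_2, task_3]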

2 Answers

9

Since there were no code examples, I have to assume what your DAG might look like and what you want to do. Also, I didn't understand why you wanted to use an SSHHook, but again, no code examples. So here we go:

Create error task

def t2_error_task(context):
    # Failure callback: Airflow calls this with the task context when the task fails.
    instance = context['task_instance']
    do_stuff()
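If you also need the error itself (as the question asks), the same callback can read it from the context. A sketch, assuming Airflow puts the exception under the 'exception' key of the failure context:

def t2_error_task(context):
    instance = context['task_instance']          # the failed TaskInstance
    error = context.get('exception')             # assumed key: the exception that failed the task
    print('Task %s failed with: %s' % (instance.task_id, error))
    do_stuff()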

Create tasks

t1_task = PythonOperator(
    task_id='my_operator_t1',
    python_callable=do_python_stuff,
    on_failure_callback=t2_error_task,   # called with the task context when this task fails
    dag=dag
)

t3_task_success = PythonOperator(
    task_id='my_operator_t3',
    python_callable=do_python_stuff_success,
    dag=dag
)

Then set t3 downstream of t1:

t1_task >> t3_task_success 
tobi6
  • Thanks @tobi6, I was trying this example: https://stackoverflow.com/questions/43678408/how-to-create-a-conditional-task-in-airflow?rq=1 but I get a connection issue with the SSHHook operator after updating Airflow to 1.9, and the only fix offered is "use the SSHOperator class in place of SSHExecuteOperator, which is now removed". – griez007 Apr 17 '18 at 11:25
  • The comment is hard to understand. Please edit your question and add more information about your DAG and maybe a stack trace if you have an error. – tobi6 Apr 17 '18 at 11:38
  • Using on_failure_callback, can I run the next task? In your example code you call t2_error_task; instead of that, can I call another task? I have updated the question with code. – griez007 Apr 17 '18 at 11:43
  • Sure, you can run another task. Feel free to build a test DAG and ask a new question if it doesn't work. Also, I couldn't see any SSHHooks in your sample code, you might want to ask another question with code and logs for that. – tobi6 Apr 17 '18 at 12:21
  • Can we pass parameters to the error task? When I try, I just get a missing parameter `context`, and I think it's because I'm not adding the context to `on_failure_callback=t2_error_task`, but I don't know what to put there. I'm trying to do this: `on_failure_callback=t2_error_task(param1, param2)` – Kyle Bridenstine Aug 16 '18 at 22:18
  • Not possible afaik, but you should be able to read from context and as such also from xcom. – tobi6 Aug 17 '18 at 06:05
  • Can do_stuff() call another task? This example is missing the failure chain. – viru Aug 05 '21 at 17:39
2

One solution that would be explicit in your DAG topology is to make task_1 write an XCom marking its success or failure, then create a BranchPythonOperator that reads that XCom and decides, based on it, whether to execute task_2 or task_3.
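A minimal sketch of that idea, assuming task_2, task_3, do, and dag are the names from the question; the XCom key and helper names are made up for illustration:

from airflow.operators.python_operator import BranchPythonOperator, PythonOperator

def run_task_1(**context):
    # Run the real work, but record success/failure in an XCom instead of failing the task.
    try:
        do()
        context['task_instance'].xcom_push(key='task_1_status', value='success')
    except Exception:
        context['task_instance'].xcom_push(key='task_1_status', value='failed')

def choose_branch(**context):
    # Read the XCom written by task_1 and return the task_id of the branch to follow.
    status = context['task_instance'].xcom_pull(task_ids='task_1', key='task_1_status')
    return 'task_2' if status == 'failed' else 'task_3'

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=run_task_1,
    provide_context=True,
    dag=dag,
)

branch = BranchPythonOperator(
    task_id='branch_on_task_1',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag,
)

task_1 >> branch >> [task_2, task_3]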

villasv
  • 2
    @PratikNariya See https://stackoverflow.com/questions/50149085/python-airflow-return-result-from-pythonoperator/50149522#50149522 – tobi6 Aug 17 '18 at 07:14