I should admit this is my first time working with Python, so any help would be very useful. I have put together a test DAG to do the following, but it does not work:

  • run task t1 and return a value
  • run task t2 only if the value from t1 indicates success (ALL_SUCCESS)
    from datetime import datetime
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.trigger_rule import TriggerRule

    def set_trigger(taskid, **kwargs):
        xcomValue = {{ task_instance.xcom_pull(task_ids=taskid) }}
        print(xcomValue, " <------- LOOK HERE XCOM VAR")
        if xcomValue == "0":
            return TriggerRule.ALL_SUCCESS
        return TriggerRule.ALL_FAILED

    dag = DAG(dag_id="example_bash_operator", schedule_interval=None, start_date=datetime(2018, 12, 31))

    t1 = BashOperator(
        task_id="t1",
        bash_command='do something && echo 0 ',
        dag=dag,
    )

    t2 = BashOperator(
        task_id="t2",
        bash_command='do something else here ',
        trigger_rule=set_trigger,
        dag=dag,
    )

    t1 >> t2
OzmaTis
  • Read the comments (and the links given in them) in [this](https://stackoverflow.com/q/61719690/3679900) question to understand why what you are doing is not possible. TL;DR: the `trigger_rule` input is required at **dag-parsing time**, but templates like `{{ task_instance.xcom_pull(task_ids=taskid) }}` are materialized not when the DAG is parsed / generated, but when it is run (DAG -> DagRun, Task -> TaskInstance) – y2k-shubham Sep 16 '20 at 19:11
  • @y2k-shubham What I really want is to ensure that t2 is skipped if t1 returns a specific value... Is this kind of custom dependency possible with Airflow? – OzmaTis Sep 16 '20 at 19:25
  • @y2k-shubham Thanks, I think I understand why it would not be possible to use `trigger_rule`... Is there another way I can achieve this kind of custom dependency with Airflow? – OzmaTis Sep 16 '20 at 19:35
  • OK, so essentially you need to retrieve the `state` of a task in another task; check [Status of Airflow task within the dag](https://stackoverflow.com/q/43732642/3679900) and [How to find the number of upstream tasks failed in Airflow?](https://stackoverflow.com/q/50613155/3679900) (a minimal sketch of that pattern follows below) – y2k-shubham Sep 16 '20 at 20:24
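For reference, here is a minimal sketch of the pattern those links describe, assuming Airflow 1.10.x with `provide_context=True`; the `check_state` task and its callable are illustrative additions, not part of the original question:

```
from airflow.operators.python_operator import PythonOperator

def check_upstream_state(**kwargs):
    # `dag_run` is injected into the callable's context when
    # provide_context=True; it can look up any TaskInstance of this run.
    t1_ti = kwargs["dag_run"].get_task_instance(task_id="t1")
    print("t1 finished with state:", t1_ti.current_state())

check_state = PythonOperator(
    task_id="check_state",
    provide_context=True,
    python_callable=check_upstream_state,
    trigger_rule="all_done",  # run even if t1 failed, so its state can be read
    dag=dag,
)

t1 >> check_state
```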

1 Answer


Why not use `BranchPythonOperator` (docs)? This way you only run `t2` if the value returned by `t1` is `0`:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(dag_id="example_bash_operator", schedule_interval=None, start_date=datetime(2018, 12, 31))

t1 = BashOperator(
    task_id="t1",
    bash_command='do something && echo 0 ',
    dag=dag,
)

def branch_func(**kwargs):
    # A BashOperator pushes the last line of stdout to XCom as a string,
    # so compare against the string "0", not the int 0.
    xcom_value = kwargs['ti'].xcom_pull(task_ids='t1')
    if xcom_value == '0':
        return 't2'   # follow the t2 branch
    return 'stop'     # otherwise take the no-op branch, skipping t2

check_t1 = BranchPythonOperator(
    task_id='check_t1',
    provide_context=True,
    python_callable=branch_func,
    dag=dag,
)

t2 = BashOperator(
    task_id="t2",
    bash_command='do something else here ',
    dag=dag,
)

# No-op branch taken when t1 does not echo 0
stop = DummyOperator(task_id='stop', dag=dag)

t1 >> check_t1 >> [t2, stop]
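
`BranchPythonOperator` marks every direct downstream task whose id the callable does not return as skipped, so when `branch_func` returns `'stop'`, `t2` is skipped for that run. The no-op `stop` task is there only so the callable always returns a valid task id, which keeps the branch explicit.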
kaxil