Hi everyone, I am new to Airflow and Python. I need to run tasks based on the value of a variable in the input JSON. If the value of the variable 'insurance' is "true", then task1, task2, and task3 need to run; otherwise task4, task5, and task6 need to run. Since I am a newbie, I don't have much idea about the usage of PythonOperator and BranchPythonOperator.
This is my input json:
{
  "car": {
    "engine_no": "123_st_456",
    "json": "{\"make\":\"Honda\",\"model\":\"Jazz\",\"insurance\":\"true\",\"pollution\":\"true\"}"
  }
}
The code is given below:
import json
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
# Arguments handed to the DAG as defaults for its tasks.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
)

# schedule_interval=None: no schedule is configured for this DAG.
dag = DAG(
    dag_id='DAG_NAME',
    default_args=default_args,
    start_date=datetime(2020, 8, 4),
    schedule_interval=None,
    max_active_runs=5,
)
def _insurance_enabled(raw_payload):
    """Return True when the car payload marks ``insurance`` as true.

    Args:
        raw_payload: The ``car.json`` value from the trigger configuration,
            either a JSON-encoded string or an already decoded dict.
            A missing/empty payload counts as "not insured".

    Returns:
        bool: True only if the ``insurance`` entry is ``"true"`` (any case)
        or the JSON boolean ``true``.

    Raises:
        ValueError: If the payload is not valid JSON or does not decode to
            a JSON object.
    """
    if not raw_payload:
        return False

    if isinstance(raw_payload, dict):
        details = raw_payload
    else:
        try:
            # loads() parses text into Python objects; dumps() would do the
            # opposite and hand back a string that cannot be indexed by key.
            details = json.loads(raw_payload)
        except ValueError as err:
            raise ValueError("car.json is not valid JSON: %s" % err)

    if not isinstance(details, dict):
        raise ValueError("car.json must decode to a JSON object")

    return str(details.get('insurance', '')).strip().lower() == 'true'


def sample_fun(json=None, **kwargs):
    """Pick the task chain to execute for the current DAG run.

    Used as the callable of a BranchPythonOperator, so the decision is made
    when the run executes (not when the DAG file is parsed).

    Args:
        json: Optional car payload (JSON string or dict). When omitted, the
            payload is read from the run's trigger configuration at
            ``dag_run.conf['car']['json']``. Note that inside this function
            the name ``json`` refers to this argument, not the json module.
        **kwargs: Airflow task context; only ``dag_run`` is used.

    Returns:
        str: ``'task1'`` when insurance is true, otherwise ``'task4'`` --
        the task_id of the first task in the chain that should run.

    Raises:
        ValueError: If the payload cannot be parsed (see _insurance_enabled).
    """
    raw_payload = json
    if raw_payload is None:
        dag_run = kwargs.get('dag_run')
        # conf is None when the run was triggered without a payload.
        conf = getattr(dag_run, 'conf', None) or {}
        raw_payload = (conf.get('car') or {}).get('json')

    return 'task1' if _insurance_enabled(raw_payload) else 'task4'


# Branching task: follows only the downstream task whose task_id the
# callable returns and skips the other branch.
sample_task = BranchPythonOperator(
    task_id='sample_task',
    python_callable=sample_fun,  # the function object itself, not its name
    provide_context=True,
    dag=dag,
)

# Every task is always registered on the DAG; which chain actually runs is
# decided per run by sample_task.
task1 = BashOperator(
    task_id='task1',
    bash_command='echo 1',
    dag=dag,
)
task2 = BashOperator(
    task_id='task2',
    bash_command='echo 2',
    dag=dag,
)
task3 = BashOperator(
    task_id='task3',
    bash_command='echo 3',
    dag=dag,
)
task4 = BashOperator(
    task_id='task4',
    bash_command='echo 4',
    dag=dag,
)
task5 = BashOperator(
    task_id='task5',
    bash_command='echo 5',
    dag=dag,
)
task6 = BashOperator(
    task_id='task6',
    bash_command='echo 6',
    dag=dag,
)

# insurance == "true"  -> task1 -> task2 -> task3
sample_task >> task1
task1 >> task2 >> task3

# anything else        -> task4 -> task5 -> task6
sample_task >> task4
task4 >> task5 >> task6