I have three base nodes (PythonOperators) that need to be called sequentially. Consider a scenario where my system receives a shape in the form of a JPEG file. I have three functions that I run over the file:
- The first function will change the color of the shape, then, on success:
- The shape gets passed on to the second function, which will rotate the shape 90 degrees, then, on success:
- The shape gets passed on to the third function, which will transform the shape in some predetermined way, then, on success, the program is done.
So, I have three things that need to happen in order: ChangeColor -> Rotate -> Transform. I wish to add a branching operator (BranchPythonOperator) after each of these three nodes. On success, which is determined by a "SUCCESS" return value from the node, the program moves on to the next node in the list, or terminates after the last one. On failure, which is determined by a "FAILURE" return value from the node, the program goes to a failure node (another PythonOperator, I imagine) that will do some things (save the error in some DB, extract some details and reasons, whatever - it doesn't really matter).
But I am having some trouble figuring out how to do this in Airflow. The docs haven't been too helpful when it comes to Airflow's branching operator, and I haven't been able to find many examples online. Can anyone help?
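To pin down the behaviour I'm after, here is a minimal plain-Python sketch of the control flow, with no Airflow involved. The name run_pipeline and the lambda steps are hypothetical stand-ins for illustration only; the real callables do actual work and just return a status string.

```python
# Plain-Python sketch of the desired flow (not Airflow code).
# Each step is a callable returning "SUCCESS" or "FAILURE".

def run_pipeline(steps, on_failure):
    """Run steps in order; at the first "FAILURE", call on_failure and stop."""
    for name, step in steps:
        if step() == "FAILURE":
            on_failure(name)  # stand-in for the failure node
            return "FAILURE"
    return "SUCCESS"


failed = []
steps = [
    ("color_shapes", lambda: "SUCCESS"),
    ("rotate_shapes", lambda: "FAILURE"),    # pretend the rotation fails
    ("transform_shapes", lambda: "SUCCESS"),  # never reached in this run
]
print(run_pipeline(steps, failed.append))  # FAILURE
print(failed)                              # ['rotate_shapes']
```

What I want is the same short-circuit behaviour, expressed as a DAG.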
Thanks!
Edit: Okay, to make things a bit more concrete, here is what I currently have and think I need:
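# Airflow 1.10-style import path (provide_context=True suggests Airflow 1.x)
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator

def branch_func_from_color(**context):
    # Route to the failure node on "FAILURE", otherwise continue to rotation.
    # Not sure context['mode'] is the right way to read the upstream result.
    if context['mode'] == 'FAILURE':
        return 'failure_node'
    return 'rotate_shapes'

def branch_func_from_rotate(**context):
    if context['mode'] == 'FAILURE':
        return 'failure_node'
    return 'transform_shapes'

def failure_node(**context):
    print("FAILURE")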
with dag:
    color_task = PythonOperator(
        task_id="color_shapes",
        dag=dag,
        python_callable=color_shape,
        provide_context=True
    )
    rotate_task = PythonOperator(
        task_id="rotate_shapes",
        dag=dag,
        python_callable=rotate_shape,
        provide_context=True
    )
    transform_task = PythonOperator(
        task_id="transform_shapes",
        dag=dag,
        python_callable=transform_shape,
        provide_context=True
    )
    failure_task = PythonOperator(
        task_id="failure_node",
        dag=dag,
        provide_context=True,
        python_callable=failure_node
    )
    branching_task_color = BranchPythonOperator(
        task_id='branch_task_color',
        provide_context=True,
        python_callable=branch_func_from_color
    )
    branching_task_rotate = BranchPythonOperator(
        task_id='branch_task_rotate',
        provide_context=True,
        python_callable=branch_func_from_rotate
    )
Don't worry about the callable functions. I'm not 100% sure I'm using context correctly in the callables I did provide, but for now I just want to focus on the graph. I want a graph that, in Airflow, looks like this:
color_shapes ---> branch_task_color
                    |---> rotate_shapes ---> branch_task_rotate
                    |                          |---> transform_shapes ---> branch_task_transform
                    |                          |                             |---> END_PROGRAM
                    |                          |                             |
                    |--------------------------|-----------------------------|---> failure_node
Okay, I don't have a branch_task_transform in the code, but you get the idea. How the hell do I do this in Airflow? To reiterate, on any failure, I need to go to the failure_node; on success, I just continue forward. Failure is determined by the return value of the Python callables.
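On the context question: as far as I understand it, a PythonOperator's return value is pushed to XCom, so the branch callables would presumably have to pull the upstream result from there via the task instance in the context. Below is a rough sketch under that assumption. make_branch and FakeTI are hypothetical names of my own; FakeTI only mimics the xcom_pull call so the routing can be tried outside Airflow.

```python
def make_branch(upstream_task_id, next_task_id, failure_task_id="failure_node"):
    """Build a branch callable that routes on the upstream task's return value."""
    def branch(**context):
        # Assumption: the upstream callable returned "SUCCESS" or "FAILURE"
        # and that value is available through XCom on the task instance.
        status = context["ti"].xcom_pull(task_ids=upstream_task_id)
        return next_task_id if status == "SUCCESS" else failure_task_id
    return branch


class FakeTI:
    """Hypothetical stand-in for the task instance, for local testing only."""
    def __init__(self, results):
        self.results = results

    def xcom_pull(self, task_ids):
        return self.results.get(task_ids)


branch_func_from_color = make_branch("color_shapes", "rotate_shapes")
branch_func_from_rotate = make_branch("rotate_shapes", "transform_shapes")

ti = FakeTI({"color_shapes": "SUCCESS", "rotate_shapes": "FAILURE"})
print(branch_func_from_color(ti=ti))   # rotate_shapes
print(branch_func_from_rotate(ti=ti))  # failure_node
```

Anything other than "SUCCESS" (including a missing value) is routed to failure_node here, which seemed like the safer default. This still leaves open how the tasks themselves should be wired together, which is the part I'm asking about.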