
I have three base nodes (PythonOperators) that need to be called sequentially. Consider a scenario where my system receives a shape in the form of a JPEG file. I have three functions that I run over the file (a rough sketch of each follows the list):

  1. The first function will change the color of the shape, then, on success:
  2. The shape gets passed on to the second function, which will rotate the shape 90 degrees, then, on success:
  3. The shape gets passed on to the third function, which will transform the shape in some predetermined way, then, on success, the program is done.
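
Each callable is shaped roughly like this (a simplified sketch; the actual image manipulation doesn't matter here, only the "SUCCESS"/"FAILURE" return value):

def color_shape(**context):
    try:
        # ... load the jpeg and change the shape's color here ...
        return 'SUCCESS'
    except Exception:
        return 'FAILURE'

# rotate_shape and transform_shape follow the same pattern: do their step,
# then return 'SUCCESS' or 'FAILURE'.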

So, I have three things that need to happen in order: ChangeColor -> Rotate -> Transform. I wish to add a BranchPythonOperator that will, on success (determined by a "SUCCESS" return value from each of these three nodes), move on to the next node in the list, or terminate after the last node. On failure (determined by a "FAILURE" return value from each node), the program progresses to a failure node (another PythonOperator, I imagine) that will do some things (save the error in some DB, extract some details and reasons, whatever - doesn't really matter).

But I am having some trouble figuring out how to do this in Airflow. The docs haven't been too helpful when it comes to Airflow's branching operator, and I haven't been able to find many examples online. Can anyone help?

Thanks!

Edit: Okay, to make things a bit more concrete, here is what I currently have and think I need:

from airflow.operators.python_operator import PythonOperator, BranchPythonOperator


def branch_func_from_color(**context):
    # The color task's callable returns 'SUCCESS' or 'FAILURE', and
    # PythonOperator pushes that return value to XCom automatically,
    # so pull it here to decide where to branch.
    if context['ti'].xcom_pull(task_ids='color_shapes') == 'FAILURE':
        return 'failure_node'
    return 'rotate_shapes'


def branch_func_from_rotate(**context):
    if context['ti'].xcom_pull(task_ids='rotate_shapes') == 'FAILURE':
        return 'failure_node'
    return 'transform_shapes'


def failure_node(**context):
    # Placeholder: in reality this would save the error to a DB,
    # extract details and reasons, etc.
    print("FAILURE")


with dag:
    color_task = PythonOperator(
        task_id="color_shapes",
        dag=dag,
        python_callable=color_shape,
        provide_context=True
    )

    rotate_task = PythonOperator(
        task_id="rotate_shapes",
        dag=dag,
        python_callable=rotate_shape,
        provide_context=True
    )

    transform_task = PythonOperator(
        task_id="transform_shapes",
        dag=dag,
        python_callable=transform_shape,
        provide_context=True
    )

    failure_task = PythonOperator(
        task_id="failure_node",
        dag=dag,
        provide_context=True,
        python_callable=failure_node
    )

    branching_task_color = BranchPythonOperator(
        task_id='branch_task_color',
        provide_context=True,
        python_callable=branch_func_from_color
    )

    branching_task_rotate = BranchPythonOperator(
        task_id='branch_task_rotate',
        provide_context=True,
        python_callable=branch_func_from_rotate
    )

Don't worry about the callable functions... I don't 100% know if I'm using context correctly in the callables I provided, but nonetheless, I just want to focus on the graph. I want a graph that, in Airflow, looks like this:

color_shapes ---> branch_task_color --+--> rotate_shapes ---> branch_task_rotate --+--> transform_shapes ---> branch_task_transform --+--> END_PROGRAM
                                      |                                            |                                                  |
                                      +--------------------------------------------+--------------------------------------------------+--> failure_node

Okay, I don't have a branch_task_transform in the code, but you get the idea. How the hell do I do this in Airflow? To reiterate: on any failure, I need to go to failure_node; on success, I just continue forward. Failure is determined by the return value of the Python callables.
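
For what it's worth, here is my best guess at the wiring with the bitshift dependency operators (untested; branch_task_transform and some kind of end task would still need to be defined):

# Inside the `with dag:` block, after the operators above.
color_task >> branching_task_color >> [rotate_task, failure_task]
rotate_task >> branching_task_rotate >> [transform_task, failure_task]
# transform_task >> branching_task_transform >> [end_task, failure_task]
# (branch_task_transform and end_task are hypothetical; end_task could be
# a DummyOperator standing in for END_PROGRAM.)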

John Lexus
  • Each Airflow `task` should be like a small script (running for a few minutes), not something that takes seconds to run. In this example, the individual image-processing tasks might take only 1-2 seconds each (on ordinary hardware), but the scheduling latency between successive tasks would easily add up to ~20-30 seconds per image processed (even more on a CeleryExecutor Airflow deployment with many DAGs). I would strongly suggest combining these operations into a single task. Airflow is a replacement for crons, not something like Uber's Cadence (real-time transactions / streaming pipelines). – y2k-shubham Sep 18 '20 at 12:13
  • That's an excellent observation; I'll share it with the team. In any case, do you have any suggestions on how to do what I was asking? – John Lexus Sep 18 '20 at 18:06
  • 1
    Did you look into [XCOM](https://airflow.apache.org/docs/1.10.1/concepts.html?highlight=xcom)? XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size. – Philipp Johannis Sep 18 '20 at 20:55
  • Thank you so much for your comment, Philipp. I will look into it. Anyone else taking a look, I would really appreciate an answer :) Thanks, y'all! – John Lexus Sep 18 '20 at 20:58
  • These comments have been elaborated further in [this answer](https://stackoverflow.com/a/64028011/3679900) – y2k-shubham Sep 23 '20 at 13:21
  • @JohnLexus Do you still need help with this? I don't think you need branches here at all. You just need to fail the scripts when you want to fail the DAG (the failure_node can use the `TriggerRule.ONE_FAILED` trigger rule). – Elad Kalif May 02 '21 at 09:51
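
A minimal sketch of the trigger-rule approach from that last comment, assuming the three callables raise an exception on failure (failing their task) rather than returning 'FAILURE':

from airflow.utils.trigger_rule import TriggerRule

# failure_node fires if ANY of the upstream tasks failed.
failure_task = PythonOperator(
    task_id='failure_node',
    python_callable=failure_node,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag,
)

# No branch operators needed: just chain the three steps...
color_task >> rotate_task >> transform_task
# ...and fan all of them into the failure handler.
[color_task, rotate_task, transform_task] >> failure_task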

0 Answers