7

The current problem I am facing is that I have documents in a MongoDB collection, each of which needs to be processed and updated by tasks that run in an acyclic dependency graph. If an upstream task fails to process a document, then none of the dependent tasks may process that document, since it has not been updated with the prerequisite information.

If I were to use Airflow, this leaves me with two solutions:

  1. Trigger a DAG for each document, and pass in the document ID with --conf (see the sketch after this list). The problem is that this is not the intended way for Airflow to be used; I would never be running a scheduled process, and based on how documents appear in the collection, I would be making 1440 DagRuns per day.

  2. Run a DAG every period to process all documents created in the collection during that period. This follows how Airflow is expected to work, but the problem is that if a task fails to process a single document, none of the dependent tasks may process any of the other documents. Also, if one document takes longer than the others to be processed by a task, all of the other documents end up waiting on that single document before they can continue down the DAG.
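
For reference, option 1 would have each task read the document ID out of the run configuration, roughly like this (a minimal sketch against Airflow 1.10; the DAG and task names are made up):

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def process_document(**context):
    # The ID passed via `airflow trigger_dag ... --conf '{"document_id": "..."}'`
    # is available on the DagRun object.
    document_id = context["dag_run"].conf["document_id"]
    print("Processing document {}".format(document_id))


with DAG("process_single_document", start_date=datetime(2019, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    process = PythonOperator(
        task_id="process_document",
        python_callable=process_document,
        provide_context=True,  # needed on Airflow 1.10.x to receive **context
    )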

Is there a better method than Airflow? Or is there a better way to handle this in Airflow than the two methods I currently see?

Sebastian Mendez
  • This perfectly outlines my problem understanding how to do something in Airflow for each element in a list (file in a directory, row in a database). Both solutions feel wrong for the reasons you outline. The answers here don't address these problems. What did you do in the end? – LondonRob Jan 17 '20 at 15:41
  • I didn't; the system still uses a poorly optimized COLSCAN to find a valid document to process. Thanks for putting a bounty on this question, hopefully that will attract more novel solutions. One thing I did find was Prefect which was designed by someone originally involved with Airflow that was upset by the limitations; it might be a better option for tasks such as this. – Sebastian Mendez Jan 17 '20 at 19:25
  • The [Prefect docs](https://docs.prefect.io/core/welcome/why_not_airflow.html#parametrized-workflows) perfectly describe this problem and claim to solve it. I haven't tried it so I can't advocate but it sounds good from their description. – LondonRob Jan 20 '20 at 11:12

5 Answers

4

From the knowledge I gained in my attempt to answer this question, I've come to the conclusion that Airflow is just not the tool for the job.

Airflow is designed for scheduled, idempotent DAGs. A DagRun must also have a unique execution_date; this means that running the same DAG at the exact same start time (in the case that we receive two documents at the same time) is quite literally impossible. Of course, we can schedule the next DagRun immediately in succession, but this limitation should demonstrate that any attempt to use Airflow in this fashion will always be, to an extent, a hack.

The most viable solution I've found is to instead use Prefect, which was developed with the intention of overcoming some of the limitations of Airflow:

"Prefect assumes that flows can be run at any time, for any reason."

Prefect's equivalent of a DAG is a Flow; one key advantage of a Flow that we can take advantage of is the ease of parametrization. Then, with some threads, we're able to have a Flow run for each element in a stream. Here is an example streaming ETL pipeline:

import time
from prefect import task, Flow, Parameter
from threading import Thread


def stream():
    # Simulates documents arriving one per second
    for x in range(10):
        yield x
        time.sleep(1)


@task
def extract(x):
    # If 'x' referenced a document, in this step we could load that document
    return x


@task
def transform(x):
    return x * 2


@task
def load(y):
    print("Received y: {}".format(y))


with Flow("ETL") as flow:
    x_param = Parameter('x')
    e = extract(x_param)
    t = transform(e)
    l = load(t)

# Kick off a parameterized flow run in its own thread for each element of the stream
for x in stream():
    thread = Thread(target=flow.run, kwargs={"x": x})
    thread.start()
Sebastian Mendez
0

You could change trigger_rule from "all_success" to "all_done":

https://github.com/apache/airflow/blob/62b21d747582d9d2b7cdcc34a326a8a060e2a8dd/airflow/example_dags/example_latest_only_with_trigger.py#L40

You could also create a branch that handles failed documents, with trigger_rule set to "one_failed", to process those failed documents differently (e.g. move them to a "failed" folder and send a notification).
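
As a rough illustration, a DAG along these lines keeps the happy path flowing while routing failures to a separate branch (a minimal sketch with made-up task names; the TriggerRule constants come from airflow.utils.trigger_rule):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG("trigger_rule_example", start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:
    process = DummyOperator(task_id="process_documents")

    # Runs whether or not `process` succeeded, so one bad document
    # does not block the rest of the pipeline.
    continue_pipeline = DummyOperator(
        task_id="continue_pipeline",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Runs only if an upstream task failed, e.g. to move the offending
    # documents to a "failed" folder and send a notification.
    handle_failures = DummyOperator(
        task_id="handle_failed_documents",
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    process >> [continue_pipeline, handle_failures]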

Tagar
  • Another concern with the second approach is also that I will have to wait for all documents to be processed by a certain task before we may process the documents by a dependent task; for certain documents, processing by a given task may take much longer than other documents, and this means that we're waiting on a single document to continue the rest of the DAG for multiple documents. – Sebastian Mendez Oct 16 '19 at 20:49
0

"I would be making 1440 DagRuns per day."

With a good Airflow architecture, this is quite possible. Choke points might be:

  1. executor - use the Celery Executor instead of the Local Executor, for example
  2. backend database - monitor and tune as necessary (indexes, proper storage, etc.)
  3. webserver - for thousands of DagRuns and tasks, perhaps only use the webserver for dev/qa environments, and not for production where you have a higher rate of task/DagRun submissions; you could use the CLI etc. instead

Another approach is scaling out by running multiple Airflow instances: partition the documents into, say, ten buckets, and assign each partition's documents to just one Airflow instance.
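
The partitioning itself can be as simple as hashing the document ID into a bucket (a sketch; the number of instances and the routing of buckets to instances are assumptions):

import hashlib

NUM_INSTANCES = 10  # one bucket per Airflow instance


def bucket_for(document_id):
    # Stable hash so a given document always lands on the same instance
    digest = hashlib.md5(str(document_id).encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_INSTANCES


# e.g. documents in bucket 3 would only be picked up by Airflow instance 3
print(bucket_for("5d9f1b2c8e4a"))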

Tagar
  • I guess for option 1 I'm just nervous about how it doesn't seem to be "the correct way" to use Airflow. I'd be creating multiple dagruns scheduled to run at essentially the same time, so any idea of scheduling doesn't line up with this paradigm. – Sebastian Mendez Oct 16 '19 at 22:19
0

I'd process the heavier tasks in parallel and feed successful operations downstream. As far as I know, you can't feed successes asynchronously to downstream tasks, so you would still need to wait for every thread to finish before moving downstream, but this would still be far more acceptable than spawning one DagRun for each record. Something along these lines:

Task 1: read Mongo, filtering by some timestamp (remember idempotence), and feed the downstream tasks (e.g. via XCom);
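
Task 1 could be a PythonOperator callable along these lines (a sketch assuming pymongo and made-up database, collection, and field names):

from pymongo import MongoClient


def read_new_documents(**context):
    # Filter on the schedule window so re-runs stay idempotent
    start = context["execution_date"]
    end = context["next_execution_date"]
    client = MongoClient("mongodb://localhost:27017")   # assumed connection string
    collection = client.mydb.documents                  # made-up db/collection
    docs = collection.find({"created_at": {"$gte": start, "$lt": end}}, {"_id": 1})
    # The return value of a PythonOperator callable is pushed to XCom automatically
    return [str(d["_id"]) for d in docs]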

Task 2: do stuff in parallel via a PythonOperator, or even better via a KubernetesPodOperator, e.g.:

import threading
from queue import Queue


def thread_fun(ret):
    while not job_queue.empty():
        job = job_queue.get()
        try:
            # stuff_done is your per-document processing function
            ret.append(stuff_done(job))
        except Exception:
            pass  # failed documents are skipped here; consider logging them
        job_queue.task_done()
    return ret


# Create workers and queue
appropriate_thread_nr = 4  # tune to your workload
threads = []
ret = []  # a mutable object shared by the workers
job_queue = Queue(maxsize=0)

for _ in range(appropriate_thread_nr):
    worker = threading.Thread(
        target=thread_fun,
        args=(ret,)
    )
    worker.daemon = True
    threads.append(worker)

# Populate queue with jobs
# (inside a PythonOperator this would be ti.xcom_pull(task_ids=upstream_task))
for row in xcom_pull(task_ids=upstream_task):
    job_queue.put(row)

# Start threads
for thr in threads:
    thr.start()

# Wait for the threads to finish their jobs
for thr in threads:
    thr.join()

# (inside a PythonOperator: ti.xcom_push(key="results", value=ret), or simply return ret)
xcom_push(ret)

Task 3: do more stuff with the output of the previous task, and so on.
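
Task 3 would then pull only the successfully processed records that Task 2 pushed, e.g. (a sketch; the task id and follow-up function are made up):

def downstream_fun(**context):
    # Inside a PythonOperator with provide_context=True
    results = context["ti"].xcom_pull(task_ids="task_2")
    for item in results:
        do_more_stuff(item)  # made-up follow-up processing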

Miguel
0

We have built a system that queries MongoDB for a list and generates a Python file per item, each containing one DAG (note: having each DAG in its own Python file helps Airflow scheduler efficiency, given its current design). The generator DAG runs hourly, right before the scheduled hourly run of all the generated DAGs.
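
A generator along these lines can be run as its own DAG or cron job (a rough sketch; the paths, template, connection string, and per-item processing script are all made up):

import os
from pymongo import MongoClient

DAGS_DIR = "/opt/airflow/dags/generated"   # assumed location scanned by the scheduler
TEMPLATE = '''
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG("process_{doc_id}", start_date=datetime(2019, 1, 1),
         schedule_interval="@hourly", catchup=False) as dag:
    BashOperator(task_id="process",
                 bash_command="python process.py --document-id {doc_id}")
'''


def generate_dag_files():
    client = MongoClient("mongodb://localhost:27017")   # assumed connection string
    for doc in client.mydb.documents.find({}, {"_id": 1}):
        doc_id = str(doc["_id"])
        # One file per item: the scheduler parses each file independently
        path = os.path.join(DAGS_DIR, "process_{}.py".format(doc_id))
        with open(path, "w") as f:
            f.write(TEMPLATE.format(doc_id=doc_id))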

ryw