
How to run an Airflow DAG a specified number of times?

I tried using TriggerDagRunOperator; this operator works for me. In the callable function we can check states and decide whether to continue or not.

However, the current count and states need to be maintained.

Using the above approach I am able to repeat the DAG run.

I need an expert opinion: is there any other, more principled way to run an Airflow DAG X number of times? Thanks.

Omkara
  • Try also to explain what you are trying to achieve by having the job run a specific number of times. – tobi6 Dec 14 '18 at 14:23
  • Let's say a single DAG has 5 tasks, and I want to run this DAG 10 times. Assume a single run takes 2 hours, sometimes more. So I simply cannot schedule based on time; hence I want to run the DAG based on a number I specify in the DAG configuration. – Omkara Dec 14 '18 at 18:12

2 Answers


I'm afraid that Airflow is ENTIRELY about time-based scheduling.
You can set the schedule_interval to None and then use the API to trigger runs, but you'd be doing that externally, and thus also maintaining externally the counts and states that determine when and why to trigger.

When you say that your DAG may have 5 tasks which you want to run 10 times, and that a run takes 2 hours so you cannot schedule it based on time, this is confusing. We have no idea what the significance of 2 hours is to you, why it must be exactly 10 runs, or why you cannot schedule it to run those 5 tasks once a day. With a simple daily schedule it would run once a day at approximately the same time, and it wouldn't matter that it takes a little longer than 2 hours on any given day. Right?

You could set the start_date to 11 days ago (a fixed date though; don't set it dynamically) and the end_date to today (also fixed), then add a daily schedule_interval and a max_active_runs of 1, and you'll get exactly 10 runs. It will run them back to back without overlapping, advancing the execution_date accordingly, then stop. Or you could just use airflow backfill with a None-scheduled DAG and a range of execution datetimes.
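A minimal sketch of that first setup, assuming Airflow 1.x-era imports; the DAG id, dates, and tasks below are illustrative, not from the question:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id="run_exactly_n_times",      # hypothetical DAG id
    start_date=datetime(2018, 12, 1),  # fixed date, not dynamic
    end_date=datetime(2018, 12, 11),   # also fixed; bounds the schedule
    schedule_interval="@daily",
    max_active_runs=1,                 # runs go back to back, never overlapping
    catchup=True,                      # let the scheduler create the whole range
)

# stand-ins for the real tasks
t1 = DummyOperator(task_id="task_1", dag=dag)
t2 = DummyOperator(task_id="task_2", dag=dag)
t1 >> t2
```

The backfill alternative would be something like `airflow backfill run_exactly_n_times -s 2018-12-01 -e 2018-12-10` against a DAG whose schedule_interval is None.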

Do you mean that you want it to run every 2 hours continuously, but sometimes a run takes longer and you don't want runs to overlap? Well, you definitely can schedule it to run every 2 hours (0 0/2 * * *) and set max_active_runs to 1, so that if the prior run hasn't finished, the next run waits and then kicks off when the prior one has completed. See the last bullet in https://airflow.apache.org/faq.html#why-isn-t-my-task-getting-scheduled.
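As a sketch, that overlap-free two-hourly variant only needs the schedule and max_active_runs (names are again illustrative; `0 */2 * * *` is the standard-cron spelling of every two hours):

```python
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="every_two_hours",          # hypothetical DAG id
    start_date=datetime(2018, 12, 1),  # fixed
    schedule_interval="0 */2 * * *",   # top of every second hour
    max_active_runs=1,                 # a new run waits until the prior one finishes
)
```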

If you want your DAG to run exactly every 2 hours on the dot [give or take some scheduler lag, yes, that's a thing] and to leave the prior run going, that's mostly the default behavior. But you could add depends_on_past to some of the important tasks that themselves shouldn't run concurrently (like creating, inserting into, or dropping a temp table), or use a pool with a single slot.
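A sketch of that variant, where runs may overlap but one critical task is serialized via depends_on_past (DAG and task names are made up; the commented-out pool line shows the single-slot-pool alternative, assuming such a pool was created in the UI or CLI):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id="every_two_hours_overlapping",  # hypothetical DAG id
    start_date=datetime(2018, 12, 1),
    schedule_interval="0 */2 * * *",       # no max_active_runs: runs may overlap
)

# The instance of this task in run N won't start until its instance in
# run N-1 succeeded, so e.g. temp-table create/insert/drop never collides.
load_temp_table = DummyOperator(
    task_id="load_temp_table",
    depends_on_past=True,
    # pool="temp_table_pool",  # alternative: a pool created with exactly 1 slot
    dag=dag,
)
```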

There isn't any feature to kill the prior run if the next scheduled run is ready to start. It might be possible to skip the current run if the prior one hasn't completed yet, but I forget exactly how that's done.

That's basically most of your options. You could also create manual dag_runs for an unscheduled DAG, creating 10 at a time whenever you feel like it (using the UI or CLI instead of the API, though the API might be easier).

Do any of these suggestions address your concerns? Because it's not clear why you want a fixed number of runs, how frequently they should happen, or with what schedule and conditions, it's difficult to provide specific recommendations.

dlamblin
  • Very helpful answer; I found some clues in paragraph four. Thank you very much. Basically, I want to check whether there is any setting or configuration (coded or JSON-based) to run a DAG sequentially, one run after another. For example, just as an analogy, consider that we want to run the DAG in a 'for-loop', where the 'for-loop' is controlled by a counter variable. If such a feature is available, I will not worry about the time it takes to execute a single iteration, and the user can run it manually and control the iteration count externally through Airflow Variables. – Omkara Dec 19 '18 at 06:24
  • @Omkara from what you commented, it sounds like you might like to try ending your DAG in a `BranchPythonOperator` which branches to either a dummy END task or a `TriggerDagRunOperator` on its own DAG id, and which decrements an Airflow Variable or some other external data source (a DB, an HTTP get/put/post, a value in an S3/GCP path, etc.) to determine the branch path; see the sketch after these comments. – dlamblin Dec 19 '18 at 06:38
  • Yes, as I mentioned earlier in the post, that's what I did as a first attempt. However, it looks a little clumsy to me. Anyway. – Omkara Dec 19 '18 at 06:54
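For reference, a minimal sketch of the branch-and-self-trigger pattern dlamblin describes in the comment above, using Airflow 1.x operators; the Variable name, DAG id, and task names are invented for illustration:

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

DAG_ID = "self_triggering_dag"  # hypothetical

def _check_counter(**_):
    # Count down an Airflow Variable; branch to the self-trigger while it's > 0.
    remaining = int(Variable.get("runs_remaining", default_var=0))
    if remaining > 0:
        Variable.set("runs_remaining", remaining - 1)
        return "trigger_self"
    return "end"

def _always_trigger(context, dag_run_obj):
    return dag_run_obj  # returning the object confirms the trigger (1.x API)

dag = DAG(DAG_ID, start_date=datetime(2018, 12, 1), schedule_interval=None)

do_work = DummyOperator(task_id="do_work", dag=dag)  # stand-in for the real tasks

branch = BranchPythonOperator(
    task_id="check_counter",
    python_callable=_check_counter,
    dag=dag,
)

end = DummyOperator(task_id="end", dag=dag)

trigger_self = TriggerDagRunOperator(
    task_id="trigger_self",
    trigger_dag_id=DAG_ID,  # the DAG re-triggers itself
    python_callable=_always_trigger,
    dag=dag,
)

do_work >> branch >> [end, trigger_self]
```

Setting `runs_remaining` to 9 and triggering the DAG once by hand would then yield 10 runs in total, one after another.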
  • This functionality isn't natively supported by Airflow
  • But by exploiting the meta-db, we can cook up this functionality ourselves

We can write a custom operator / PythonOperator that will:

  • before running the actual computation, check whether 'n' runs of the task (TaskInstance table) already exist in the meta-db (refer to task_command.py for help)
  • and if they do, just skip the task (raise AirflowSkipException)

This excellent article can be used for inspiration: Use apache airflow to run task exactly once
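A rough sketch of that check, assuming Airflow 1.x module paths and a callable wired into a PythonOperator with provide_context=True; MAX_RUNS and the function name are invented:

```python
from airflow.exceptions import AirflowSkipException
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State

MAX_RUNS = 10  # hypothetical cap on the number of runs

@provide_session
def run_at_most_n_times(session=None, **context):
    ti = context["ti"]
    # Count this task's successful TaskInstances in the meta-db.
    past_successes = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.task_id == ti.task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .count()
    )
    if past_successes >= MAX_RUNS:
        raise AirflowSkipException("already ran %d times" % past_successes)
    # ... the actual computation goes here ...
```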


Note

The downside of this approach is that it assumes historical runs of the task (TaskInstances) will be preserved forever (and correctly)

  • in practice, though, I've often found task_instances to be missing (we have catchup set to False)
  • furthermore, on large Airflow deployments, one might need to set up routine cleanup of the meta-db, which would make this approach impossible
y2k-shubham