
I have a requirement to schedule an Airflow job every alternate Friday. However, the problem is I am not able to figure out how to write a schedule expression for this.

I don't want to have multiple jobs for this.

I tried this:

0 0 1-7,15-21 * 5

However, it's not working: it runs every day from the 1st to the 7th and from the 15th to the 21st, because cron treats the day-of-month and day-of-week fields as an OR when both are restricted.
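This can be verified with croniter, the library Airflow uses to evaluate cron schedules; a quick sketch:

    # Shows why the expression fires (almost) daily: when both fields are
    # restricted, cron ORs day-of-month (1-7,15-21) with day-of-week (Friday).
    from datetime import datetime
    from croniter import croniter

    it = croniter('0 0 1-7,15-21 * 5', datetime(2020, 8, 1))
    for _ in range(10):
        print(it.get_next(datetime))  # Aug 2..7, then Aug 14 (a Friday), 15, 16, 17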

From shubham's answer I realize that we can have a PythonOperator which can skip the task for us. I tried to implement the solution; however, it doesn't seem to work.

As testing this over a 2-week period would be too difficult, this is what I did:

  • I scheduled the DAG to run every 5 minutes
  • I wrote a PythonOperator to skip every alternate run (pretty similar to what I am trying to do with alternate Fridays).

DAG:

from datetime import datetime

import airflow
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from dateutil.relativedelta import relativedelta

args = {
    'owner': 'Gaurang Shah',
    'retries': 0,
    'start_date': airflow.utils.dates.days_ago(1),
}


dag = DAG(
    dag_id='test_dag',
    default_args=args,
    catchup=False,
    schedule_interval='*/5 * * * *',
    max_active_runs=1
)


dummy_op = DummyOperator(task_id='dummy', dag=dag)

def _check_date(execution_date, **context):
    # Skip the run if its execution_date falls within the last 10 minutes.
    min_date = datetime.now() - relativedelta(minutes=10)
    print(context)
    print(context.get("prev_execution_date"))
    print(execution_date)
    print(datetime.now())
    print(min_date)
    if execution_date > min_date:
        raise AirflowSkipException(f"No data available on this execution_date ({execution_date}).")

check_date = PythonOperator(
    task_id="check_if_min_date",
    python_callable=_check_date,
    provide_context=True,
    dag=dag,
)


Gaurang Shah

3 Answers

  • I doubt that a single crontab expression can solve this

  • Using Airflow's tricks, the solution is much more straightforward:

    Here you'll have to let your DAG begin with a dedicated skip_decider task that will let your DAG run / skip every alternate Friday by

    • conditionally raising AirflowSkipException (to skip the DAG)
    • not doing anything to let the DAG run

You can also leverage ShortCircuitOperator or BranchPythonOperator, but IMO, AirflowSkipException is the cleanest solution.
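A minimal sketch of this idea (the dag_id, task names, and anchor date are illustrative; pick as anchor any Friday whose run should not be skipped, and remember that a run's execution_date is one interval behind the time it actually starts):

    from datetime import date, datetime

    from airflow import DAG
    from airflow.exceptions import AirflowSkipException
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator

    # Hypothetical anchor: any Friday on which the DAG should actually run.
    ANCHOR_FRIDAY = date(2020, 8, 21)

    def _skip_decider(execution_date, **context):
        # Fridays are 7 days apart, so the division is exact; odd weeks
        # since the anchor are the "off" Fridays.
        weeks = (execution_date.date() - ANCHOR_FRIDAY).days // 7
        if weeks % 2 != 0:
            # The skipped state propagates: downstream tasks with the default
            # trigger_rule='all_success' are skipped as well.
            raise AirflowSkipException(f"Off-week Friday, skipping ({execution_date})")

    dag = DAG(
        dag_id='alternate_friday_dag',   # hypothetical name
        start_date=datetime(2020, 8, 14),
        schedule_interval='0 0 * * 5',   # every Friday at midnight
        catchup=False,
    )

    skip_decider = PythonOperator(
        task_id='skip_decider',
        python_callable=_skip_decider,
        provide_context=True,            # Airflow 1.10-style context passing
        dag=dag,
    )

    actual_work = DummyOperator(task_id='actual_work', dag=dag)
    skip_decider >> actual_work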


Reference: How to define a DAG that scheduler a monthly job together with a daily job?

y2k-shubham
  • Seems like a good alternative.. could you please elaborate how to skip the second run? – Gaurang Shah Aug 25 '20 at 16:00
  • **@Gaurang Shah** check [point 4 'Skipping execution of tasks'](https://godatadriven.com/blog/the-zen-of-python-and-apache-airflow/) – y2k-shubham Aug 27 '20 at 16:45
  • Thanks for the response. I went through the post; however, I am not able to understand the logic. In `execution_date > (datetime.datetime.now() - relativedelta(weeks=1))`, wouldn't `execution_date` and `datetime.datetime.now()` be the same? I tested this and both come out the same. – Gaurang Shah Aug 28 '20 at 14:37
  • **@Gaurang Shah** Read the section titled [What is the difference between execution_date and start_date?](https://towardsdatascience.com/airflow-schedule-interval-101-bbdda31cc463) – y2k-shubham Aug 28 '20 at 14:47
  • **@y2k-shubham** Thanks for the post. I understand the difference; in my case both show the same value with only a 5-10 second difference. I created a DAG which triggers every 5 minutes, and I was skipping the task if the duration is less than 10 minutes; however, it's not working. Maybe I will open a new question or update here – Gaurang Shah Aug 28 '20 at 15:25

Depending on your implementation, you can use the hash sign (#). It worked in my Airflow schedules using version 1.10:

Hash (#): '#' is allowed for the day-of-week field, and must be followed by a number between one and five. It allows specifying constructs such as "the second Friday" of a given month. For example, entering "5#3" in the day-of-week field corresponds to the third Friday of every month. Reference
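For illustration, a hedged sketch (this assumes the croniter version backing your Airflow accepts the '#' syntax, as stated above for 1.10; the dag_id is hypothetical):

    from datetime import datetime

    from airflow import DAG

    # Run at midnight on the third Friday of every month ('5#3').
    dag = DAG(
        dag_id='third_friday_dag',
        start_date=datetime(2021, 1, 1),
        catchup=False,
        schedule_interval='0 0 * * 5#3',
    )

Note that a monthly "nth Friday" schedule only approximates "every alternate Friday"; combining occurrences (e.g. '0 0 * * 5#2,5#4') gets closer, assuming the installed croniter accepts comma-separated '#' entries.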


You can use timedelta as mentioned below; combine it with start_date to schedule your job bi-weekly.

    from datetime import datetime, timedelta

    dag = DAG(
        dag_id='test_dag',
        default_args=args,
        catchup=False,
        start_date=datetime(2021, 3, 26),
        schedule_interval=timedelta(days=14),
        max_active_runs=1
    )