I have a DAG that runs every hour and does incremental loads like:

select * from <table_name>
where last_mod_dt between <execution_date> AND <next_execution_date>;

(execution_date here is the current DAG instance's execution date: 1 PM, 2 PM, ...)

However, some hourly DAG instances fail in between, so I want to ensure that the next task instance always picks up from the last successful instance, so that no delta records are missed from the source.

For example, the current scenario:

DAG_Ins1 - 1 PM - Success (last_mod_dt between 1 PM AND 2 PM)
DAG_Ins2 - 2 PM - Fail (last_mod_dt between 2 PM AND 3 PM)
DAG_Ins3 - 3 PM - Fail (last_mod_dt between 3 PM AND 4 PM)
DAG_Ins4 - 4 PM - Success (last_mod_dt between 4 PM AND 5 PM)

The 4th instance, which succeeded, only picks its own execution date (irrespective of the statuses of the previous runs).

Expected:

DAG_Ins1 - 1 PM - Success (last_mod_dt between 1 PM AND 2 PM)
DAG_Ins2 - 2 PM - Fail (last_mod_dt between 2 PM AND 3 PM)
DAG_Ins3 - 3 PM - Fail (last_mod_dt between 2 PM AND 4 PM)
DAG_Ins4 - 4 PM - Success (last_mod_dt between 2 PM AND 5 PM)

Here the 4th instance should pick its start date from the end of the last successful execution, which was 2 PM in the above example.
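The expected windowing above can be sketched in plain Python. Note that `effective_window` and the timestamps are illustrative only, not part of any Airflow API:

```python
from datetime import datetime, timedelta

def effective_window(execution_date, next_execution_date, last_success_end):
    """Pick the window start: fall back to the end of the last successful
    run when it is earlier than this run's own execution date."""
    if last_success_end is not None:
        start = min(execution_date, last_success_end)
    else:
        start = execution_date
    return start, next_execution_date

# Hypothetical run history mirroring the example above: instance 1 (1 PM)
# succeeded, so its interval ended at 2 PM; instances 2 and 3 failed.
one_pm = datetime(2021, 7, 23, 13, 0)
start, end = effective_window(
    one_pm + timedelta(hours=3),  # 4 PM run (instance 4)
    one_pm + timedelta(hours=4),  # its interval end, 5 PM
    one_pm + timedelta(hours=1),  # last success ended at 2 PM
)
print(start.strftime("%I %p"), "->", end.strftime("%I %p"))  # 02 PM -> 05 PM
```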

I don't want to query the Airflow metadata tables directly. Does the get_latest_execution_date() method return the last successful/failed date-time of a DAG run? Is there any other {{ macro }} that would do this?

Thanks!!

ManiK

1 Answer

I think what you are looking for is the prev_execution_date_success macro. This macro provides the execution_date of the last successful DAG run.

Your SQL can be:

select * from <table_name>
where last_mod_dt between '{{ prev_execution_date_success }}' AND '{{ next_execution_date }}';

Reference to all available macros can be found here.

Update for Airflow >= 2.2.0:

The prev_execution_date_success macro is deprecated. You can use other macros instead, for example:

select * from <table_name>
where last_mod_dt between '{{ prev_data_interval_end_success }}' AND '{{ data_interval_start }}';

Note that Airflow 2.2.0 implemented AIP 39 - Richer schedule_interval, so it probably makes more sense to query the data interval directly:

select * from <table_name>
where last_mod_dt between '{{ data_interval_start }}' AND '{{ data_interval_end }}';
Elad Kalif
  • Thanks Elad. Can you(or ManiK) shed some light on how to use `prev_execution_date_success` macro to trigger dag from the last successful execution date ? – Smit Thakkar Jul 23 '21 at 20:32
  • @Elad Hi. I see the documentation of Airflow mentioning {{ prev_execution_date_success }} as deprecated. Is there an alternate macro? I tried using {{ prev_start_date_success }} but that is giving me task's successful execution date instead of DAG's. – sr1991 Nov 02 '21 at 22:33
  • @SR1991 Updated the answer to provide information for newer Airflow versions. – Elad Kalif Nov 03 '21 at 14:04
  • Thank you! I will take a look at those options to see how I can modify my code. – sr1991 Nov 05 '21 at 02:08
  • What is the format of `prev_execution_date_success`, is it a date (would suck) or timestamp? – jayarjo Jul 04 '22 at 14:37
  • @jayarjo It's `pendulum.DateTime` check https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html there is example in the table of macros – Elad Kalif Jul 04 '22 at 14:52
  • I see that result can be `None`... is it possible to default to some other date in such case? Or what happens. – jayarjo Jul 04 '22 at 15:20
  • @jayarjo It's more a Jinja question not Airflow, Airflow just uses the Jinja engine so all abilities of Jinja are available. Check https://stackoverflow.com/questions/19614027/jinja2-template-variable-if-none-object-set-a-default-value – Elad Kalif Jul 04 '22 at 16:24
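For the None case raised in the comments (there is no successful run yet on the very first execution), a minimal sketch using Jinja's `or` fallback, assuming you want to default to the current interval's start:

```sql
select * from <table_name>
where last_mod_dt between
  '{{ prev_data_interval_end_success or data_interval_start }}'
  AND '{{ data_interval_end }}';
```

With `or`, Jinja substitutes the right-hand value whenever the left-hand one is None or otherwise falsy; the equivalent `| default(..., true)` filter works too.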