
I’m trying to create an Airflow DAG that runs an SQL query to get all of yesterday’s data, but I want the actual run to be delayed relative to the data_interval_end.

So the data interval ends at midnight, but it takes a few hours for the data itself to be ready for querying. This is why I want the DAG to run only 4 hours later.

For example:

data_interval_start = 2022-01-01 00:00:00
data_interval_end = 2022-01-02 00:00:00
wanted DAG run = 2022-01-02 04:00:00
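The wanted run time in this example is just data_interval_end plus a fixed delay. A minimal stdlib sketch of that arithmetic follows; READY_DELAY, earliest_run_time and is_ready are hypothetical names. In Airflow 2.2 and later, a TimeDeltaSensor (from airflow.sensors.time_delta) with delta=timedelta(hours=4) placed as the first task should wait for the same target, data_interval_end + delta, while the run keeps its original midnight-to-midnight data interval.

```python
from datetime import datetime, timedelta

# Assumed delay: a rough guess at how long the source data needs to settle.
READY_DELAY = timedelta(hours=4)


def earliest_run_time(data_interval_end: datetime, delay: timedelta = READY_DELAY) -> datetime:
    """Return the earliest time the query for this interval should start."""
    return data_interval_end + delay


def is_ready(now: datetime, data_interval_end: datetime, delay: timedelta = READY_DELAY) -> bool:
    """True once the current time has reached the delayed target."""
    return now >= earliest_run_time(data_interval_end, delay)


print(earliest_run_time(datetime(2022, 1, 2)))  # 2022-01-02 04:00:00
print(is_ready(datetime(2022, 1, 2, 3, 30), datetime(2022, 1, 2)))  # False
```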

How can I achieve this? Thanks!

So far I’ve just adjusted the SQL query itself with date_trunc, but I hope there is a solution that lets me keep the query without this function.
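One way to avoid date_trunc is to filter on the run’s own interval bounds as a half-open range [start, end). In an Airflow SQL task those bounds would typically come from the templated {{ data_interval_start }} and {{ data_interval_end }} values, which do not move even if the run starts hours later. The sketch below uses sqlite3 and a hypothetical events table with an event_ts column purely to illustrate the filter; it is not tied to any particular database.

```python
import sqlite3
from datetime import datetime

# Hypothetical table/column names; the bounds are passed as query parameters.
QUERY = "SELECT COUNT(*) FROM events WHERE event_ts >= ? AND event_ts < ?"


def count_rows_in_interval(conn: sqlite3.Connection, start: datetime, end: datetime) -> int:
    # Half-open interval: midnight at the start is included, midnight at the end is not.
    params = (start.isoformat(sep=" "), end.isoformat(sep=" "))
    (count,) = conn.execute(QUERY, params).fetchone()
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_ts TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?)",
    [
        ("2021-12-31 23:59:59",),  # previous day: excluded
        ("2022-01-01 00:00:00",),  # start of interval: included
        ("2022-01-01 13:30:00",),  # inside interval: included
        ("2022-01-02 00:00:00",),  # end of interval: excluded
    ],
)
print(count_rows_in_interval(conn, datetime(2022, 1, 1), datetime(2022, 1, 2)))  # 2
```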

Wave
  • What is creating the other data? If it’s another DAG, look into the ExternalTaskSensor. If it’s another process, you can just check whether the new data is in the table using the SqlSensor. – mucio Dec 06 '22 at 16:33
  • Unfortunately, it’s not another DAG creating the data... The problem is that the table already has data from the start, so I can't use the SQL sensor. I just estimate that by around 4 AM enough new data will be in the table. – Wave Dec 12 '22 at 06:23
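On the concern that the table already holds older data: a sensor-style check does not have to look at the whole table. It can count only rows inside the run’s interval and compare that count against a minimum. Airflow’s SqlSensor keeps poking until the first cell of the result is not 0, '0', '' or None, so a query returning 1 or 0 fits that shape. The sqlite3 sketch below is illustrative only: the events table, event_ts column and MIN_ROWS threshold are hypothetical, and picking the threshold is still a guess, much like the 4 AM estimate.

```python
import sqlite3

# Hypothetical threshold for "enough" rows in the interval.
MIN_ROWS = 3

# First cell is 1 or 0, which suits a truthiness-based sensor check.
READINESS_SQL = "SELECT COUNT(*) >= ? FROM events WHERE event_ts >= ? AND event_ts < ?"


def interval_has_enough_rows(conn, start, end, min_rows=MIN_ROWS):
    # Only rows inside [start, end) are counted, so older data in the table is ignored.
    (ready,) = conn.execute(READINESS_SQL, (min_rows, start, end)).fetchone()
    return bool(ready)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_ts TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?)",
    [("2021-12-30 10:00:00",), ("2021-12-31 10:00:00",),
     ("2022-01-01 01:00:00",), ("2022-01-01 02:00:00",)],
)
start, end = "2022-01-01 00:00:00", "2022-01-02 00:00:00"
print(interval_has_enough_rows(conn, start, end))  # False: only 2 rows in the interval
conn.execute("INSERT INTO events VALUES ('2022-01-01 03:00:00')")
print(interval_has_enough_rows(conn, start, end))  # True: 3 rows in the interval
```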

1 Answer


Instead of delaying by a fixed amount of time, you could use BranchSQLOperator, which takes follow_task_ids_if_true and follow_task_ids_if_false. With a fixed time window, the DAG might run even in cases where your data is not ready.

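# Airflow 2.x import path; newer releases also provide it in airflow.providers.common.sql.operators.sql
from airflow.operators.sql import BranchSQLOperator

# Assumes `dag` is an existing DAG object and that "success_task_id" and
# "fail_task_id" are tasks defined downstream in the same DAG.
operator = BranchSQLOperator(
    task_id="check_data_presence_task",
    conn_id="sql_connection_id",
    # Templated: '{{ ds }}' (start date of the run's data interval) replaces the undefined today_date.
    sql="SELECT count(1) FROM my_table WHERE date >= '{{ ds }}'",
    follow_task_ids_if_true="success_task_id",
    follow_task_ids_if_false="fail_task_id",
    dag=dag,
)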
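The branch decision comes down to reading the first cell of the query result: a count of 0 should lead to the false branch and a non-zero count to the true branch. The function below is a simplified, hypothetical approximation of that rule for illustration, not Airflow’s own implementation; success_task_id and fail_task_id are the placeholder task ids from the answer. Note that a non-zero count only shows that some data has arrived, not that the interval is complete.

```python
# Simplified approximation of how a SQL branch maps its first result cell to a task id.
TRUE_STRINGS = {"true", "y", "yes", "on", "1"}
FALSE_STRINGS = {"false", "n", "no", "off", "0"}


def choose_branch(first_cell, if_true: str, if_false: str) -> str:
    # bool must be checked before int, because bool is a subclass of int in Python.
    if isinstance(first_cell, bool):
        take_true = first_cell
    elif isinstance(first_cell, int):
        take_true = first_cell != 0
    elif isinstance(first_cell, str):
        value = first_cell.strip().lower()
        if value in TRUE_STRINGS:
            take_true = True
        elif value in FALSE_STRINGS:
            take_true = False
        else:
            raise ValueError(f"Cannot interpret {first_cell!r} as a boolean")
    else:
        raise TypeError(f"Unsupported result type: {type(first_cell).__name__}")
    return if_true if take_true else if_false


print(choose_branch(0, "success_task_id", "fail_task_id"))   # fail_task_id
print(choose_branch(42, "success_task_id", "fail_task_id"))  # success_task_id
```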

S N
  • I'm familiar with this operator; the thing is that I don't actually know when the data for the interval has finished populating. The table can have some data right after the data_interval_end, but only partial data... That's why I want to wait a predetermined time period rather than use a branch or sensor operator. – Wave Dec 06 '22 at 06:18