
My DAG is scheduled to run every hour. Each run pulls one hour of data from an S3 source and processes it. Sometimes the task takes more than an hour to complete, and when that happens I miss an hour of data.

Example: the 1:00 pm DAG run started and ran for 2 hours, so my next DAG run took 3 (3 pm) as its parameter and the 2 pm data was missed. In other words, how do I call the task and make sure it runs for every hour, i.e., 24 times a day?

RJV
  • Can you post an example of your DAG? Concurrent DAG runs in Airflow are no problem, i.e. your DAG run at 2pm should run perfectly fine even if the DAG run at 1pm is still running... – dorvak Mar 13 '19 at 08:40
    @dorvak You are right. It was my logic. I want to run the DAG every hour, and I'm passing the hour based on current_time. My Airflow environment takes only 4 instances in the queue, so sometimes the task instance where I pass the hour as a parameter is delayed (due to other long-running jobs). Example: – RJV Mar 13 '19 at 17:15
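The failure mode described in this comment can be reproduced with plain arithmetic. The sketch below uses a hypothetical run schedule and queue delay (not taken from the post) and compares the hour a task reads from the wall clock when it finally starts with the hour of the run it belongs to: if the 14:00 run waits in the queue until 15:05, it reads hour 15, so hour 14 is never processed and hour 15 is processed twice.

```python
from datetime import datetime

# Hypothetical hourly runs: (scheduled time of the run, wall-clock time the task actually started).
# The 14:00 run is assumed to sit in the queue until 15:05.
runs = [
    (datetime(2019, 3, 13, 13, 0), datetime(2019, 3, 13, 13, 2)),
    (datetime(2019, 3, 13, 14, 0), datetime(2019, 3, 13, 15, 5)),
    (datetime(2019, 3, 13, 15, 0), datetime(2019, 3, 13, 15, 10)),
]

# Hour taken from the clock at task start (what passing current_time does).
wall_clock_hours = [started.hour for _, started in runs]
# Hour taken from the run's own scheduled time.
scheduled_hours = [scheduled.hour for scheduled, _ in runs]

# Hours that no run processed when the wall clock is used.
missed = sorted(set(scheduled_hours) - set(wall_clock_hours))

print("wall clock:", wall_clock_hours)  # [13, 15, 15]
print("scheduled: ", scheduled_hours)   # [13, 14, 15]
print("missed:    ", missed)            # [14]
```

Keying the data hour off the run's scheduled time rather than the clock means a delayed task still processes the hour it was scheduled for.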

3 Answers


Here is my DAG:

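from datetime import timedelta

import arrow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# DAG_ID, DEFAULT_ARGS, emr and EMRStep are defined elsewhere (not shown here);
# EMRStep appears to be a custom helper that builds the EMR step task.

# Evaluated from the current time whenever this file is parsed, so it reflects
# when the file was loaded, not the hour the DAG run is scheduled for.
HOUR_PACIFIC = arrow.utcnow().shift(hours=-3).to('US/Pacific').format("HH")

dag = DAG(
    DAG_ID,
    catchup=False,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=5),
    schedule_interval='0 * * * *')  # top of every hour

start = DummyOperator(
    task_id='Start',
    dag=dag)

my_task = EMRStep(emr,
                  'stg',
                  HOUR_PACIFIC)

end = DummyOperator(
    task_id='End',
    dag=dag
)
start >> my_task >> end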
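One way to keep the hour stable is to compute it from the run's scheduled timestamp (in Airflow 1.x this is available to templates and callables as execution_date) instead of from arrow.utcnow(). The pure-Python sketch below is illustrative only: hour_for_run is a hypothetical helper, and to stay dependency-free it uses a fixed UTC-8 offset in place of the real US/Pacific zone, so it ignores daylight saving time.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC-8 offset (PST) stands in for US/Pacific,
# so this sketch ignores daylight saving time.
PACIFIC_STANDARD = timezone(timedelta(hours=-8))


def hour_for_run(run_time_utc: datetime, lag_hours: int = 3) -> str:
    """Return the two-digit Pacific hour to process for one run.

    Mirrors arrow.utcnow().shift(hours=-3).to('US/Pacific').format("HH"),
    but starts from the run's scheduled timestamp instead of the current time,
    so a delayed task still processes the hour of the run it belongs to.
    """
    if run_time_utc.tzinfo is None:
        raise ValueError("run_time_utc must be timezone-aware")
    shifted = run_time_utc - timedelta(hours=lag_hours)
    return shifted.astimezone(PACIFIC_STANDARD).strftime("%H")


# A run scheduled for 20:00 UTC yields the same hour no matter when it starts.
print(hour_for_run(datetime(2019, 1, 15, 20, 0, tzinfo=timezone.utc)))  # 09
```

In Airflow 1.x the execution_date of an hourly run marks the start of the interval it covers (the run that fires at 14:00 is stamped 13:00), so the lag may need adjusting by one hour when switching from the current time to the scheduled time.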
RJV

You need to pass catchup=True to the DAG object.
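With catchup=True, the scheduler creates a run for every schedule interval since start_date that does not have one yet, instead of only the most recent interval as with catchup=False. The sketch below is a simplified, pure-Python model of that bookkeeping; intervals_to_schedule is a hypothetical name, and the model ignores end_date and runs that already exist. Each hourly interval is stamped with its start time and becomes due once it has fully elapsed.

```python
from datetime import datetime, timedelta
from typing import List


def intervals_to_schedule(start_date: datetime, now: datetime,
                          interval: timedelta) -> List[datetime]:
    """List the interval start times a catch-up scheduler would create runs for.

    Simplified model: a run stamped with time t covers [t, t + interval)
    and is only created once that interval has fully elapsed (t + interval <= now).
    """
    runs = []
    t = start_date
    while t + interval <= now:
        runs.append(t)
        t += interval
    return runs


runs = intervals_to_schedule(datetime(2019, 3, 13, 0, 0),
                             datetime(2019, 3, 13, 3, 30),
                             timedelta(hours=1))
print([r.strftime("%H:%M") for r in runs])  # ['00:00', '01:00', '02:00']
```

Catch-up runs only fill the gap if each run processes the hour it is stamped with; a run that reads the current time would still process the wrong hour.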

Sathish

This appears to be a perfect scenario for using TimeDeltaSensor.


Note: the following code snippet is just for reference and has NOT been tested.

import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from airflow.utils.trigger_rule import TriggerRule


def my_callable(hour, **kwargs):
    # Hypothetical placeholder: process the data for the given hour offset (0-23)
    print(f"processing hour {hour}")


# create DAG object (runs once a day at midnight; standard 5-field cron)
my_dag: DAG = DAG(dag_id="my_dag",
                  start_date=datetime.datetime(year=2019, month=3, day=11),
                  schedule_interval="0 0 * * *")

# create dummy begin & end tasks
my_begin_task: DummyOperator = DummyOperator(dag=my_dag,
                                             task_id="my_begin_task")
my_end_task: DummyOperator = DummyOperator(dag=my_dag,
                                           task_id="my_end_task",
                                           trigger_rule=TriggerRule.ALL_DONE)

# populate the DAG: one sensor + task pair for each of the 24 hours of the day
for i in range(24):
    # sensor waits until i hours after the daily schedule interval has closed
    my_time_delta_sensor: TimeDeltaSensor = TimeDeltaSensor(dag=my_dag,
                                                            task_id=f"my_time_delta_sensor_task_{i}_hours",
                                                            delta=datetime.timedelta(hours=i))
    my_actual_task: PythonOperator = PythonOperator(dag=my_dag,
                                                    task_id=f"my_actual_task_{i}_hours",
                                                    python_callable=my_callable,
                                                    op_kwargs={"hour": i})
    # wire-up tasks together
    my_begin_task >> my_time_delta_sensor >> my_actual_task >> my_end_task
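Per the Airflow 1.10 documentation, TimeDeltaSensor waits for its delta after the task's execution_date + schedule_interval, i.e. after the run's interval has closed. The pure-Python sketch below (sensor_release_times is a hypothetical helper, not part of Airflow) computes when each sensor in the loop would first succeed for the daily run stamped 2019-03-11: the hourly tasks are released throughout 2019-03-12, one day after the date the run is stamped with.

```python
from datetime import datetime, timedelta
from typing import List


def sensor_release_times(execution_date: datetime, schedule_interval: timedelta,
                         hours: range) -> List[datetime]:
    """Earliest time each TimeDeltaSensor in the loop would succeed.

    Models the documented behaviour: wait until the end of the run's interval
    (execution_date + schedule_interval) plus the sensor's delta of i hours.
    """
    interval_end = execution_date + schedule_interval
    return [interval_end + timedelta(hours=i) for i in hours]


times = sensor_release_times(datetime(2019, 3, 11), timedelta(days=1), range(24))
print(times[0])    # 2019-03-12 00:00:00
print(times[-1])   # 2019-03-12 23:00:00
print(len(times))  # 24
```

Because each task's hour is fixed by its index i and the run's execution_date, a sensor that is released late still maps to the same hour of data.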


y2k-shubham