
I am running 5 PythonOperator tasks in my Airflow DAG, and one of them is performing an ETL job that is taking a long time, due to which all my resources are blocked. Is there a way I can set a max execution time per task, after which the task either fails or is marked successful (so that the DAG doesn't fail) with a message?

Prithu Srinivas

3 Answers


Every operator has an execution_timeout parameter, to which you pass a datetime.timedelta object.

As per the base operator code comments:

:param execution_timeout: max time allowed for the execution of
    this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta

Also bear in mind that this only fails that single run of the task instance and will trigger the task's retries; the task (and hence the DAG run) is only declared failed once all retries have been exhausted.

So depending on how many automatic retries you have configured, the task could run for up to (number of retries + 1) × execution_timeout in total if the code keeps exceeding the limit.
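
For the PythonOperator tasks in the question, it would look roughly like this (a minimal sketch; the DAG name, the callable, and the two-hour limit are placeholder assumptions, and the import path assumes Airflow 2.x):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_etl():
    ...  # placeholder for the long-running ETL job


with DAG(
    dag_id="example_etl",  # hypothetical DAG name
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    etl_task = PythonOperator(
        task_id="etl_task",
        python_callable=run_etl,
        execution_timeout=timedelta(hours=2),  # the task instance is failed after 2 hours
        retries=1,  # with 1 retry, the worst case is roughly 2 x 2 hours
    )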

Meghdeep Ray

Check out this previous answer.

In short, using Airflow's built-in pools, or even specifying a start_date for an individual task (instead of the entire DAG), seem to be potential solutions.
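
As a rough sketch of the pool approach (assuming a pool named "etl_pool" with a single slot has already been created in the Airflow UI or via the airflow pools CLI, and using placeholder DAG and task names), the heavy task can be confined so it cannot occupy every worker slot:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="example_pooled_etl",  # hypothetical DAG name
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    heavy_etl = PythonOperator(
        task_id="heavy_etl",
        python_callable=lambda: None,  # placeholder for the ETL callable
        pool="etl_pool",  # runs only when a slot in etl_pool is free
        # a per-task start_date could also be passed here to override the DAG's
    )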

manesioz

From this documentation, you'd want to set the execution_timeout task parameter, which would look something like this:

from datetime import timedelta

from airflow.providers.sftp.sensors.sftp import SFTPSensor  # Airflow 2.x provider import

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(hours=2),  # hard limit on a single run of the task instance
    timeout=3600,  # sensor-specific timeout, in seconds
    retries=2,
    mode="reschedule",
)
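
Note that timeout and execution_timeout are different things here: timeout is the sensor's own limit on how long it will keep waiting for its condition, while execution_timeout is the generic BaseOperator limit on a single run of the task instance and applies to any operator.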
miguel dias