I am running 5 PythonOperator tasks in my Airflow DAG, and one of them performs an ETL job that takes a long time, due to which all my resources are blocked. Is there a way I can set a max execution time per task, after which the task either fails or is marked successful (so that the DAG doesn't fail) with a message?
3 Answers
Every operator has an execution_timeout parameter, to which you pass a datetime.timedelta object.
As per the base operator code comments:
:param execution_timeout: max time allowed for the execution of
this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta
Also bear in mind that this only fails a single run of the task instance and will trigger retries; the task is only declared failed once all retries have been exhausted, and only then does the DAG run fail.
So depending on how many automatic retries you have assigned, the potential maximum time is roughly (number of retries + 1) x (timeout) if the code keeps taking too long.
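For example, a minimal sketch for the PythonOperator case in the question, assuming Airflow 2.x import paths; the DAG id, task id, callable, and timeout values are placeholders:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_etl():
    ...  # long-running ETL logic goes here


with DAG(
    dag_id="etl_timeout_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    etl_task = PythonOperator(
        task_id="long_etl",
        python_callable=run_etl,
        # Fail this task instance if it runs longer than 2 hours.
        execution_timeout=timedelta(hours=2),
        # With retries=2 there are up to 3 attempts, so the worst case is
        # roughly 3 x 2 hours before the task is finally marked failed.
        retries=2,
    )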

Check out this previous answer.
In short, using Airflow's built-in pools, or even specifying a start_date for a task (instead of for the entire DAG), seem to be potential solutions.
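A minimal sketch of the pool approach, assuming Airflow 2.x and a pool named etl_pool that you have already created (Admin -> Pools) with a small slot count; the DAG and task names are placeholders:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_etl():
    ...  # long-running ETL logic goes here


with DAG(
    dag_id="etl_pool_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    etl_task = PythonOperator(
        task_id="long_etl",
        python_callable=run_etl,
        # Draw worker slots from a dedicated, small pool so this long ETL
        # task cannot occupy the slots needed by the other tasks.
        pool="etl_pool",
    )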

From this documentation, you'd want to set the execution_timeout task parameter, which would look something like this:
from datetime import timedelta

from airflow.providers.sftp.sensors.sftp import SFTPSensor

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    # Hard limit on how long a single task instance may run before it is failed.
    execution_timeout=timedelta(hours=2),
    # Sensor-specific timeout (in seconds) before the sensor itself gives up.
    timeout=3600,
    retries=2,
    mode="reschedule",
)
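Note that execution_timeout and the sensor's own timeout are different settings: execution_timeout is the operator-level cap on how long the task instance may run before it is failed, while timeout (in seconds) is how long the sensor keeps waiting for its condition before it times out.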
