We moved to puckel/Airflow-1.10.2 to try to resolve poor performance we've seen in multiple environments. We are running Airflow 1.10.2 on AWS ECS. Interestingly, CPU and memory usage never rise above 80%, and the Airflow metadata database stays very underutilized as well.
Below I've listed the configuration we're using, the DagBag parsing times, and the detailed execution times from the cProfile output of running DagBag() in plain Python.
A few of our DAGs import a function from create_subdag_functions.py that returns a DAG we use in 12 DAGs. Most of those DAGs and their corresponding subdags run only on the hour, but one DAG and three subdags run every 10 minutes.
max_threads = 2
dag_dir_list_interval = 300
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor
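On ECS these settings are often easier to manage as environment variables than as airflow.cfg edits. Airflow reads any airflow.cfg option from a variable named AIRFLOW__{SECTION}__{KEY}. The sketch below maps the settings above to those names; the section each key belongs to ([core], [celery], [scheduler]) is my assumption based on the Airflow 1.10 default airflow.cfg, and to_env_vars is a hypothetical helper, so double-check against your own config.

```python
# Hypothetical helper: map the settings listed above to Airflow's
# AIRFLOW__{SECTION}__{KEY} environment-variable naming convention.
# Section placement is assumed from Airflow 1.10's default airflow.cfg.
SETTINGS = {
    ("scheduler", "max_threads"): "2",
    ("scheduler", "dag_dir_list_interval"): "300",
    ("core", "dag_concurrency"): "16",
    ("celery", "worker_concurrency"): "16",
    ("core", "max_active_runs_per_dag"): "16",
    ("core", "parallelism"): "32",
    ("core", "executor"): "CeleryExecutor",
}


def to_env_vars(settings):
    """Return {ENV_VAR_NAME: value} for a {(section, key): value} mapping."""
    return {
        f"AIRFLOW__{section.upper()}__{key.upper()}": value
        for (section, key), value in settings.items()
    }


if __name__ == "__main__":
    # Print KEY=VALUE lines, e.g. for pasting into an ECS task definition.
    for name, value in sorted(to_env_vars(SETTINGS).items()):
        print(f"{name}={value}")
```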
Some observations:
- airflow list_dags -r also takes a very long time, and it still loads the example DAGs even though they are disabled.
- The parse duration for each DAG is inconsistent and jumps around between runs (this only applies to our DAGs, not the examples).
- There is usually a big jump in parsing time, e.g. five DAGs will take under 1 second each and the next four will take 20+ seconds.
- When we profiled DagBag() with cProfile, we found that it spent most of its time in airflow.utils.dag_processing.list_py_file_paths, probably because of the 50+ .sql files in our /usr/local/airflow/dags folder.
- Looking through the Landing Times, the task times jumped by an order of magnitude between two specific runs. I've looked through the logs and found nothing notable between those two runs. I've attached the chart at the bottom. This performance drop happened on Airflow 1.10.0.
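The cProfile output below shows zipfile.is_zipfile and io.open dominating list_py_file_paths. As far as I can tell, the 1.10.x scan calls zipfile.is_zipfile on files it walks (including non-.py files such as our .sql files), which opens each one. A rough way to check whether per-file I/O on the mounted dags folder is the bottleneck is to time that same check per file extension. time_zip_checks is a hypothetical stdlib-only sketch, not Airflow's code, and only approximates what the scan does.

```python
import os
import time
import zipfile
from collections import defaultdict


def time_zip_checks(directory):
    """Walk `directory` and time zipfile.is_zipfile() per file, grouped by extension.

    Returns {extension: (file_count, total_seconds)}. This approximates the
    per-file check done while scanning a DAG folder; it is not Airflow code.
    """
    stats = defaultdict(lambda: [0, 0.0])
    for root, _dirs, files in os.walk(directory, followlinks=True):
        for name in files:
            path = os.path.join(root, name)
            ext = os.path.splitext(name)[1] or "<none>"
            start = time.perf_counter()
            # Opens the file and looks for a ZIP end-of-central-directory record.
            zipfile.is_zipfile(path)
            elapsed = time.perf_counter() - start
            stats[ext][0] += 1
            stats[ext][1] += elapsed
    return {ext: (count, total) for ext, (count, total) in stats.items()}


if __name__ == "__main__":
    results = time_zip_checks("/usr/local/airflow/dags")
    # Slowest extensions first; a high average per file points at slow file I/O.
    for ext, (count, total) in sorted(results.items(), key=lambda kv: -kv[1][1]):
        print(f"{ext:10} files={count:5d} total={total:8.3f}s avg={total / count:.4f}s")
```

If the average per .sql file is large, moving the SQL out of the dags folder (or onto faster storage) would be a cheap experiment.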
Solutions I've tried:
- increasing/lowering max_threads
- increasing/eliminating min_file_process_interval
- clearing the Airflow database of all DAGs and reloading
- shutting down and redeploying the environment
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 189.77048399999995
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
file | duration | dag_num | task_num | dags
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
/dag1.py | 60.576728 | 1 | 21 | ['dag1']
/dag2.py | 55.092603999999994 | 1 | 28 | ['dag2']
/dag3.py | 47.997972000000004 | 1 | 17 | ['dag3']
/dag4.py | 22.99313 | 3 | 16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/dag5.py | 0.67 | 1 | 21 | ['dag5']
/dag6.py | 0.652114 | 1 | 9 | ['dag6']
/dag7.py | 0.45368 | 1 | 26 | ['dag7']
/dag8.py | 0.396908 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag9.py | 0.242012 | 6 | 38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag10.py | 0.134342 | 1 | 1 | ['dag10']
/dag11.py | 0.13325 | 2 | 8 | ['dag11', 'dag12.subdag1']
/dag12.py | 0.10562 | 1 | 6 | ['dag12']
/create_subdag_functions.py | 0.105292 | 0 | 0 | []
example_http_operator.py | 0.040636 | 1 | 6 | ['example_http_operator']
example_subdag_operator.py | 0.005328 | 3 | 15 | ['example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2']
example_bash_operator.py | 0.004052 | 1 | 6 | ['example_bash_operator']
example_branch_operator.py | 0.003444 | 1 | 11 | ['example_branch_operator']
example_branch_python_dop_operator_3.py | 0.003418 | 1 | 3 | ['example_branch_dop_operator_v3']
example_passing_params_via_test_command.py | 0.003222 | 1 | 2 | ['example_passing_params_via_test_command']
example_skip_dag.py | 0.002386 | 1 | 8 | ['example_skip_dag']
example_trigger_controller_dag.py | 0.002386 | 1 | 1 | ['example_trigger_controller_dag']
example_short_circuit_operator.py | 0.002344 | 1 | 6 | ['example_short_circuit_operator']
example_python_operator.py | 0.002218 | 1 | 6 | ['example_python_operator']
example_latest_only.py | 0.002196 | 1 | 2 | ['latest_only']
example_latest_only_with_trigger.py | 0.001848 | 1 | 5 | ['latest_only_with_trigger']
example_xcom.py | 0.001722 | 1 | 3 | ['example_xcom']
docker_copy_data.py | 0.001718 | 0 | 0 | []
example_trigger_target_dag.py | 0.001704 | 1 | 2 | ['example_trigger_target_dag']
tutorial.py | 0.00165 | 1 | 3 | ['tutorial']
test_utils.py | 0.001376 | 1 | 1 | ['test_utils']
example_docker_operator.py | 0.00103 | 0 | 0 | []
subdags/subdag.py | 0.001016 | 0 | 0 | []
-------------------------------------------------------------------------------------------------------+--------------------+---------+----------+--------------------------------------------------
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file | duration | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py | 74.819988 | 1 | 21 | ['dag1']
/dag3.py | 53.193430000000006 | 1 | 17 | ['dag3']
/dag8.py | 34.535742 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py | 21.543944000000003 | 6 | 38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py | 18.458316000000003 | 3 | 16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py | 14.652806000000002 | 0 | 0 | []
/dag7.py | 13.051984000000001 | 2 | 8 | ['dag11', 'dag11.subdag1']
/dag8.py | 10.02703 | 1 | 21 | ['dag5']
/dag9.py | 9.834226000000001 | 1 | 1 | ['dag10']
/dag10.py | 9.575258000000002 | 1 | 28 | ['dag2']
/dag11.py | 9.418897999999999 | 1 | 9 | ['dag6']
/dag12.py | 9.319210000000002 | 1 | 6 | ['dag12']
/dag13.py | 8.686964 | 1 | 26 | ['dag7']
Note: I removed the example DAGs from the second output for brevity.
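To put a number on the "big jump" between fast and slow files, here's a small stdlib sketch that parses rows of the loading-stats tables above (file | duration | dag_num | task_num | dags), sorts them by duration, and reports the neighbouring pair with the largest ratio. The row format is taken from the output above; parse_stats_rows and biggest_jump are hypothetical helpers.

```python
def parse_stats_rows(text):
    """Parse 'file | duration | dag_num | task_num | dags' rows into (file, seconds) pairs."""
    rows = []
    for line in text.splitlines():
        parts = [p.strip() for p in line.split("|")]
        if len(parts) < 5:
            continue  # separator lines use '+', summary lines have no '|'
        try:
            rows.append((parts[0], float(parts[1])))
        except ValueError:
            continue  # header row: 'duration' is not a number
    return rows


def biggest_jump(rows):
    """Return (slower_file, faster_file, ratio) for the largest ratio between
    neighbouring rows when sorted by duration, slowest first."""
    ordered = sorted(rows, key=lambda r: r[1], reverse=True)
    best = None
    for (slow_file, slow), (fast_file, fast) in zip(ordered, ordered[1:]):
        ratio = slow / fast if fast > 0 else float("inf")
        if best is None or ratio > best[2]:
            best = (slow_file, fast_file, ratio)
    return best
```

Applied to the first output, this flags the gap between /dag4.py (22.99 s) and /dag5.py (0.67 s), roughly a 34x jump.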
cProfile output of "from airflow.models import DagBag; DagBag()":
{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from
ncalls tottime percall cumtime percall filename:lineno(function)
997 443.441 0.445 443.441 0.445 {built-in method io.open}
198 186.978 0.944 483.629 2.443 zipfile.py:198(is_zipfile)
642 65.069 0.101 65.069 0.101 {method 'close' of '_io.BufferedReader' objects}
1351 45.924 0.034 45.946 0.034 <frozen importlib._bootstrap_external>:830(get_data)
7916 39.403 0.005 39.403 0.005 {built-in method posix.stat}
2/1 22.927 11.464 544.419 544.419 dag_processing.py:220(list_py_file_paths)
33 18.992 0.576 289.797 8.782 models.py:321(process_file)
22 8.723 0.397 8.723 0.397 {built-in method posix.scandir}
412 2.379 0.006 2.379 0.006 {built-in method posix.listdir}
9 1.301 0.145 3.058 0.340 linecache.py:82(updatecache)
1682/355 0.186 0.000 0.731 0.002 sre_parse.py:470(_parse)
1255 0.183 0.000 0.183 0.000 {built-in method marshal.loads}
3092/325 0.143 0.000 0.647 0.002 sre_compile.py:64(_compile)
59 0.139 0.002 0.139 0.002 {built-in method builtins.compile}
25270 0.134 0.000 0.210 0.000 sre_parse.py:253(get)
52266 0.132 0.000 0.132 0.000 {method 'append' of 'list' objects}
4210/4145 0.131 0.000 1.760 0.000 {built-in method builtins.__build_class__}
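The listing above appears to be sorted by tottime and truncated. Re-sorting the same profile by cumtime can help show which callers account for the io.open and is_zipfile time. A minimal stdlib sketch (profile_call is a hypothetical helper, not an Airflow API):

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top=15, sort_key="tottime", **kwargs):
    """Run func(*args, **kwargs) under cProfile and return (result, report_text)."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = func(*args, **kwargs)
    finally:
        profiler.disable()
    buffer = io.StringIO()
    # Format the collected stats into a string instead of printing to stdout.
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats(sort_key).print_stats(top)
    return result, buffer.getvalue()
```

With Airflow installed, something like profile_call(DagBag, sort_key="cumtime") (after from airflow.models import DagBag) should produce the same kind of report as above, ordered by cumulative time instead.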
Airflow performance drop: