
We moved to puckel/Airflow-1.10.2 to try to resolve poor performance we've seen in multiple environments. We are running Airflow 1.10.2 on AWS ECS. Interestingly, CPU and memory never go above 80%, and the Airflow metadata DB stays very underutilized as well.

Below I've listed the configuration we're using, the DagBag parsing times, and the detailed execution times from the cProfile output of running DagBag() on its own in plain Python.

A few of our DAGs import a function from create_subdag_functions.py that returns a DAG we use in 12 DAGs. Most of those DAGs and their corresponding subdags run only on the hour, but 1 DAG and 3 subdags run every 10 minutes.

max_threads = 2
dag_dir_list_interval = 300 
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor
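On ECS these values are often passed to the containers as environment variables rather than by editing airflow.cfg. Airflow 1.10 reads overrides named AIRFLOW__{SECTION}__{KEY}. The sketch below groups the settings above by the section I believe they belong to in the 1.10 default airflow.cfg (an assumption worth checking against your own file) and prints the matching variable names. SETTINGS, env_var_name and to_env are hypothetical helpers, not Airflow APIs.

```python
# Settings from the question, grouped by the airflow.cfg section they are
# assumed to live in under Airflow 1.10's default config.
SETTINGS = {
    "core": {
        "executor": "CeleryExecutor",
        "parallelism": "32",
        "dag_concurrency": "16",
        "max_active_runs_per_dag": "16",
    },
    "scheduler": {
        "max_threads": "2",
        "dag_dir_list_interval": "300",
    },
    "celery": {
        "worker_concurrency": "16",
    },
}


def env_var_name(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} override name for one setting."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def to_env(settings: dict) -> dict:
    """Flatten {section: {key: value}} into environment-variable overrides."""
    return {
        env_var_name(section, key): value
        for section, options in settings.items()
        for key, value in options.items()
    }


if __name__ == "__main__":
    for name, value in sorted(to_env(SETTINGS).items()):
        print(f"{name}={value}")
```

The same naming rule covers settings not listed above, e.g. min_file_process_interval in [scheduler].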

Some observations:

  • airflow list_dags -r also takes a very long time, and it still loads the example DAGs even though they are disabled. The parse time for each DAG jumps around between runs.
  • The parse duration for each DAG is inconsistent, but only for our DAGs, not the examples.
  • There is usually a big jump in parsing time, e.g. 5 DAGs will have durations under 1 second and then the next 4 will have durations of 20+ seconds.
  • When we profiled DagBag() with cProfile, we found that it spent most of its time in airflow.utils.dag_processing.list_py_file_paths, probably because of the 50+ SQL files in our /usr/local/airflow/dags folder.
  • Looking at the Landing Times, task time jumped by an order of magnitude between two specific runs. I've looked through the logs, etc., and there is nothing notable between those two runs. I've attached the image at the bottom. This performance loss occurred on Airflow 1.10.0.
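To illustrate why a folder full of .sql files can dominate list_py_file_paths, here is a simplified approximation of the DAG-folder scan as I understand it from the Airflow 1.10 source: files that don't end in .py are passed to zipfile.is_zipfile (which opens them), and in safe mode .py files are opened and searched for b"DAG" and b"airflow". scan_dag_folder and its open counter are hypothetical, not Airflow's code. The point is that every file in the folder gets opened on each scan, which is expensive on a throttled network filesystem.

```python
import os
import tempfile
import zipfile


def scan_dag_folder(directory, safe_mode=True):
    """Hypothetical, simplified approximation of Airflow 1.10's DAG-folder scan.

    Returns (candidate_paths, files_opened) to show how many files had to be
    opened just to decide which ones might contain DAGs.
    """
    candidates = []
    files_opened = 0
    for root, _dirs, files in os.walk(directory, followlinks=True):
        for name in files:
            path = os.path.join(root, name)
            if not os.path.isfile(path):
                continue
            if os.path.splitext(name)[1] != ".py":
                # Non-.py files (e.g. .sql) are opened to test for a zip archive.
                files_opened += 1
                if zipfile.is_zipfile(path):
                    candidates.append(path)
                continue
            if safe_mode:
                # Heuristic: keep only .py files mentioning both b"DAG" and b"airflow".
                files_opened += 1
                with open(path, "rb") as fp:
                    content = fp.read()
                if not (b"DAG" in content and b"airflow" in content):
                    continue
            candidates.append(path)
    return candidates, files_opened


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Mimic the question's layout: a few DAG files plus many SQL files.
        for i in range(3):
            with open(os.path.join(tmp, f"dag{i}.py"), "w") as f:
                f.write("from airflow import DAG\n")
        for i in range(50):
            with open(os.path.join(tmp, f"query{i}.sql"), "w") as f:
                f.write("SELECT 1;\n")
        paths, opened = scan_dag_folder(tmp)
        print(len(paths), opened)  # 3 53
```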

Solutions I've tried:

  • increasing/lowering max_threads
  • increasing/eliminating min_file_process_interval
  • clearing the airflow database of all DAGs and reloading
  • shutting down and redeploying the environment
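The reason min_file_process_interval matters is that the scheduler skips a file whose previous parse finished less than that many seconds ago. A minimal sketch of that gating, assuming a plain dict of last-finish timestamps; files_due_for_parse is a hypothetical helper, not Airflow's internal implementation.

```python
from datetime import datetime, timedelta


def files_due_for_parse(file_paths, last_finish, now, min_file_process_interval):
    """Return the files whose last parse finished at least
    `min_file_process_interval` seconds before `now`, or that were never parsed.

    `last_finish` maps path -> datetime of the last completed parse.
    """
    interval = timedelta(seconds=min_file_process_interval)
    due = []
    for path in file_paths:
        finished = last_finish.get(path)
        if finished is None or now - finished >= interval:
            due.append(path)
    return due


if __name__ == "__main__":
    now = datetime(2019, 4, 11, 20, 0, 0)
    last = {
        "/dag1.py": now - timedelta(seconds=30),
        "/dag2.py": now - timedelta(seconds=400),
    }
    paths = ["/dag1.py", "/dag2.py", "/dag3.py"]
    print(files_due_for_parse(paths, last, now, 0))    # all three files
    print(files_due_for_parse(paths, last, now, 300))  # ['/dag2.py', '/dag3.py']
```

With an interval of 0, every file is eligible on every scheduler loop, so the DAG files are re-read as fast as the processors can get through them.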

DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 189.77048399999995
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
file                                        | duration           | dag_num | task_num | dags
--------------------------------------------+--------------------+---------+----------+------------------------------------------------------------------------------------------------------------
/dag1.py                                    | 60.576728          |       1 |       21 | ['dag1']
/dag2.py                                    | 55.092603999999994 |       1 |       28 | ['dag2']
/dag3.py                                    | 47.997972000000004 |       1 |       17 | ['dag3']
/dag4.py                                    | 22.99313           |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/dag5.py                                    | 0.67               |       1 |       21 | ['dag5']
/dag6.py                                    | 0.652114           |       1 |        9 | ['dag6']
/dag7.py                                    | 0.45368            |       1 |       26 | ['dag7']
/dag8.py                                    | 0.396908           |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag9.py                                    | 0.242012           |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag10.py                                   | 0.134342           |       1 |        1 | ['dag10']
/dag11.py                                   | 0.13325            |       2 |        8 | ['dag11', 'dag12.subdag1']
/dag12.py                                   | 0.10562            |       1 |        6 | ['dag12']
/create_subdag_functions.py                 | 0.105292           |       0 |        0 | []
 example_http_operator.py                   | 0.040636           |       1 |        6 | ['example_http_operator']
 example_subdag_operator.py                 | 0.005328           |       3 |       15 | ['example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2']
 example_bash_operator.py                   | 0.004052           |       1 |        6 | ['example_bash_operator']
 example_branch_operator.py                 | 0.003444           |       1 |       11 | ['example_branch_operator']
 example_branch_python_dop_operator_3.py    | 0.003418           |       1 |        3 | ['example_branch_dop_operator_v3']
 example_passing_params_via_test_command.py | 0.003222           |       1 |        2 | ['example_passing_params_via_test_command']
 example_skip_dag.py                        | 0.002386           |       1 |        8 | ['example_skip_dag']
 example_trigger_controller_dag.py          | 0.002386           |       1 |        1 | ['example_trigger_controller_dag']
 example_short_circuit_operator.py          | 0.002344           |       1 |        6 | ['example_short_circuit_operator']
 example_python_operator.py                 | 0.002218           |       1 |        6 | ['example_python_operator']
 example_latest_only.py                     | 0.002196           |       1 |        2 | ['latest_only']
 example_latest_only_with_trigger.py        | 0.001848           |       1 |        5 | ['latest_only_with_trigger']
 example_xcom.py                            | 0.001722           |       1 |        3 | ['example_xcom']
 docker_copy_data.py                        | 0.001718           |       0 |        0 | []
 example_trigger_target_dag.py              | 0.001704           |       1 |        2 | ['example_trigger_target_dag']
 tutorial.py                                | 0.00165            |       1 |        3 | ['tutorial']
 test_utils.py                              | 0.001376           |       1 |        1 | ['test_utils']
 example_docker_operator.py                 | 0.00103            |       0 |        0 | []
 subdags/subdag.py                          | 0.001016           |       0 |        0 | []
-------------------------------------------------------------------------------------------------------+--------------------+---------+----------+--------------------------------------------------
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file                          | duration           | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py                      | 74.819988          |       1 |       21 | ['dag1']
/dag3.py                      | 53.193430000000006 |       1 |       17 | ['dag3']
/dag8.py                      | 34.535742          |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py                      | 21.543944000000003 |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py                      | 18.458316000000003 |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py   | 14.652806000000002 |       0 |        0 | []
/dag7.py                      | 13.051984000000001 |       2 |        8 | ['dag11', 'dag11.subdag1']
/dag8.py                      | 10.02703           |       1 |       21 | ['dag5']
/dag9.py                      | 9.834226000000001  |       1 |        1 | ['dag10']
/dag10.py                     | 9.575258000000002  |       1 |       28 | ['dag2']
/dag11.py                     | 9.418897999999999  |       1 |        9 | ['dag6']
/dag12.py                     | 9.319210000000002  |       1 |        6 | ['dag12']
/dag13.py                     | 8.686964           |       1 |       26 | ['dag7']

Note: I removed the example DAGs from the second output for brevity.
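The skew in the first report can be quantified directly. A small sketch that parses rows in the file | duration | dag_num | task_num | dags format shown above (durations are in seconds) and computes how much of the summed duration the slowest files account for. The rows are the 13 non-example rows from the first run; parse_report_rows and share_of_slowest are hypothetical helpers, not part of Airflow.

```python
def parse_report_rows(lines):
    """Parse 'file | duration | dag_num | task_num | dags' rows into tuples."""
    rows = []
    for line in lines:
        parts = [p.strip() for p in line.split("|")]
        if len(parts) != 5:
            continue  # separators use '+', so they never split into 5 parts
        try:
            duration = float(parts[1])
        except ValueError:
            continue  # header row
        rows.append((parts[0], duration, int(parts[2]), int(parts[3])))
    return rows


def share_of_slowest(rows, n):
    """Fraction of the summed duration spent in the n slowest files."""
    durations = sorted((r[1] for r in rows), reverse=True)
    total = sum(durations)
    return sum(durations[:n]) / total if total else 0.0


FIRST_RUN = """\
file | duration | dag_num | task_num | dags
/dag1.py | 60.576728 | 1 | 21 | ['dag1']
/dag2.py | 55.092603999999994 | 1 | 28 | ['dag2']
/dag3.py | 47.997972000000004 | 1 | 17 | ['dag3']
/dag4.py | 22.99313 | 3 | 16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/dag5.py | 0.67 | 1 | 21 | ['dag5']
/dag6.py | 0.652114 | 1 | 9 | ['dag6']
/dag7.py | 0.45368 | 1 | 26 | ['dag7']
/dag8.py | 0.396908 | 5 | 40 | ['dag8']
/dag9.py | 0.242012 | 6 | 38 | ['dag9']
/dag10.py | 0.134342 | 1 | 1 | ['dag10']
/dag11.py | 0.13325 | 2 | 8 | ['dag11']
/dag12.py | 0.10562 | 1 | 6 | ['dag12']
/create_subdag_functions.py | 0.105292 | 0 | 0 | []
"""

if __name__ == "__main__":
    rows = parse_report_rows(FIRST_RUN.splitlines())
    share = share_of_slowest(rows, 4)
    print(f"{len(rows)} files, slowest 4 = {share:.1%} of summed parse time")
    # 13 files, slowest 4 = 98.5% of summed parse time
```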

cProfile output of running "from airflow.models import DagBag; DagBag()":

{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from 

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      997  443.441    0.445  443.441    0.445 {built-in method io.open}
      198  186.978    0.944  483.629    2.443 zipfile.py:198(is_zipfile)
      642   65.069    0.101   65.069    0.101 {method 'close' of '_io.BufferedReader' objects}
     1351   45.924    0.034   45.946    0.034 <frozen importlib._bootstrap_external>:830(get_data)
     7916   39.403    0.005   39.403    0.005 {built-in method posix.stat}
      2/1   22.927   11.464  544.419  544.419 dag_processing.py:220(list_py_file_paths)
       33   18.992    0.576  289.797    8.782 models.py:321(process_file)
       22    8.723    0.397    8.723    0.397 {built-in method posix.scandir}
      412    2.379    0.006    2.379    0.006 {built-in method posix.listdir}
        9    1.301    0.145    3.058    0.340 linecache.py:82(updatecache)
 1682/355    0.186    0.000    0.731    0.002 sre_parse.py:470(_parse)
     1255    0.183    0.000    0.183    0.000 {built-in method marshal.loads}
 3092/325    0.143    0.000    0.647    0.002 sre_compile.py:64(_compile)
       59    0.139    0.002    0.139    0.002 {built-in method builtins.compile}
    25270    0.134    0.000    0.210    0.000 sre_parse.py:253(get)
    52266    0.132    0.000    0.132    0.000 {method 'append' of 'list' objects}
4210/4145    0.131    0.000    1.760    0.000 {built-in method builtins.__build_class__}
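One of the comments asks how these stats were generated. A table like the one above (sorted by tottime) can be produced with the standard library's cProfile and pstats. profile_call is a hypothetical wrapper name; the demo profiles a built-in so it runs anywhere.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, sort_key="tottime", limit=15, **kwargs):
    """Run func(*args, **kwargs) under cProfile.

    Returns (result, report), where report is the pstats table sorted by
    `sort_key` and truncated to the top `limit` entries.
    """
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    buf = io.StringIO()
    pstats.Stats(profiler, stream=buf).sort_stats(sort_key).print_stats(limit)
    return result, buf.getvalue()


if __name__ == "__main__":
    total, report = profile_call(sum, range(1_000_000))
    print(total)
    print(report)
```

In an environment where Airflow 1.10 is importable, the equivalent of the run above would be from airflow.models import DagBag; bag, report = profile_call(DagBag), since DagBag() with no arguments loads the configured DAGs folder.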

[Image: Airflow performance drop]

Seth
  • We believe it might be due to EFS. Our DAGs are mounted on EFS along with our SQL files. In our slow environments the average burst credit balance is 0, and that correlates with when the DAGs slowed down: the scheduler constantly reads the files while we are also placing new DAGs there. We believe that is why there are such large jumps in DAG processing time. – Seth Apr 11 '19 at 19:59
  • It was EFS. Either don't store DAGs on EFS, use Provisioned Throughput mode (not Bursting), or increase min_file_process_interval to something large enough that the scheduler doesn't read the DAG files so many times per minute. – Seth May 01 '19 at 14:17
  • Could you update the post with the commands you used to generate each of the stats? – DJ_Stuffy_K Apr 19 '21 at 17:56
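To check the EFS explanation, you can pull the file system's BurstCreditBalance metric from CloudWatch (namespace AWS/EFS, dimension FileSystemId) over the window where the DAGs slowed down. This sketch only builds the request parameters, so it runs without AWS access. The file system ID is a placeholder, and burst_credit_query is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def burst_credit_query(file_system_id, end, hours=24, period_seconds=300):
    """Build CloudWatch get_metric_statistics parameters for an EFS file
    system's BurstCreditBalance over the `hours` leading up to `end`."""
    return {
        "Namespace": "AWS/EFS",
        "MetricName": "BurstCreditBalance",
        "Dimensions": [{"Name": "FileSystemId", "Value": file_system_id}],
        "StartTime": end - timedelta(hours=hours),
        "EndTime": end,
        "Period": period_seconds,
        "Statistics": ["Average", "Minimum"],
    }


if __name__ == "__main__":
    # "fs-12345678" is a placeholder, not a real file system ID.
    params = burst_credit_query("fs-12345678", datetime.now(timezone.utc))
    print(params["MetricName"], params["StartTime"], params["EndTime"])
```

With boto3 installed and credentials configured, boto3.client("cloudwatch").get_metric_statistics(**params) returns the datapoints. A balance sitting near 0 during the slow periods would match the observation in the first comment.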
