
I'm using Spark Streaming (Spark version 2.2) on a YARN cluster and am trying to enable dynamic executor allocation for my application.

The number of executors scales up as required, but once executors are assigned they are never scaled down, even when traffic drops, i.e., an executor once allocated is not released. I have also enabled the external shuffle service on YARN, as described here: https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
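
For reference, the aux-service entries I added to yarn-site.xml look roughly like this (a sketch following the linked doc; it assumes the Spark YARN shuffle jar is already on the NodeManager classpath, and that mapreduce_shuffle was already configured):

    <!-- register Spark's external shuffle service as a NodeManager aux service -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
        <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>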

The configs that I have set in spark-submit command are:

            --conf spark.dynamicAllocation.enabled=false \
            --conf spark.streaming.dynamicAllocation.enabled=true \
            --conf spark.streaming.dynamicAllocation.scalingInterval=30 \
            --conf spark.shuffle.service.enabled=true \
            --conf spark.streaming.dynamicAllocation.initialExecutors=15 \
            --conf spark.streaming.dynamicAllocation.minExecutors=10 \
            --conf spark.streaming.dynamicAllocation.maxExecutors=30 \
            --conf spark.streaming.dynamicAllocation.executorIdleTimeout=60s \
            --conf spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout=60s \

Can someone point out any particular config that I'm missing?

Thanks

  • I found the same behavior on Spark 2.2 (batch, not streaming). It looks like the new remove policy avoids releasing executors while cached data exists on them. I'm experimenting with the spark.dynamicAllocation.cachedExecutorIdleTimeout parameter as mentioned in the [official docs](https://spark.apache.org/docs/2.2.0/job-scheduling.html). – skypiece Dec 26 '18 at 07:56

1 Answer


The document added as part of this JIRA helped me out: https://issues.apache.org/jira/browse/SPARK-12133.

The key point is that the number of executors is scaled down when the ratio (batch processing time / batch duration) falls below a threshold (0.5 by default), which essentially means the executors sit idle for half of the time. The config that alters this threshold is "spark.streaming.dynamicAllocation.scalingDownRatio".
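
For example, I could pass a lower threshold in the same spark-submit command; a sketch (the 0.3 and 0.9 values are just illustrative, and scalingUpRatio is the companion setting described in the same JIRA document, not something from the question above):

            --conf spark.streaming.dynamicAllocation.enabled=true \
            --conf spark.streaming.dynamicAllocation.scalingInterval=30 \
            --conf spark.streaming.dynamicAllocation.scalingDownRatio=0.3 \
            --conf spark.streaming.dynamicAllocation.scalingUpRatio=0.9 \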