My answer is mostly based on this SO answer. Recently, `getExecutorStorageStatus`
has been removed from `SparkContext` (in newer versions of Spark), so you can no
longer call `sc.getExecutorStorageStatus`. Instead we can use `SparkEnv`'s
`blockManager.master.getStorageStatus.length - 1` (the minus one accounts for the
driver). The normal way to reach it is via the `env` field of `SparkContext`,
but that field is not accessible outside of the `org.apache.spark` package.
Therefore, we use an encapsulation-violation pattern
as:
package org.apache.spark.util
import org.apache.spark.{SparkContext, SparkEnv}
/**
* The Spark internals exposed below (SparkContext.env, ThreadUtils) are
* package-private and not accessible outside of the org.apache.spark package.
* Placing this object inside that package is an encapsulation-violation pattern.
*/
object SparkInternalUtils {

  /** Expose Spark's package-private [[ThreadUtils]] object to outside callers. */
  def getThreadUtils: ThreadUtils.type = ThreadUtils

  /** Expose the package-private `env` ([[SparkEnv]]) of the given [[SparkContext]]. */
  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
Now we can get the instance of `SparkEnv` using `SparkInternalUtils.sparkEnv(sc)`.
Define `RichSparkContext` as below:
import org.apache.spark.SparkContext
import org.apache.spark.util.SparkInternalUtils
import scala.language.implicitConversions
/** Enriches a [[SparkContext]] with executor- and core-counting helpers. */
class RichSparkContext(val sc: SparkContext) {

  /** Number of registered executors, excluding the driver's own entry. */
  def executorCount: Int = {
    val storageStatusCount =
      SparkInternalUtils.sparkEnv(sc).blockManager.master.getStorageStatus.length
    storageStatusCount - 1 // one storage-status entry belongs to the driver
  }

  /** Cores available on a single executor; delegates to the companion object. */
  def coresPerExecutor: Int = RichSparkContext.coresPerExecutor(sc)

  /** Total cores across all executors, using the detected cores-per-executor. */
  def coreCount: Int = coreCount(coresPerExecutor)

  /** Total cores across all executors for a caller-supplied cores-per-executor. */
  def coreCount(coresPerExecutor: Int): Int = executorCount * coresPerExecutor
}
object RichSparkContext {

  /** Mix in (or import `implicits._`) to enrich any SparkContext implicitly. */
  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }
  object implicits extends Enrichment

  // Cached cores-per-executor for this JVM; 0 means "not yet computed".
  private var _coresPerExecutor: Int = 0

  /**
   * Cores available on one executor, discovered by running a one-element Spark
   * job and asking the executor's JVM for its available processors.
   *
   * BUG FIX: the original never assigned `_coresPerExecutor`, so the guard was
   * always true and every call launched a new Spark job. We now store the
   * result so the job runs at most once per JVM.
   */
  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        _coresPerExecutor =
          sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }
}
In Scala, get the number of executors and the core count:
// Obtain the application's SparkContext (placeholder — supply your own).
val sc = ... // SparkContext instance
// Bring the implicit SparkContext -> RichSparkContext conversion into scope.
import RichSparkContext.implicits._
// Executors registered with the cluster (driver excluded).
val executorCount = sc.executorCount
// Cores on one executor — NOTE(review): first call runs a small Spark job.
val coresPerExecutor = sc.coresPerExecutor
// executorCount * coresPerExecutor
val totalCoreCount = sc.coreCount
In Java, get the number of executors and the core count:
// Wrap the SparkSession's underlying SparkContext for Java use.
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
// Construct the enrichment wrapper directly (no implicits in Java).
RichSparkContext richSparkContext = new RichSparkContext(javaSparkContext.sc());
// Cores on one executor — NOTE(review): first call runs a small Spark job.
System.out.println(richSparkContext.coresPerExecutor());
// Total cores = executorCount * coresPerExecutor.
System.out.println(richSparkContext.coreCount());
// Executors registered with the cluster (driver excluded).
System.out.println(richSparkContext.executorCount());