16

I run my Spark application on a YARN cluster. In my code, I use the number of cores available to the queue to create partitions on my dataset:

Dataset ds = ...
ds.coalesce(config.getNumberOfCores());

My question: how can I get the number of cores available to the queue programmatically, rather than from configuration?

Sim
Rougher

5 Answers

20

There are ways to get both the number of executors and the number of cores in a cluster from Spark. Here is a bit of Scala utility code that I've used in the past. You should easily be able to adapt it to Java. There are two key ideas:

  1. The number of workers (executors) is sc.getExecutorStorageStatus.length - 1; the extra entry in the list is the driver.

  2. The number of cores per worker can be obtained by executing java.lang.Runtime.getRuntime.availableProcessors on a worker.

The rest of the code is boilerplate for adding convenience methods to SparkContext using Scala implicits. I wrote the code for Spark 1.x years ago, which is why it is not using SparkSession.

One final point: it is often a good idea to coalesce to a multiple of your cores as this can improve performance in the case of skewed data. In practice, I use anywhere between 1.5x and 4x, depending on the size of data and whether the job is running on a shared cluster or not.

import org.apache.spark.SparkContext

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}


object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  // Cached so the one-element discovery job below only runs once.
  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        // Run a one-element job on an executor and ask its JVM for its core count.
        _coresPerExecutor = sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }

}
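
A minimal usage sketch (assuming an in-scope SparkContext sc and a Dataset named ds, as in the question; the 2x multiplier is just one point in the 1.5x-4x range mentioned above):

import RichSparkContext.implicits._

val targetPartitionCount = sc.coreCount * 2 // 2x the total core count; tune between 1.5x and 4x
val coalesced = ds.coalesce(targetPartitionCount)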

Update

Recently, getExecutorStorageStatus has been removed. We have switched to using SparkEnv's blockManager.master.getStorageStatus.length - 1 (the minus one is again for the driver). The normal way to get at it, via the env of SparkContext, is not accessible outside of the org.apache.spark package. Therefore, we use an encapsulation-violation pattern:

package org.apache.spark

object EncapsulationViolator {
  // Living in the org.apache.spark package gives this object access to the package-private sc.env.
  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
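
With that in place, here is a sketch of how the executor count can be derived (assuming the EncapsulationViolator object above is compiled into your build):

import org.apache.spark.{EncapsulationViolator, SparkContext}

def executorCount(sc: SparkContext): Int =
  EncapsulationViolator.sparkEnv(sc).blockManager.master.getStorageStatus.length - 1 // minus one for the driver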
Sim
  • sc.getExecutorStorageStatus.length - 1 is good for me. Thank you – Rougher Nov 22 '17 at 17:14
  • Sometimes executor cores are overprovisioned or underprovisioned, which means the JVM runtime function may be inaccurate. – tribbloid Jul 26 '18 at 04:21
  • @tribbloid absolutely true and also true in the case of complex dynamic pool provisioning in various cluster management systems. This is for the common/easy case and needs to be adjusted for complex scenarios. – Sim Jul 28 '18 at 20:19
  • In this code sample, _coresPerExecutor is not assigned to. – Andy Kershaw Aug 27 '19 at 05:49
  • @AndyKershaw I think you are mistaken `private var _coresPerExecutor: Int = 0`. – Sim Sep 12 '19 at 04:21
  • FYI getExecutorStorageStatus is no longer available as of Spark 2.4.4 – Denis Makarenko Feb 21 '20 at 23:40
  • Thanks for the reminder @DenisMakarenko. We updated our code long ago but I did not update this answer. – Sim Feb 23 '20 at 02:05
  • Since you're hacking around anyway, you don't really need that violator. Just do ```val env: org.apache.spark.SparkEnv = sc.getClass.getMethod("env").invoke(sc).asInstanceOf[org.apache.spark.SparkEnv]```. Handy from a spark-shell, too. – James Moore Apr 18 '20 at 15:43
  • @JamesMoore I prefer compile-time errors to runtime errors in case APIs change. – Sim Apr 19 '20 at 04:40
  • @Sim I assumed _coresPerExecutor was meant to hold the result of the calculation to avoid repeating it and that's why it's a var? Subtracting one for the driver doesn't work if you run single threaded when testing. I currently use Math.max(sparkSession.sparkContext.getExecutorMemoryStatus.size - 1, 1) as getExecutorMemoryStatus is still available. – Andy Kershaw Sep 10 '20 at 11:38
  • @AndyKershaw I'd strongly recommend against testing with a single worker thread: turning off parallelism can hide certain types of problems until production. We test with 2+ executors and 2+ partitions. – Sim Oct 29 '20 at 14:47
  • @Sim Correct. Debugging would have been a better word for me to use as sometimes it is helpful to do that single threaded. – Andy Kershaw Dec 09 '20 at 15:29
8

Found this while looking for the answer to pretty much the same question.

I found that:

Dataset ds = ...
ds.coalesce(sc.defaultParallelism());

does exactly what the OP was looking for.

For example, my 5 node x 8 core cluster returns 40 for the defaultParallelism.
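
For reference, the same idea as a minimal Scala sketch (assuming sc is the SparkContext and ds the Dataset from the question); on YARN, defaultParallelism typically works out to the total executor core count unless spark.default.parallelism has been set explicitly:

val cores = sc.defaultParallelism // e.g. 40 on the 5-node x 8-core cluster above
val coalesced = ds.coalesce(cores)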

Steve C
1

According to Databricks, if the driver and executors are of the same node type, this is the way to go:

java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length - 1)
zaxme
  • java.lang.Runtime.getRuntime.availableProcessors tells you how many cpus are on the current machine. Can't assume that's true for all machines in the cluster. – James Moore Apr 15 '20 at 20:36
  • @JamesMoore you are correct. This works only in the case the Driver and Worker nodes are of the same node type. – zaxme Apr 16 '20 at 10:17
  • @zaxme I ran this and only got a value for java.lang.Runtime.getRuntime.availableProcessors, and got 0 for sc.statusTracker.getExecutorInfos.length. Is there an appropriate time to request this? I am using an EMR cluster, not Databricks. Thanks! – capt-mac Jul 13 '22 at 16:04
  • @capt-mac I haven't worked with EMR but maybe this would be useful for you - https://stackoverflow.com/a/46513788/1450817 – zaxme Jul 13 '22 at 20:43
  • @zaxme thanks, this looks like I would just manually feed in the info, suggested threads appear to give number of processors, which I can currently get. I need a way for executors to provide a value (sc.statusTracker.getExecutorInfos.length returns 0), which would allow me to programmatically tell my jobs how many partitions to give without having to set up a config for it – capt-mac Jul 18 '22 at 14:51
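
Regarding the 0 reported for getExecutorInfos above: executors register with the driver asynchronously after startup, so very early in the application the status tracker may not list any of them yet. A minimal sketch of a polling workaround (the 30-second timeout and 1-second poll interval are arbitrary choices, not part of any Spark API):

import org.apache.spark.SparkContext

// Wait until at least one executor (besides the driver) has registered, then return the count.
def waitForExecutorCount(sc: SparkContext, timeoutMs: Long = 30000L): Int = {
  val deadline = System.currentTimeMillis() + timeoutMs
  var executors = sc.statusTracker.getExecutorInfos.length - 1
  while (executors < 1 && System.currentTimeMillis() < deadline) {
    Thread.sleep(1000)
    executors = sc.statusTracker.getExecutorInfos.length - 1
  }
  executors
}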
1

You could run a job on every machine and ask each one for its number of cores, but that's not necessarily what's available to Spark (as pointed out by @tribbloid in a comment on another answer):

import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum

Running it in the shell (on a tiny test cluster with two workers) gives:

scala> :paste
// Entering paste mode (ctrl-D to finish)

    import spark.implicits._
    import scala.collection.JavaConverters._
    import sys.process._
    val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
    val nCpus = procs.values.sum

// Exiting paste mode, now interpreting.

import spark.implicits._                                                        
import scala.collection.JavaConverters._
import sys.process._
procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2)
nCpus: Int = 4

Add zeros to your range if you typically have lots of machines in your cluster. Even on my two-machine cluster, 10000 completes in a couple of seconds.

This is probably only useful if you want more information than sc.defaultParallelism() will give you (as in @SteveC's answer).

James Moore
0

For all of those that aren't using YARN clusters: if you are doing it in Python on Databricks, here is a function I wrote that will help solve the problem. It gets both the number of worker nodes and the number of CPUs per worker, and returns the total CPU count across your workers.

import pandas as pd  # the snippet relies on pandas ('pd') for string parsing

def GetDistCPUCount():
    # Number of target worker nodes in the Databricks cluster
    nWorkers = int(spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterTargetWorkers'))
    # Worker node type string; the digits after the first '_' are parsed as the per-worker CPU count
    GetType = spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterNodeType')
    GetSubString = pd.Series(GetType).str.split(pat='_', expand=True)
    GetNumber = GetSubString[1].str.extract(r'(\d+)')
    ParseOutString = GetNumber.iloc[0, 0]
    WorkerCPUs = int(ParseOutString)
    nCPUs = nWorkers * WorkerCPUs
    return nCPUs
Bruno
Andrew P