27

I need to use this parameter, so how can I get the number of workers? In Scala, I can call sc.getExecutorMemoryStatus to get the available number of workers, but in PySpark there seems to be no API exposed to get this number.

marcospereira
American curl
  • I don't think this question is a duplicate of the other. I would like to know how many executors have become available to the driver, even before any RDDs have been created, when running on Mesos. Pretty annoying, but I ended up parsing the UI: import pandas as pd; df = pd.read_html("http://localhost:4040/executors")[1]; len(df[df['Executor ID'] != 'driver']) – MarkNS Jan 04 '17 at 13:35
  • Quick answer, to get the number of cores: sc._jsc.sc().getExecutorMemoryStatus().size() – OronNavon Feb 25 '18 at 12:02
  • Voted to reopen, as the original question refers to EMR nodes and this to Spark executors. While this question answers the former, the former is less generic. P.S. The accepted answer here is plain and simply wrong - both in the result and in the assumptions. – Danny Varod Jan 10 '21 at 15:19

3 Answers

41

In Scala, getExecutorStorageStatus and getExecutorMemoryStatus both return the number of executors including the driver, as in the example snippet below:

/** Method that just returns the current active/registered executors
  * excluding the driver.
  * @param sc The spark context to retrieve registered executors.
  * @return a list of executors each in the form of host:port.
  */
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
}

But in the Python API this is not implemented.

@DanielDarabos' answer also confirms this.

The equivalent to this in Python:

sc.getConf().get("spark.executor.instances")
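
Note that spark.executor.instances is only the configured value, not the number of executors actually granted by the cluster manager, and PySpark returns it as a string (or None if the property was never set). A minimal sketch of reading it defensively, assuming a running SparkSession (the int conversion and None fallback are illustrative additions, not part of the original answer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# "spark.executor.instances" is the *configured* executor count; it may be
# unset (None) or differ from what the cluster manager actually provided.
configured = sc.getConf().get("spark.executor.instances")
n_configured = int(configured) if configured is not None else None
print(n_configured)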

Edit (Python):

%python
sc = spark._jsc.sc()
# getExecutorInfos() includes the driver, hence the -1
n_workers = len([executor.host() for executor in sc.statusTracker().getExecutorInfos()]) - 1

print(n_workers)

As Danny mentioned in the comments, if you want to cross-verify them, you can use the statements below.

%python
sc = spark._jsc.sc()

# All block managers registered with the driver (i.e. all executors + the driver itself)
result1 = sc.getExecutorMemoryStatus().keys()

# Number of executors, excluding the driver
result2 = len([executor.host() for executor in sc.statusTracker().getExecutorInfos()]) - 1

print(result1, end='\n')
print(result2)

Example result (here only the driver is registered, so the worker count is 0):

Set(10.172.249.9:46467)
0
Ram Ghadiyaram
  • Sorry for my late reply, but in my Pyspark, it showed "SparkContext object has no attribute getConf" – American curl Aug 01 '16 at 02:06
  • I mean you should try the Python-equivalent syntax, not use it as-is... – Ram Ghadiyaram Aug 01 '16 at 05:20
  • I think it's like this: logger.info(sparkContext.getConf.getAll.mkString("\n")) will print all parameters applied; from that you can find the number of executors as well – Ram Ghadiyaram Aug 01 '16 at 14:15
  • The python syntax is `sc._conf.get('spark.executor.instances')`. It returns a string. – Vito Feb 03 '18 at 16:42
  • Configured != reality :p – Thomas Decaux Jul 25 '19 at 10:06
  • The actual number of executors may be lower than the configured amount, depending on available resources. That is why in Scala the above snippet is used and not the getConf(). – Danny Varod Jan 10 '21 at 14:17
  • Since this question is closed and this answer is wrong, I'll answer here: `sc = spark_session._jsc.sc()` `result1 = sc.getExecutorMemoryStatus().keys()` `result2 = [executor.host() for executor in sc.statusTracker().getExecutorInfos()]` – Danny Varod Jan 10 '21 at 15:20
  • Thanks Danny for your contribution. To be more precise, at least for running this on Databricks: `sc = spark._jsc.sc()` `n_workers = len([executor.host() for executor in sc.statusTracker().getExecutorInfos()]) - 1` – Chiel Mar 05 '21 at 17:32
  • @Chiel and Danny, I edited/corrected the answer. Thanks – Ram Ghadiyaram Mar 05 '21 at 19:13
  • @Andrey: Thanks ... my bad, it's a typo. :-) – Ram Ghadiyaram Apr 26 '21 at 05:21
  • A quick note for Databricks there's also this key to check: `'spark.databricks.clusterUsageTags.clusterTargetWorkers'` – Robbie Capps Oct 04 '22 at 23:04
2

You can also get the number of executors via the Spark REST API: https://spark.apache.org/docs/latest/monitoring.html#rest-api

You can check /applications/[app-id]/executors, which returns a list of all active executors for the given application.


PS: When spark.dynamicAllocation.enabled is true, spark.executor.instances may not equal the number of currently available executors, but this API always returns the correct value.
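
For example, a minimal sketch using the requests library, assuming the driver's UI/REST endpoint is reachable at localhost:4040 (adjust the host and port for your deployment). The returned executor list includes the driver itself, which is filtered out here:

import requests

# Host/port of the Spark UI is an assumption; adjust for your cluster.
base = "http://localhost:4040/api/v1"

# Pick the first (usually the only) application registered with this UI.
app_id = requests.get(f"{base}/applications").json()[0]["id"]

# /executors lists all active executors, including the driver (id == "driver").
executors = requests.get(f"{base}/applications/{app_id}/executors").json()
n_executors = len([e for e in executors if e["id"] != "driver"])
print(n_executors)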

Hunger
0

I instantiated the SparkContext this way, but none of the solutions worked:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster(MASTER_CONNECTION_URL).setAppName('App name')
sc = SparkContext(conf=conf)

So I changed my code to instantiate the SparkContext with pyspark.sql.SparkSession and everything worked fine:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Gets the Spark context
conf = SparkConf().setMaster(MASTER_CONNECTION_URL).setAppName('App name')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Gets the number of workers
sc2 = sc._jsc.sc()  # underlying Scala SparkContext via Py4J
number_of_workers = len([executor.host() for executor in
                         sc2.statusTracker().getExecutorInfos()]) - 1  # Subtract 1 to discard the driver
Genarito