
I'm running some operations in PySpark, and recently increased the number of nodes in my configuration (which is on Amazon EMR). However, even though I tripled the number of nodes (from 4 to 12), performance seems not to have changed. As such, I'd like to see if the new nodes are visible to Spark.

I'm calling the following function:

sc.defaultParallelism
>>>> 2

But I think this is telling me the total number of tasks distributed to each node, not the total number of nodes that Spark can see.

How do I go about seeing the number of nodes that PySpark is using in my cluster?

– Bryan

5 Answers


In PySpark you can still call the Scala getExecutorMemoryStatus API through PySpark's Py4J bridge:

sc._jsc.sc().getExecutorMemoryStatus().size()
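
As a quick usage sketch (run in the PySpark shell, where sc already exists; the assumption here is that the returned map has one entry per block manager, which includes the driver):

n = sc._jsc.sc().getExecutorMemoryStatus().size()
print("block managers (executors + driver):", n)
print("executors only:", n - 1)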
– Nic
  • For some reason, that doesn't seem to work for me. I posted a [question](https://stackoverflow.com/questions/51342460) with a minimal example, including the output (I get 1 from this call, when in fact there are 12 executors/workers). – et_l Jul 14 '18 at 19:50

sc.defaultParallelism is just a hint. Depending on the configuration it may not be related to the number of nodes at all. It is the number of partitions used when you call an operation that takes a partition-count argument but you don't provide one. For example, sc.parallelize makes a new RDD from a list; you can tell it how many partitions to create via the second argument, and the default value for that argument is sc.defaultParallelism.
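
A quick way to see that fallback in action (a small sketch, nothing cluster-specific about it):

data = range(1000)                                    # any small local collection
print(sc.parallelize(data).getNumPartitions())        # == sc.defaultParallelism
print(sc.parallelize(data, 500).getNumPartitions())   # == 500, the explicit value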

You can get the number of executors with sc.getExecutorMemoryStatus in the Scala API, but this is not exposed in the Python API.

In general the recommendation is to have around 4 times as many partitions in an RDD as you have executors. This is a good rule of thumb: if there is variance in how long the tasks take, the extra partitions even it out. Some executors will process 5 faster tasks while others process 3 slower ones, for example.

You don't need to be very accurate with this; a rough estimate is enough. For example, if you know you have fewer than 200 CPUs, 500 partitions will be fine.

So try to create RDDs with this number of partitions:

rdd = sc.parallelize(data, 500)     # If distributing local data.
rdd = sc.textFile('file.csv', 500)  # If loading data from a file.

Or repartition the RDD before the computation if you don't control the creation of the RDD:

rdd = rdd.repartition(500)

You can check the number of partitions in an RDD with rdd.getNumPartitions().
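
Putting that together, a rough sketch (the executor count here is an assumption, one per node for the 12-node cluster in the question, not something Spark fills in for you):

num_executors = 12                             # assumed: one executor per node
num_partitions = num_executors * 4             # ~4x executors, per the rule of thumb above
rdd = sc.textFile('file.csv', num_partitions)
print(rdd.getNumPartitions())                  # textFile treats the argument as a minimum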

– Daniel Darabos
  • Thanks. However, when I run `sc.getExecutorMemoryStatus` I get an error saying `'SparkContext' object has no attribute 'getExecutorMemoryStatus'`. Are you using PySpark? – Bryan Mar 02 '15 at 15:30
  • No, I'm using the Scala API. I assumed it would also be part of the Python API, but seems like it's not. I guess your options are to add it to the Python API yourself or switch to the Scala API. Or take the number of machines as a command line argument. – Daniel Darabos Mar 02 '15 at 15:44
  • What do you mean by take the number of machines as a command line argument? Can you invoke this when you open the PySpark interactive shell? I'm a Python guy (so no Scala) and don't want to start making API changes. – Bryan Mar 02 '15 at 16:07
  • The simplest option is to not worry about the number of machines. If you just always make 500 partitions, you will be fine no matter if there are 2 machines or 20. – Daniel Darabos Mar 02 '15 at 16:16
  • I'd like to use all the machines and cores in my cluster. I'm invoking `MASTER=yarn-client /home/hadoop/spark/bin/pyspark --executor-cores 4 --num-executors 12` from the command line. Last, I'm running my operations on RDDs via sc.textFile() -- should I use `parallelize` on them as well? – Bryan Mar 02 '15 at 16:24
  • 2
    You set the number of partitions for an RDD when you create it. As long as you have more partitions than number of executor cores, all the executors will have something to work on. So the exact count is not that important. You can use `rdd.getNumPartitions()` to see the number of partitions in an RDD. Or use `rdd.repartition(n)` to change the number of partitions (this is a shuffle operation). – Daniel Darabos Mar 02 '15 at 16:28
  • Thanks. One last question -- If I execute `rdd.repartition(500)`, `rdd.getNumPartitions()` still returns 1. Any idea why? – Bryan Mar 02 '15 at 16:53
  • RDDs are immutable. If you execute `rdd = rdd.repartition(500)` then `rdd.getNumPartitions()` should return 500. – Daniel Darabos Mar 02 '15 at 17:40
  • I've updated the answer according to our discussion. – Daniel Darabos Mar 02 '15 at 21:34
  • 2
    This answer doesn't really answer the question and you CAN access `getExecutorMemoryStatus` via pyspark. – Danny Varod Jan 10 '21 at 16:10
  • Agreed, it would be nice if Nic's answer could be accepted instead of mine. – Daniel Darabos Jan 11 '21 at 17:56

It should be possible to get the number of nodes in the cluster using this (similar to @Dan's method above, but shorter, and it works better!).

sc._jsc.sc().getExecutorMemoryStatus().keySet().size()
– Charles Newey

The other answers provide a way to get the number of executors. Here is a way to get the number of nodes, which includes both head and worker nodes.

s = sc._jsc.sc().getExecutorMemoryStatus().keys()   # Scala Set of "host:port" strings

# Strip the "Set(...)" wrapper from the string form and split into entries.
l = str(s).replace("Set(", "").replace(")", "").split(", ")

# Keep only the host part, so multiple executors on one node count once.
d = set()
for i in l:
    d.add(i.split(":")[0])

len(d)
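
Note that this works by parsing the string form of the Scala set that Py4J hands back, so it depends on the Set(...) rendering staying stable across Spark versions; it is a pragmatic workaround rather than a supported API.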
– Dan Ciborowski - MSFT

I found that sometimes my sessions were killed by the remote end, giving a strange Java error:

Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

I avoided this with the following:

def check_alive(spark_conn):
    """Check if connection is alive. ``True`` if alive, ``False`` if not."""
    try:
        # Any call that has to reach the JVM fails once the SparkContext is stopped.
        spark_conn._jsc.sc().getExecutorMemoryStatus()
        return True
    except Exception:
        return False

def get_number_of_executors(spark_conn):
    """Return the number of entries reported by getExecutorMemoryStatus."""
    if not check_alive(spark_conn):
        raise Exception('Unexpected Error: Spark Session has been killed')
    try:
        return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
    except Exception:
        raise Exception('Unknown error')
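
For example (a quick sketch, using the shell's existing sc as the connection):

if check_alive(sc):
    print("executors reported:", get_number_of_executors(sc))
else:
    print("SparkContext has been stopped")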
– Alexander McFarlane