2

I have created a 7 nodes cluster on dataproc (1 master and 6 executors. 3 primary executors and 3 secondary preemptible executors). I can see in the console the cluster is created corrected. I have all 6 ips and VM names. I am trying to test the cluster but it seems the code is not running on all the executors but just 2 at max. Following is the code I am using to check the number of executors that the code executed on:

import numpy as np
import socket
set(sc.parallelize(range(1,1000000)).map(lambda x : socket.gethostname()).collect())

output:

{'monsoon-testing-sw-543d', 'monsoon-testing-sw-p7w7'}

I have restarted the kernel many times but, though the executors change the number of executors on which the code is executed remains the same.

Can somebody help me understand what is going on here and why pyspark is not parallelizing my code to all the executors?

figs_and_nuts
  • 4,870
  • 2
  • 31
  • 56

1 Answers1

1

You have many executer to work, but not enough data partitions to work on. You can add the parameter numSlices in the parallelize() method to define how many partitions should be created:

rdd = sc.parallelize(range(1,1000000), numSlices=12)

The number of partitions should at least equal or larger than the number of executors for optimal work distribution.

Btw: with rdd.getNumPartitions() you can get the number of partitions you have in your RDD.

Til Piffl
  • 548
  • 2
  • 12
  • You are right that I needed to increase the number of slices. However, I had to really increase them. With 10 and 100 I'm getting 1 and 2 executors respectively but with 100 slices I do get all 6 executors. – figs_and_nuts Dec 02 '21 at 09:17
  • Not sure. Maybe the values are still not evenly distributed over the partitions, even though they are in my test: `rdd = sc.parallelize(range(1,100000),10).mapPartitions(lambda n: [len(list(n))]); rdd.collect()`. Output: [9999, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000] – Til Piffl Dec 02 '21 at 09:55
  • Partitions would be created no doubt. But they are residing on the same executor node for some reason – figs_and_nuts Dec 02 '21 at 20:55
  • There are configuration, how many executors are actually used by Spark. Checkout this post: https://stackoverflow.com/a/56720485/5118843 I'm not sure how the dynamic allocation actually works, but probably Spark looks at the size of the input data and/or the number of partitions to decide how many executors to use. – Til Piffl Dec 03 '21 at 08:04