
I'm a newbie to Spark. I am using Python (pyspark) to write my program. I use the groupByKey function to transform key-value pairs into key-(list of values) pairs. I am running Spark on a 64-core machine and I try to utilize all 64 cores by starting the program with the following command:

spark-submit --master local[64] my_program.py

However, I notice that while the groupByKey function is executing, only one core is being utilized. The data is quite big, so why doesn't Spark partition it into 64 partitions and do the reduction/grouping on 64 different cores?

Am I missing some important step for parallelization?

The relevant part of the code looks like this:

# Here, input itself is an RDD of key-(list of values) pairs. The mapPartitions
# function is used to produce key-value pairs (variable x), from
# which another RDD of key-(list of values) pairs is created (variable y).
x = input.mapPartitions(transFunc)
# x contains key-value pairs, such as [(k1, v1), (k1, v2), (k2, v3)]
y = x.groupByKey()
# y contains key-(list of values) pairs, such as [(k1, [v1, v2]), (k2, [v3])]
  • how are you loading your data? – maasg Nov 08 '14 at 11:42
  • @maasg: I use mapPartitions. After mapPartitions, the resulting data, say in variable x, is a key-value pair where the key is a string and the value is also a string. I then use groupByKey to form a key-to-(list of values) pair, where the key is the same as the key in x, and the list of values is the list of string values. – MetallicPriest Nov 08 '14 at 11:51
  • Could you add the code to the question? – maasg Nov 08 '14 at 11:56
  • @maasg: Have now added the code. – MetallicPriest Nov 08 '14 at 12:11
  • still missing the part where the data is loaded. – maasg Nov 08 '14 at 12:14
  • @maasg But does that even matter if you know what the variable x contains? I have not shown the input part because it's a bit complex. Is the loading of the data very important for deciding parallelization? – MetallicPriest Nov 08 '14 at 12:19
  • The data source often determines the initial partitions of the RDD, which will be the basis of parallelization. What's your source of data? A single file, a DB, HDFS? – maasg Nov 08 '14 at 12:22
  • My source of data is a text file, but its format is not such that you can simply use the map function. So I use the mapPartitions function to return an RDD which contains key-to-(list of values) pairs. Can it be that the mapPartitions function is bad for parallelization? I use code similar to what bearitto suggested for this question of mine: http://stackoverflow.com/questions/26741714/how-does-the-pyspark-mappartitions-function-work – MetallicPriest Nov 08 '14 at 12:25

1 Answer


The default parallelism level in Spark is dictated by the configuration option spark.default.parallelism. Its default values are (from the docs):

  • Local mode: number of cores on the local machine
  • Mesos fine-grained mode: 8
  • Others: total number of cores on all executor nodes or 2, whichever is larger
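
If you want to override that default explicitly, it can be passed to spark-submit with --conf spark.default.parallelism=64 or set in code. A minimal pyspark sketch (64 only mirrors the core count from the question, and this alone won't help if the RDD itself has too few partitions):

from pyspark import SparkConf, SparkContext

# Override the default shuffle parallelism explicitly (the value is illustrative).
conf = (SparkConf()
        .setMaster("local[64]")
        .setAppName("my_program")
        .set("spark.default.parallelism", "64"))
sc = SparkContext(conf=conf)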

RDDs can be regrouped into more or fewer partitions using these operations:

rdd.repartition(partitions: Int) // redistributes the RDD into the given number of partitions
rdd.coalesce(partitions: Int)    // reduces the number of partitions of the RDD to the given number
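
Since the question uses the Python API, a rough pyspark equivalent (the file name and partition counts are only illustrative) would be:

from pyspark import SparkContext

sc = SparkContext("local[64]", "partitioning_example")  # mirrors the question's local 64-core setup
rdd = sc.textFile("localFile")           # initial number of partitions comes from the input source
rdd64 = rdd.repartition(64)              # full shuffle into 64 partitions
rdd8 = rdd64.coalesce(8)                 # merge down to 8 partitions without a full shuffle
print(rdd.getNumPartitions(), rdd64.getNumPartitions(), rdd8.getNumPartitions())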

Operations that require an internal shuffle often take a numPartitions parameter to specify the number of target partitions. After such an operation, the RDD will have that new number of partitions. Let me illustrate that with an example:

Given:

val rdd = sc.textFile("localFile")  // default number of partitions; let's say 2

Then:

val moreParallelRdd = rdd.repartition(64) // 64 partitions
val onePartitionRdd = moreParallelRdd.coalesce(1) // 1 partition
val sortedRdd = onePartitionRdd.sortBy(x => sortSelector(x), numPartitions = 10) // 10 partitions
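
Applied to the code in the question, you could either repartition x before grouping or pass numPartitions directly to groupByKey; a sketch in pyspark (64 simply matches the core count, and the right value depends on the data):

# Reusing the question's `input` RDD and `transFunc`
x = input.mapPartitions(transFunc)
y = x.groupByKey(numPartitions=64)   # the grouping now runs as 64 tasks
# or, equivalently, raise the partition count first:
# y = x.repartition(64).groupByKey()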
– maasg