
My apologies if this question has already been answered. I did have a look at the archive but I did not find an answer specific to my question.

I am new to Spark. I am trying to run the simple example attached in parallel locally, using Spark 2.1.1 on my macOS Sierra machine. As I have 4 cores and there are 4 tasks, each taking 10 seconds, I was hoping to spend a bit more than 10 seconds in total.

I see that each task takes the expected amount of time, but there seem to be only 2 threads of execution; I was expecting 4. As you can see in the code, the value of each tuple is the execution time of the corresponding task.

insight086:pyspark lquesada$ more output/part-00000

(u'1', 10.000892877578735)
(u'3', 10.000878095626831)

insight086:pyspark lquesada$ more output/part-00001

(u'2', 10.000869989395142)
(u'4', 10.000877857208252)

Also, the total time this takes is considerably more than 20 seconds:

total_time 33.2253439426

Thanks in advance for your help!

Cheers, Luis

INPUT FILE:

1
2
3
4

SCRIPT:

from pyspark import SparkContext
import time

def mymap(word):
    # Each task sleeps for 10 seconds and returns (word, elapsed time),
    # so every output tuple carries the duration of its own task.
    start = time.time()
    time.sleep(10)
    et = time.time() - start
    return (word, et)

def main():
    start = time.time()
    sc = SparkContext(appName='SparkWordCount')

    # One number per line; each element becomes one 10-second task.
    # The keys are unique, so reduceByKey does not combine anything here.
    input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt')
    counts = input_file.flatMap(lambda line: line.split()) \
                       .map(mymap) \
                       .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output')

    sc.stop()
    print 'total_time', time.time() - start

if __name__ == '__main__':
    main()
  • This dataset is so small that it's impossible to prove anything with it... – eliasah May 29 '17 at 11:33
  • My actual question was about the number of cores used. However, I do appreciate that you pointed me to "Inconsistent performance number in scaling number of cores", since it is certainly relevant to my concern regarding the overhead. – Luis Quesada May 29 '17 at 12:36

1 Answer

That's why divide-and-conquer algorithms have a threshold below which it does not make sense to use them at all. Add distribution (with parallelism) to the mix in Spark and you have quite a lot of machinery for such a small calculation. You are simply not leveraging Spark's strengths with this 4-element dataset.

With larger and larger datasets, the total time should converge towards your expectation.

Also, the default minimum number of partitions when reading local text files is 2 (textFile's minPartitions defaults to min(defaultParallelism, 2)), and a tiny file like yours ends up in exactly 2 partitions, so without repartitioning you use only 2 cores (see the sketch after the quote below).

repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.
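
To see the effect in PySpark, here is a minimal sketch along the lines of the script in the question; the shortened file path, the local[4] master and the getNumPartitions() checks are just illustrative assumptions:

from pyspark import SparkContext
import time

def mymap(word):
    # same 10-second task as in the question
    start = time.time()
    time.sleep(10)
    return (word, time.time() - start)

if __name__ == '__main__':
    sc = SparkContext(master='local[4]', appName='SparkWordCount')

    words = sc.textFile('input.txt').flatMap(lambda line: line.split())
    print 'before repartition:', words.getNumPartitions()   # typically 2 for a small local text file

    # Spread the 4 elements over 4 partitions so the 4 sleeps can run in parallel
    # (the exact element-to-partition assignment after the shuffle may vary).
    words4 = words.repartition(4)
    print 'after repartition:', words4.getNumPartitions()   # 4

    results = words4.map(mymap).collect()
    print results
    sc.stop()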


local[*] means to use as many cores as your computer has (see the case for LOCAL_N_REGEX in SparkContext):

def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
val threadCount = if (threads == "*") localCpuCount else threads.toInt
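
In PySpark you can also pin the number of threads explicitly instead of relying on local[*]; a sketch, where spark_word_count.py is just a placeholder name for the script above:

# on the command line:
#   spark-submit --master 'local[4]' spark_word_count.py

# or directly in the script:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local[4]').setAppName('SparkWordCount')
sc = SparkContext(conf=conf)
print sc.defaultParallelism   # 4 with local[4]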

It's only a hint for how many partitions to use by default, but it does not prevent Spark from going higher or lower. It mainly depends on the optimizations Spark applies to end up with the best execution plan for your distributed computation. Spark does quite a few for you, and the higher the abstraction level, the more optimizations there are (see the batches in Spark SQL's Optimizer).
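
A quick way to see the difference between the hint and what a given data source actually produces (assuming a 4-core machine running with local[*]):

nums = sc.parallelize([1, 2, 3, 4])
print nums.getNumPartitions()    # usually 4: parallelize follows sc.defaultParallelism

lines = sc.textFile('input.txt')
print lines.getNumPartitions()   # usually 2: textFile's default minPartitions is min(defaultParallelism, 2)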

Jacek Laskowski