
I would like to measure the performance of PySpark on a Monte Carlo simulation of Pi, on my local Windows desktop.

I am using Python 3.5.

I notice that if I use NumPy inside the function launched by the Spark workers, the mean time of this function depends on the number of cores, whereas it is constant without NumPy.

I am using Jupyter to launch my test.

First, I import my packages:

import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
import random
import pandas as pd
from IPython.display import display, HTML
import time
import numpy as np
from itertools import product
import matplotlib.pyplot as plt

Definition of my NumPy function which evaluates Pi:

def pi_eval_np(nb_evaluation):
    # nb_evaluation is the element received from the RDD; it is not used,
    # it only serves as the map argument
    start = time.time()
    b = 1000000
    # note: with a fixed seed, every call draws the same points
    np.random.seed(1)
    # draw b random points in the unit square
    r = np.random.rand(b, 2)
    x = r[:, 0]
    y = r[:, 1]
    # the fraction of points inside the quarter circle, times 4, estimates Pi
    idx = np.sqrt(x**2 + y**2) < 1
    pi = np.sum(idx).astype('double') / b * 4
    stop = time.time()
    return pi, stop - start
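
As a sanity check, the same function can be timed outside Spark to get a single-process baseline (a minimal sketch; the 10 repetitions are an arbitrary choice):

# single-process baseline, outside Spark; the argument is unused by the function
baseline = [pi_eval_np(i)[1] for i in range(10)]
print("mean single-process time: %.4f s" % np.mean(baseline))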

Generic function to launch Spark:

def spark_evaluator(nb_cores, size_range, lambda_function):
    dict_output = {}
    dict_output["number of cores"] = nb_cores
    dict_output["size of range"] = size_range

    # Spark configuration
    conf = (SparkConf()
            .setMaster("local[" + str(nb_cores) + "]")
            .setAppName("spark eval")
            .set("spark.executor.memory", "4g"))

    # generate the Spark context
    start = time.time()
    sc = SparkContext(conf=conf)
    stop = time.time()
    dict_output["spark Context"] = stop - start

    # distribute the range across the Spark workers
    start = time.time()
    chunks = sc.parallelize(range(0, size_range))
    stop = time.time()
    dict_output["spark parallelize"] = stop - start

    # map evaluation: the first call is a warm-up, the second one is timed
    output = chunks.map(lambda_function).collect()

    start = time.time()
    output = chunks.map(lambda_function).collect()
    stop = time.time()
    dict_output["total evaluation"] = stop - start
    dict_output["mean lambda function"] = np.mean([x[1] for x in output])

    # stop the Spark context
    sc.stop()

    return dict_output

Then I can run my evaluator with several kinds of configurations:

# scenarios
cores_scenarii = [1, 2, 4, 8]
range_scenarii = [1000]

# run all scenarios
result_pi_np = []
for core, size_range in product(cores_scenarii, range_scenarii):
    result_pi_np.append(spark_evaluator(nb_cores=core, size_range=size_range, lambda_function=pi_eval_np))

# transform the list of dicts into a dict of lists
output_pi_scenarii_np = pd.DataFrame(result_pi_np).to_dict('list')

# output in pandas
display(pd.DataFrame(output_pi_scenarii_np))
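
Since matplotlib is already imported, the scaling can also be visualized (a minimal sketch built on the DataFrame above):

# plot the total evaluation time against the number of cores
df = pd.DataFrame(output_pi_scenarii_np)
fig, ax = plt.subplots()
ax.plot(df["number of cores"], df["total evaluation"], marker="o")
ax.set_xlabel("number of cores")
ax.set_ylabel("total evaluation time (s)")
plt.show()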

Output is:

|   | mean lambda function | number of cores | size of range | spark Context | spark parallelize | total evaluation |
|---|----------------------|-----------------|---------------|---------------|-------------------|------------------|
| 0 | 0.067703             | 1               | 1000          | 0.116012      | 0.005000          | 70.474194        |
| 1 | 0.063604             | 2               | 1000          | 0.135212      | 0.015601          | 34.119039        |
| 2 | 0.065864             | 4               | 1000          | 0.143411      | 0.001000          | 20.587668        |
| 3 | 0.081089             | 8               | 1000          | 0.134608      | 0.005001          | 18.336296        |

(All times are in seconds.)

As you can see, the mean lambda function time is not constant when I increase the number of cores: it is fine with 1, 2 and 4 cores, but there is a problem with 8 cores. (I have a timer in the Pi function in order to compute the average process time at the end of the run.)

Whereas if I use a standard function without NumPy, the time is constant, as in this example:

def pi_eval(void):
    # void is the element received from the RDD; it is not used
    start = time.time()
    b = 10000
    in_circle = 0
    for i in range(0, b):
        x, y = random.random(), random.random()
        r = (x*x + y*y) < 1
        in_circle += 1 if r else 0

    # the fraction of points inside the quarter circle, times 4, estimates Pi
    pi = float(in_circle)/float(b)*4
    stop = time.time()
    return pi, stop - start

Do you have any idea why NumPy introduces such overhead? Do you think it could be due to the fact that each process needs to load a C DLL? If yes, why?
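
One way to check whether this is Spark-specific or plain resource contention would be to run the same function concurrently with multiprocessing and compare the per-call times (a sketch I have not run; on Windows, the functions may need to live in an importable module for the pool to pickle them):

from multiprocessing import Pool

def measure_concurrent(nb_procs, nb_calls=100):
    # run pi_eval_np concurrently in nb_procs worker processes and
    # return the mean per-call duration reported by the function itself
    with Pool(nb_procs) as pool:
        results = pool.map(pi_eval_np, range(nb_calls))
    return np.mean([t for _, t in results])

for n in [1, 2, 4, 8]:
    print(n, "processes:", measure_concurrent(n))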

parisjohn
  • Possible duplicate of [Spark: Inconsistent performance number in scaling number of cores](https://stackoverflow.com/questions/41090127/spark-inconsistent-performance-number-in-scaling-number-of-cores) – Alper t. Turker Jun 05 '18 at 19:53
  • It is a different problem: scaling works here, but it is not very good due to NumPy; without NumPy I have perfect scaling. I don't understand why, with NumPy, the evaluation function launched by Spark does not have a constant time. I am not talking here about the total evaluation time of the application. – parisjohn Jun 05 '18 at 20:22
