I would like to measure the performance of PySpark on a Monte Carlo simulation of Pi on my local desktop running Windows.
I am using Python 3.5.
I notice that if I use NumPy inside the function launched by the Spark workers, the mean time of this function depends on the number of cores, whereas it is constant without NumPy.
I am using Jupyter to launch my tests.
First I import my packages:
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
import random
import pandas as pd
from IPython.display import display, HTML
import time
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
Definition of my NumPy function which evaluates Pi:
def pi_eval_np(nb_evaluation):
    start = time.time()
    b = 1000000
    np.random.seed(1)
    # draw b random points in the unit square
    r = np.random.rand(b, 2)
    x = r[:, 0]
    y = r[:, 1]
    # fraction of points inside the quarter circle, times 4
    idx = np.sqrt(x**2 + y**2) < 1
    pi = np.sum(idx).astype('double') / b * 4
    stop = time.time()
    return pi, stop - start
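As a sanity check before involving Spark, the function can also be timed in a plain single-process loop (a minimal sketch; the number of repetitions is arbitrary):
# Single-process baseline, outside Spark
baseline_times = [pi_eval_np(i)[1] for i in range(10)]
print("mean single-process time:", np.mean(baseline_times))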
Generic function to launch Spark:
def spark_evaluator(nb_cores, size_range, lambda_function):
    dict_output = {}
    dict_output["number of cores"] = nb_cores
    dict_output["size of range"] = size_range

    # Spark configuration
    conf = (SparkConf()
            .setMaster("local[" + str(nb_cores) + "]")
            .setAppName("spark eval")
            .set("spark.executor.memory", "4g"))

    # Generate Spark context
    start = time.time()
    sc = SparkContext(conf=conf)
    stop = time.time()
    dict_output["spark Context"] = stop - start

    # Load the range onto the Spark workers
    start = time.time()
    chunks = sc.parallelize(range(0, size_range))
    stop = time.time()
    dict_output["spark parallelize"] = stop - start

    # Map evaluation (first call is a warm-up, only the second one is timed)
    output = chunks.map(lambda_function).collect()

    start = time.time()
    output = chunks.map(lambda_function).collect()
    stop = time.time()
    dict_output["total evaluation"] = stop - start
    dict_output["mean lambda function"] = np.mean([x[1] for x in output])

    # Stop the Spark context
    sc.stop()
    return dict_output
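Note that spark_evaluator only keeps the mean of the per-call times. To also look at their spread, percentiles could be stored in the same way, right after the mean is computed (a sketch of the extra lines, reusing the local output list):
# Possible addition inside spark_evaluator, after "mean lambda function"
times = [x[1] for x in output]
dict_output["median lambda function"] = np.percentile(times, 50)
dict_output["p95 lambda function"] = np.percentile(times, 95)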
Then I can run my evaluator with several kinds of configurations:
# Scenarios
cores_scenarii = [1, 2, 4, 8]
range_scenarii = [1000]

# Run all scenarios
result_pi_np = []
for core, size_range in product(cores_scenarii, range_scenarii):
    result_pi_np.append(spark_evaluator(nb_cores=core, size_range=size_range, lambda_function=pi_eval_np))

# Transform the list of dicts into a dict of lists
output_pi_scenarii_np = pd.DataFrame(result_pi_np).to_dict('list')

# Output in pandas
display(pd.DataFrame(output_pi_scenarii_np))
Output is (all timings in seconds):
   mean lambda function  number of cores  size of range  spark Context  spark parallelize  total evaluation
0              0.067703                1           1000       0.116012           0.005000         70.474194
1              0.063604                2           1000       0.135212           0.015601         34.119039
2              0.065864                4           1000       0.143411           0.001000         20.587668
3              0.081089                8           1000       0.134608           0.005001         18.336296
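Since matplotlib is already imported, the scaling can also be visualised directly from the results (a minimal sketch; it only reuses the column names shown above):
# Plot total evaluation time and mean per-call time against the number of cores
df = pd.DataFrame(output_pi_scenarii_np)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
ax1.plot(df["number of cores"], df["total evaluation"], marker="o")
ax1.set_xlabel("number of cores")
ax1.set_ylabel("total evaluation (s)")
ax2.plot(df["number of cores"], df["mean lambda function"], marker="o")
ax2.set_xlabel("number of cores")
ax2.set_ylabel("mean lambda function (s)")
plt.tight_layout()
plt.show()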
As you can notice, the mean lambda function time is not constant when I increase the number of cores: it is fine with 1, 2 and 4 cores, but there is a problem with 8 cores. (I have a timer inside the Pi function in order to compute the average process time at the end of the run.)
Whereas if I use a standard function without NumPy, the time stays constant, for example with:
def pi_eval(void):
    start = time.time()
    b = 10000
    in_circle = 0
    # count the random points of the unit square that fall inside the quarter circle
    for i in range(0, b):
        x, y = random.random(), random.random()
        r = (x*x + y*y) < 1
        in_circle += 1 if r else 0
    pi = float(in_circle) / float(b) * 4
    stop = time.time()
    return pi, stop - start
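The same single-process sanity check as above can be run for the pure Python version (again a minimal sketch):
# Single-process baseline for the pure Python version
baseline_times_py = [pi_eval(i)[1] for i in range(10)]
print("mean single-process time (pure Python):", np.mean(baseline_times_py))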
Do you have any idea why NumPy introduces such overhead? Do you think it could be due to the fact that each worker process has to load a C DLL? If yes, why?
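To give an idea of the per-process cost I have in mind, the NumPy import itself can be timed in a fresh interpreter, independently of Spark (a minimal sketch):
# Time "import numpy" in a fresh Python process to estimate the per-process load cost
import subprocess, sys
snippet = "import time; t0 = time.time(); import numpy; print(time.time() - t0)"
print(subprocess.check_output([sys.executable, "-c", snippet]).decode().strip())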