I am running PySpark locally to run an example found on the Spark website. I generated a random DataFrame to get a bigger sample for performance tests.
I have set up my SparkSession and SparkContext like this:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KMeansParallel") \
    .getOrCreate()

sc = spark.sparkContext
But the program does not seem to run in parallel as suggested here. I see in Task Manager that only 10-25% of the processor is used, which leads me to think that Python is stuck on one core (because of the GIL?).
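For reference, this is the kind of quick check I can run to see what Spark reports about parallelism (a diagnostic using standard PySpark properties, not part of the script below):

print(sc.master)                  # expecting "local[*]"
print(sc.defaultParallelism)      # in local[*] this should match the number of logical cores
print(sc.parallelize(range(1000)).getNumPartitions())  # partitions of a fresh RDD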
What am I doing wrong? I tried changing some parameters on the SparkSession:
    .config("spark.executor.instances", 7) \
    .config("spark.executor.cores", 3) \
    .config("spark.default.parallelism", 7) \
    .config("spark.driver.memory", "15g") \
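Put together with the builder above, that would look roughly like this (same values as above; I have not confirmed which of these settings actually apply in local mode):

# Builder with the settings I tried (values as above); I am not sure the
# executor-related settings have any effect in local mode.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KMeansParallel") \
    .config("spark.executor.instances", 7) \
    .config("spark.executor.cores", 3) \
    .config("spark.default.parallelism", 7) \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()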
I am running with 16 GB of memory, 4 cores and 8 logical processors. I leave some resources for the OS as advised here (even if local mode might be different from a YARN configuration).
Complete code:
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import math
import time
def gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance):
    """
    Returns a dataframe with <nPoints> points generated randomly
    around <nGaussian> centers by a normal distribution
    N(<a center chosen randomly>, <gaussianVariance>)
    """
    # Generating centers
    meanPointsNumpy = np.random.rand(nGaussian, 2)

    def geneRandomChoice(nGaussian, nPoints):
        for i in range(nPoints):
            yield (i, np.random.choice(nGaussian, 1))

    # Generating points in a numpy ndarray
    dataNumpy = np.array([
        [t[0],
         np.random.normal(loc=meanPointsNumpy[t[1], 0], scale=math.sqrt(gaussianVariance)),
         np.random.normal(loc=meanPointsNumpy[t[1], 1], scale=math.sqrt(gaussianVariance))]
        for t in geneRandomChoice(nGaussian, nPoints)
    ])

    # Converting ndarray to RDD then to dataFrame
    dataRDD = sc \
        .parallelize(dataNumpy) \
        .map(lambda x: Row(label=int(x[0]), features=Vectors.dense(x[1].astype(float), x[2].astype(float))))

    data = spark.createDataFrame(dataRDD)
    return data
def kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance):
    """
    Evaluates the clusters from the dataFrame created
    by the gaussianMixture function
    """
    dataset = gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance)

    t1 = time.time()

    # Trains a k-means model
    kmeans = KMeans().setK(nGaussian)  # .setSeed(1)
    model = kmeans.fit(dataset)

    # Make predictions
    predictions = model.transform(dataset)

    # Evaluate clustering by computing the Silhouette score
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    # print("Silhouette with squared euclidean distance = " + str(silhouette))

    return time.time() - t1
nPoints = 10000
nGaussian = 100
gaussianVariance = 0.1
nTests = 20

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KMeansParallel") \
    .getOrCreate()

sc = spark.sparkContext

meanTime = 0
for i in range(nTests):
    res = kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance)
    meanTime += res
meanTime /= nTests

print("Mean Time : " + str(meanTime))

spark.stop()
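To isolate Spark's local parallelism from the ML pipeline, this is a minimal, self-contained test I could run separately (a sketch; the partition count of 8 matches my 8 logical processors and is my own choice):

from pyspark.sql import SparkSession

# Minimal parallelism check, separate from the script above: a CPU-bound map
# over several partitions should keep all logical processors busy in Task
# Manager if local[*] really spreads the work across cores.
spark = SparkSession.builder.master("local[*]").appName("ParallelismCheck").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(100000), 8)  # 8 partitions -> up to 8 parallel tasks
print(rdd.map(lambda x: sum(i * i for i in range(1000))).sum())

spark.stop()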