
I am running PySpark locally to reproduce an example found on the Spark website. I generated a random DataFrame to have a bigger sample for performance tests.

I have set my SparkSession and SparkContext like this:

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("KMeansParallel") \
        .getOrCreate()
sc = spark.sparkContext

But the program does not seem to run in parallel as suggested here. I see in the task manager that only 10-25% of the CPU is used, which leads me to think that Python is stuck on one core (because of the GIL?).
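
To check whether the work is actually being split up, I look at how the generated DataFrame is partitioned (a minimal sketch; `data` stands for the DataFrame returned by gaussianMixture in the complete code below):

# Sketch: inspect the partitioning of the generated DataFrame
print(sc.defaultParallelism)          # default parallelism Spark picks for local[*]
print(data.rdd.getNumPartitions())    # number of partitions actually processed in parallel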

What am I doing wrong? I tried to change some parameters on the SparkSession:

.config("spark.executor.instances", 7) \
.config("spark.executor.cores", 3) \
.config("spark.default.parallelism", 7) \
.config("spark.driver.memory", "15g") \

I am running with 16 GB of memory, 4 cores and 8 logical processors. I leave some resources for the OS as advised here (even if local mode might differ from a YARN configuration).

Complete code:

from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import math
import time

def gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance):
    """
    Returns a dataframe with <nPoints> points generated randomly
    around <nGaussian> centers by a normal distribution
    N(<a center chosen randomly>, <gaussianVariance>)    
    """

    #Generating centers
    meanPointsNumpy = np.random.rand(nGaussian, 2)

    def geneRandomChoice(nGaussian, nPoints):
        for i in range(nPoints):
            yield (i, np.random.choice(nGaussian, 1))

    #Generating points in a numpy ndarray
    dataNumpy = np.array([
        [t[0],
        np.random.normal(loc = meanPointsNumpy[t[1],0], scale = math.sqrt(gaussianVariance)),
        np.random.normal(loc = meanPointsNumpy[t[1],1], scale = math.sqrt(gaussianVariance))]
        for t in geneRandomChoice(nGaussian, nPoints)
    ])

    #Converting ndarray to RDD then to dataFrame
    dataRDD = sc \
        .parallelize(dataNumpy) \
        .map(lambda x: Row(label = int(x[0]), features = Vectors.dense(x[1].astype(float), x[2].astype(float))))

    data = spark.createDataFrame(dataRDD)

    return data

def kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance):
    """
    Evaluates the clusters from the dataFrame created
    by the gaussianMixture function
    """

    dataset = gaussianMixture(sc, spark, nPoints, nGaussian, gaussianVariance)

    t1 = time.time()

    # Trains a k-means model.
    kmeans = KMeans().setK(nGaussian)#.setSeed(1)
    model = kmeans.fit(dataset)

    # Make predictions
    predictions = model.transform(dataset)

    # Evaluate clustering by computing Silhouette score
    evaluator = ClusteringEvaluator()

    silhouette = evaluator.evaluate(predictions)
    #print("Silhouette with squared euclidean distance = " + str(silhouette))

    return time.time() - t1

nPoints = 10000
nGaussian = 100
gaussianVariance = 0.1
nTests = 20

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("KMeansParallel") \
    .getOrCreate()

sc = spark.sparkContext

meanTime = 0
for i in range(nTests):
    res = kMeansParallel(sc, spark, nPoints, nGaussian, gaussianVariance)
    meanTime += res

meanTime /= nTests
print("Mean Time : " + str(meanTime))

spark.stop()
Baptiste Merliot
  • Please provide your complete program, and check what the partition count is. – Chandan Ray Aug 10 '18 at 16:16
  • Yes, sure! The partition count seems to be 7 according to getNumPartitions(). I do not know whether it was my previous setting `.config("spark.default.parallelism", 7)` that changed it, though (I commented that setting out for this run). – Baptiste Merliot Aug 13 '18 at 07:41
  • Were you ever able to find a solution? – oliver Jul 08 '19 at 00:44
  • @oliver I'm not sure, but if I remember correctly my partitions were completely skewed: almost all the data was on one partition and only a negligible amount on the rest. Hence the lack of parallelism. – Baptiste Merliot Jul 08 '19 at 17:17
  • The problem may be in how the RDD is built. A wild guess would be to look at what kind of partitioner was used when building the RDD and how it partitions the data (a quick check is sketched below). I cannot test it now though, since it was at my previous company. – Baptiste Merliot Jul 08 '19 at 17:22
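
A quick way to check for the skew mentioned above (a sketch, assuming dataRDD is the RDD built inside gaussianMixture; not tested on the original setup):

# Sketch: count the records that land in each partition to spot skew
partition_sizes = dataRDD.glom().map(len).collect()
print(partition_sizes)    # something like [10000, 0, 0, ...] would mean everything sits in one partition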

1 Answer


There is no problem with the GIL: Spark runs multiple Python worker processes as needed, one per executor when running distributed, and one per core when running locally (since everything runs inside the driver).

Most probably the data size and/or partition count is too low.
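
If that is the case, one thing worth trying (a sketch, not tested against the asker's data; 8 matches the 8 logical processors mentioned in the question) is to give the generated DataFrame more partitions before fitting:

# Sketch: force more partitions so local[*] can keep all cores busy
dataset = dataset.repartition(8)          # spread the rows over 8 partitions
print(dataset.rdd.getNumPartitions())     # should now report 8
model = KMeans().setK(nGaussian).fit(dataset)

Alternatively, the number of slices can be set when the RDD is created, e.g. sc.parallelize(dataNumpy, 8).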

Arnon Rotem-Gal-Oz