
I have further questions following this post: https://stackoverflow.com/a/39398750/5060792

I have a 4 node cluster (1 driver, 3 workers), where each worker has 16 cores and 62gb of ram, and the driver has 8 cores and 12gb of ram.

Following the partitioning "rule of thumb", the partition count should be (number of worker nodes * number of executors per worker node * cores per executor) * 3 or 4. With dynamic allocation, I am not sure exactly how many executors are started on each node, but assuming 3 executors per worker node with 5 cores per executor, that gives 3*3*5*4 = 180. So 180 partitions should be near optimal?
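The arithmetic behind that estimate, as a minimal sketch. The function name `rule_of_thumb_partitions` is made up for illustration, and the executor layout (3 executors per worker, 5 cores each) is the assumption stated above, not something read from the cluster:

```python
def rule_of_thumb_partitions(worker_nodes, executors_per_node,
                             cores_per_executor, factor):
    """Partition estimate: total executor cores times a factor of 3 or 4."""
    total_cores = worker_nodes * executors_per_node * cores_per_executor
    return total_cores * factor


# Assumed layout: 3 workers, 3 executors per worker, 5 cores per executor.
low = rule_of_thumb_partitions(3, 3, 5, factor=3)
high = rule_of_thumb_partitions(3, 3, 5, factor=4)
print(low, high)  # 135 180
```

So under those assumptions the rule of thumb suggests somewhere between 135 and 180 partitions.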

In the reproducible code below, df is a 125,000 row dataframe with a String column 'text'. With dynamic allocation, Spark puts the imported dataframe in one partition on my cluster.

The count() of df takes around 8 to 10 seconds before .repartition(180) and 1 to 2 seconds after, whereas the .rdd function addArrays takes around 8 to 10 seconds before .repartition(180) and 150 seconds after.
Note: I am stuck using Spark 2.2.0, so the Spark SQL array functions are not available to me.

Running .repartition(1) afterwards does not speed up addArrays; it continues to take around 2.5 minutes. However, rebuilding the ngrams df from scratch, where Spark puts everything in one partition, speeds it back up to only a few seconds.

In short: count() gets faster, .rdd.map() gets slower.

I can repeat these scenarios many times. Repartitioning before or after I apply any function does not change the timings by any appreciable amount.
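For anyone repeating the measurements, here is a rough sketch of a wall-clock timing helper. The name `timed` is hypothetical and not part of Spark. Because Spark transformations are lazy, only a call that actually runs a job (for example `count()`) is worth wrapping:

```python
import time


def timed(fn, *args, **kwargs):
    """Call fn and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload; with Spark this would be e.g. timed(ngrams.count).
total, seconds = timed(sum, range(1000))
print(total, round(seconds, 4))
```

Running each measurement a few times and comparing the spread helps separate executor start-up time under dynamic allocation from the cost of the job itself.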

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline

spark = (
    SparkSession.builder.master('yarn').appName('test')
    .config('spark.kryoserializer.buffer.max', '1g')
    .config('spark.sql.cbo.enabled', True)
    .config('spark.sql.cbo.joinReorder.enabled', True)
    .config('spark.yarn.executor.memoryOverhead', '2g')
    .config('spark.driver.maxResultSize', '2g')
    .config("spark.port.maxRetries", 100)
    .config('spark.dynamicAllocation.enabled', 'true')
    .config('spark.dynamicAllocation.executorIdleTimeout', '60')
    .config('spark.dynamicAllocation.maxExecutors', '56')
    .config('spark.dynamicAllocation.minExecutors', '0')
    .config('spark.dynamicAllocation.schedulerBacklogTimeout', '1')
    .getOrCreate()
)

sc = spark.sparkContext

sc.defaultParallelism
## my defaultParallelism is 2

placeholder = (
    r"Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
    r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
    r"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
    r"nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
    r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    r"pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
    r"culpa qui officia deserunt mollit anim id est laborum."
)

df = (
    spark.range(0, 250000, 1)
    .withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
    .withColumn('text', F.lit(placeholder))
    .withColumn('text', F.expr("substring_index(text, ' ', rand1)" ))
    .withColumn('text', F.split(F.col('text'), ' '))
    .select('text')
)

## Saving and reloading puts into 1 partition on my cluster.
df.write.parquet("df.parquet", mode='overwrite')
df = spark.read.parquet("df.parquet")

!hdfs dfs -du -h
## 1.4 M    4.3 M    df.parquet

ngram01 = NGram(n=1, inputCol="text", outputCol="ngrams01")
ngram02 = NGram(n=2, inputCol="text", outputCol="ngrams02")
ngram03 = NGram(n=3, inputCol="text", outputCol="ngrams03")
ngram04 = NGram(n=4, inputCol="text", outputCol="ngrams04")
ngram05 = NGram(n=5, inputCol="text", outputCol="ngrams05")

ngram_pipeline = (
    Pipeline()
    .setStages([ngram01, ngram02, ngram03, ngram04, ngram05])
)

ngrams = (
    ngram_pipeline
    .fit(df)
    .transform(df)
)

'''RDD Function to combine single-ngram Arrays.'''
colsNotNGrams = [c for c in ngrams.columns if 'ngrams' not in c]
colsNotNGramsTpls = ['(row.{},)'.format(c) for c in ngrams.columns if 'ngrams' not in c]
rddColTupls = ' + '.join(colsNotNGramsTpls)

def addArrays(row):
    return (
        eval( rddColTupls )
        + (row.ngrams01 + row.ngrams02 + row.ngrams03,) 
        + (row.ngrams01 + row.ngrams02 + row.ngrams03 + row.ngrams04 + row.ngrams05,)
    ) 


''' timings before repartitioning '''
ngrams.rdd.getNumPartitions()
# output is 1

ngrams.count()
# takes 8 to 10 seconds

ngrams2 = (
    ngrams
    .rdd.map(addArrays)
    .toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## takes 8 to 10 seconds

''' timings after repartitioning '''
ngrams = ngrams.repartition(180)
ngrams.rdd.getNumPartitions()
# output is 180

ngrams2 = (
    ngrams
    .rdd.map(addArrays)
    .toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## now takes 2.5 minutes 

## HOWEVER,
ngrams.count()
# now takes 1 to 2 seconds

''' timings after repartitioning again: this does not help '''
ngrams = ngrams.repartition(1)
ngrams.rdd.getNumPartitions()
# output is 1

ngrams2 = (
    ngrams
    .rdd.map(addArrays)
    .toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## still takes 2.5 minutes 
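As a side note on `addArrays` above, the `eval` call can be avoided by indexing the row by column name. This is a sketch, not a fix for the timings: `make_add_arrays` is a hypothetical helper, and it assumes the rows support lookup by field name, which `pyspark.sql.Row` does via `row['name']`. It is demonstrated here on a plain dict so it runs without a cluster:

```python
def make_add_arrays(passthrough_cols):
    """Build a row mapper that keeps passthrough_cols and appends
    the combined 1-to-3 and 1-to-5 n-gram arrays."""
    def add_arrays(row):
        passthrough = tuple(row[c] for c in passthrough_cols)
        one_to_three = row["ngrams01"] + row["ngrams02"] + row["ngrams03"]
        one_to_five = one_to_three + row["ngrams04"] + row["ngrams05"]
        return passthrough + (one_to_three, one_to_five)
    return add_arrays


# Stand-in for one Row of the ngrams dataframe (values are illustrative).
sample = {
    "text": ["a", "b"],
    "ngrams01": ["a", "b"],
    "ngrams02": ["a b"],
    "ngrams03": [],
    "ngrams04": [],
    "ngrams05": [],
}
add_arrays = make_add_arrays(["text"])
print(add_arrays(sample))
```

With Spark this would be used as `ngrams.rdd.map(make_add_arrays(colsNotNGrams))`. It is not offered as an explanation for the slowdown, only as a way to take `eval` out of the reproduction.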
Clay
  • This code is missing a data sample to be reproducible. – eliasah Apr 30 '19 at 12:02
  • @eliasah fixed. – Clay May 01 '19 at 00:49
  • perhaps caused by "[operations like map() cause the new RDD to forget the parent's partitioning information.](https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0)" ? – Clay May 05 '19 at 13:35
  • I can't reproduce your problem, but there is one thing I'd like to point out. `repartition(1)` is not the way to go because that will pull all your data to the driver. If your data fits on the driver alone, Spark becomes overkill. Some reading material: https://stackoverflow.com/questions/41090127/spark-inconsistent-performance-number-in-scaling-number-of-cores – eliasah May 07 '19 at 09:39
  • Where is that documented? I understand that `repartition(1)` will put the data on one worker node, but not the driver. Are you sure you are not thinking of `collect()`? Perhaps this is a special case of [SPARK-21782](https://issues.apache.org/jira/browse/SPARK-21782), where 1=2^0, also referenced [here](https://stackoverflow.com/a/48478878/5060792) – Clay May 07 '19 at 12:41

0 Answers