I have further questions following this post: https://stackoverflow.com/a/39398750/5060792
I have a 4-node cluster (1 driver, 3 workers), where each worker has 16 cores and 62 GB of RAM, and the driver has 8 cores and 12 GB of RAM.
So following the partitioning "rule of thumb", the partition count should be (number of worker nodes * executors per worker node * cores per executor) * 3 or 4. With dynamic allocation I am not exactly sure how many executors are started on each node, but assuming 3 executors per worker node with 5 cores per executor, that would be 3 * 3 * 5 * 4 = 180. So 180 partitions should be near optimal?
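To spell out the arithmetic (the executors-per-worker and cores-per-executor numbers are my assumptions, not values I have confirmed in the YARN UI):

worker_nodes = 3
executors_per_worker = 3   # assumption; dynamic allocation decides the real number
cores_per_executor = 5     # assumption
tasks_per_core = 4         # the "3 or 4" from the rule of thumb
worker_nodes * executors_per_worker * cores_per_executor * tasks_per_core
## 3 * 3 * 5 * 4 = 180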
Given the reproducible code below (where df is a 250,000-row dataframe whose 'text' column is an array of strings), with dynamic allocation Spark puts the imported dataframe into a single partition on my cluster.
The count() of df takes around 8 to 10 seconds before .repartition(180) and 1 to 2 seconds after, whereas the .rdd function addArrays takes around 8 to 10 seconds before .repartition(180) and 150 seconds after.
Note: I am stuck on Spark 2.2.0, so the Spark SQL array functions are not available to me (for reference, a sketch of what that might look like on a newer version is at the end of this post).
Running .repartition(1) afterwards does not speed addArrays back up; it continues to take around 2.5 minutes. However, rebuilding the ngrams df from scratch, where Spark again puts everything into one partition, brings it back down to only a few seconds.
In short: count() gets faster, .rdd.map() gets slower.
I can reproduce these scenarios repeatedly. Whether I repartition before applying any function or after, the timings do not change by any appreciable amount.
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline
spark = (
SparkSession.builder.master('yarn').appName('test')
.config('spark.kryoserializer.buffer.max', '1g')
.config('spark.sql.cbo.enabled', True)
.config('spark.sql.cbo.joinReorder.enabled', True)
.config('spark.yarn.executor.memoryOverhead', '2g')
.config('spark.driver.maxResultSize', '2g')
.config("spark.port.maxRetries", 100)
.config('spark.dynamicAllocation.enabled', 'true')
.config('spark.dynamicAllocation.executorIdleTimeout', '60')
.config('spark.dynamicAllocation.maxExecutors', '56')
.config('spark.dynamicAllocation.minExecutors', '0')
.config('spark.dynamicAllocation.schedulerBacklogTimeout', '1')
.getOrCreate()
)
sc = spark.sparkContext
sc.defaultParallelism
## my defaultParallelism is 2
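## (my assumption: with dynamicAllocation.minExecutors=0 and no executors
##  registered yet, there are no executor cores to count, so
##  defaultParallelism falls back to 2)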
placeholder = (
r"Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
r"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
r"nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
r"pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
r"culpa qui officia deserunt mollit anim id est laborum."
)
df = (
spark.range(0, 250000, 1)
.withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
.withColumn('text', F.lit(placeholder))
.withColumn('text', F.expr("substring_index(text, ' ', rand1)" ))
.withColumn('text', F.split(F.col('text'), ' '))
.select('text')
)
## Saving and reloading puts into 1 partition on my cluster.
df.write.parquet("df.parquet", mode='overwrite')
df = spark.read.parquet("df.parquet")
!hdfs dfs -du -h
## 1.4 M 4.3 M df.parquet
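## Checking the partition count right after the reload confirms the single
## partition. My assumption: the parquet output (~1.4 MB) is far below
## spark.sql.files.maxPartitionBytes (128 MB by default), so it fits in one
## input split.
df.rdd.getNumPartitions()
## output is 1
spark.conf.get('spark.sql.files.maxPartitionBytes')
## '134217728'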
ngram01 = NGram(n=1, inputCol="text", outputCol="ngrams01")
ngram02 = NGram(n=2, inputCol="text", outputCol="ngrams02")
ngram03 = NGram(n=3, inputCol="text", outputCol="ngrams03")
ngram04 = NGram(n=4, inputCol="text", outputCol="ngrams04")
ngram05 = NGram(n=5, inputCol="text", outputCol="ngrams05")
ngram_pipeline = (
Pipeline()
.setStages([ngram01, ngram02, ngram03, ngram04, ngram05])
)
ngrams = (
ngram_pipeline
.fit(df)
.transform(df)
)
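## ngrams now has the original 'text' column plus ngrams01 ... ngrams05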
'''RDD function to combine the single-n ngram arrays.'''
## non-ngram columns are carried through unchanged
colsNotNGrams = [c for c in ngrams.columns if 'ngrams' not in c]
## build a "(row.<col>,)" expression for each carried-through column;
## these are eval'd inside addArrays to rebuild the row as a tuple
colsNotNGramsTpls = ['(row.{},)'.format(c) for c in colsNotNGrams]
rddColTupls = ' + '.join(colsNotNGramsTpls)

def addArrays(row):
    ## returns: (non-ngram columns..., ngrams 1 to 3 combined, ngrams 1 to 5 combined)
    return (
        eval(rddColTupls)
        + (row.ngrams01 + row.ngrams02 + row.ngrams03,)
        + (row.ngrams01 + row.ngrams02 + row.ngrams03 + row.ngrams04 + row.ngrams05,)
    )
''' timings before repartitioning '''
ngrams.rdd.getNumPartitions()
# output is 1
ngrams.count()
# takes 8 to 10 seconds
ngrams2 = (
ngrams
.rdd.map(addArrays)
.toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## takes 8 to 10 seconds
''' timings after repartitioning '''
ngrams = ngrams.repartition(180)
ngrams.rdd.getNumPartitions()
# output is 180
ngrams2 = (
ngrams
.rdd.map(addArrays)
.toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## now takes 2.5 minutes
## HOWEVER,
ngrams.count()
# now takes 1 to 2 seconds
''' timings after repartitioning again does not help '''
ngrams = ngrams.repartition(1)
ngrams.rdd.getNumPartitions()
# output is 1
ngrams2 = (
ngrams
.rdd.map(addArrays)
.toDF(colsNotNGrams + ['ngrams_1to3', 'ngrams_1to5'])
)
## still takes 2.5 minutes
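For reference only, since I cannot upgrade: my understanding is that on Spark 2.4+ the ngram arrays could be combined with the built-in F.concat (which accepts array columns there) instead of dropping to the RDD API, along the lines of the sketch below. I cannot run this on 2.2.0, so treat it as an untested assumption.

## NOT runnable on 2.2.0 -- sketch of a Spark 2.4+ alternative using F.concat on array columns
ngrams2_sql = (
    ngrams
    .withColumn('ngrams_1to3', F.concat('ngrams01', 'ngrams02', 'ngrams03'))
    .withColumn('ngrams_1to5', F.concat('ngrams01', 'ngrams02', 'ngrams03',
                                        'ngrams04', 'ngrams05'))
    .drop('ngrams01', 'ngrams02', 'ngrams03', 'ngrams04', 'ngrams05')
)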