Note: This question is not asking about the difference between coalesce and repartition. Many questions already cover that; mine is different.

I have a PySpark job:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

df1 = df1.withColumnRenamed('y', 'yhat')

print('Partition number: %s' % df.rdd.getNumPartitions())

df1.write.parquet(output_path, mode='overwrite')

With the default of 200 partitions, each task would require a lot of memory, so I changed the repartition count to 1000.

The job details in the Spark web UI looked like this: (screenshot not available)

As the output is only 44 MB, I tried to use coalesce to avoid having many small files slow down HDFS. All I did was add .coalesce(20) before .write.parquet(output_path, mode='overwrite'):

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

df1 = df1.withColumnRenamed('y', 'yhat')

print('Partition number: %s' % df.rdd.getNumPartitions())  # 1000 here

df1.coalesce(20).write.parquet(output_path, mode='overwrite')

Then the Spark web UI showed: (screenshot not available)

It looks like only 20 tasks are running.

With repartition(1000), the parallelism was determined by my number of vcores, 36 here, and I could follow the progress intuitively (the progress bar covered 1000 tasks). After adding coalesce(20), the earlier repartition(1000) no longer had any effect: parallelism dropped to 20, and the progress display was no longer informative. Adding coalesce(20) also caused the whole job to get stuck and fail without any notification.

Changing coalesce(20) to repartition(20) works, but according to the documentation, coalesce(20) is much more efficient and should not cause such a problem.

I want higher parallelism, with only the final result coalesced to 20 partitions. What is the correct way to do this?

Mithril

1 Answer

coalesce is treated as a narrow transformation by the Spark optimizer, so it creates a single WholeStageCodegen stage from your groupby to the output, which limits your parallelism to 20.

repartition is a wide transformation (i.e. it forces a shuffle). When you use it instead of coalesce, it adds a new output stage but preserves the parallelism of the groupby-train stage.
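
To make the effect on task counts concrete, here is a toy model in plain Python. It is not Spark's actual coalescer (which also takes data locality into account); the function name `narrow_coalesce` and the even, contiguous split are assumptions made for illustration only.

```python
def narrow_coalesce(num_parent_partitions, num_target_partitions):
    """Toy model: assign parent partitions to tasks without a shuffle.

    Each returned group is the list of parent partition indices that a
    single task would compute, one after another.
    """
    # A narrow coalesce can only reduce the partition count, never raise it.
    n = min(num_parent_partitions, num_target_partitions)
    groups = []
    for i in range(n):
        start = i * num_parent_partitions // n
        end = (i + 1) * num_parent_partitions // n
        groups.append(list(range(start, end)))
    return groups


groups = narrow_coalesce(1000, 20)
print(len(groups))     # tasks in the merged stage: 20
print(len(groups[0]))  # parent partitions handled by each task: 50
```

Under this model, the 1000 partitions still exist, but they are computed inside only 20 tasks, so with 36 vcores at most 20 are busy and each task runs the grouped UDF over 50 partitions back to back.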

repartition(20) is a very reasonable option in your use case (the shuffle is small so the cost is pretty low).
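
As a minimal sketch of this option, the write step can be wrapped in a small helper. The helper name `write_compacted` and its parameters are hypothetical; `df` is assumed to be the PySpark DataFrame produced by the groupby (`df1` in the question).

```python
def write_compacted(df, output_path, num_files=20):
    """Write `df` as Parquet using a final shuffle down to `num_files` partitions.

    repartition() forces a shuffle, so the upstream stage keeps its own
    partition count (1000 in the question) and only the small output
    stage runs with `num_files` tasks.
    """
    df.repartition(num_files).write.parquet(output_path, mode="overwrite")
```

It would be called as `write_compacted(df1, output_path, 20)` in place of the final write line.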

Another option is to explicitly prevent the Spark optimizer from merging your predict and output stages, for example by using cache or persist before your coalesce:

# Your groupby code here

from pyspark.storagelevel import StorageLevel

df1.persist(StorageLevel.MEMORY_ONLY)\
   .coalesce(20)\
   .write.parquet(output_path, mode='overwrite')

Given your small output size, a MEMORY_ONLY persist + coalesce should be faster than a repartition, but this no longer holds as the output size grows.
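
One caveat: persist is lazy and only marks the DataFrame for caching. The variant below is a sketch, not part of the snippet above: it adds an explicit `count()` action (my assumption) so that the cache is filled at the original partition count before the coalesced write reads from it. The helper name `write_after_materializing` and its parameters are hypothetical.

```python
def write_after_materializing(df, output_path, num_files=20, storage_level=None):
    """Cache `df`, force it to be computed, then write a coalesced copy.

    `storage_level` is optional; when omitted, DataFrame.persist() uses
    its default storage level.
    """
    cached = df.persist(storage_level) if storage_level is not None else df.persist()
    try:
        # Assumption: an action is needed here because persist() is lazy.
        # count() computes every partition, so the expensive work runs
        # with the original number of tasks and the results are cached.
        cached.count()
        # The coalesced write then reads from the cached partitions.
        cached.coalesce(num_files).write.parquet(output_path, mode="overwrite")
    finally:
        # Release the cached blocks once the write has finished or failed.
        cached.unpersist()
```

For example, `write_after_materializing(df1, output_path, 20, StorageLevel.MEMORY_ONLY)` mirrors the storage level used above. This costs one extra job for the count, and the cached data has to fit in the chosen storage level.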

rluta
  • Wow! I had never seen an article that discussed the difference between `coalesce` and `repartition` in such detail. Thank you for sharing! – Mithril Sep 16 '19 at 09:24
  • I checked some articles about narrow and wide transformations, and many of them say `coalesce` is usually wide, such as https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies . Would you mind adding a case where `coalesce` becomes narrow? – Mithril Sep 16 '19 at 09:39
  • AFAIK, the `coalesce` method on DataFrame/Dataset has always been narrow. You can look at the [source code](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2286): `coalesce` uses the Repartition operator with shuffle explicitly disabled, while `repartition` has shuffle enabled. It's also quite explicitly documented in the [RDD implementation](https://github.com/apache/spark/blob/branch-2.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L417) – rluta Sep 16 '19 at 10:54
  • Why does "using cache or persist before your coalesce prevent Spark optimizer from merging your predict and output stages"? I think there should be an action before the coalesce but I'm not sure – idan ahal Jul 18 '23 at 15:06