Note: this is not a question about the difference between coalesce and repartition; there are already many questions covering that. Mine is different.
I have a PySpark job:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1 = df1.withColumnRenamed('y', 'yhat')
print('Partition number: %s' % df.rdd.getNumPartitions())
df1.write.parquet(output_path, mode='overwrite')
The default of 200 partitions would require too much memory, so I repartitioned to 1000.
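If I understand correctly, the 200 comes from spark.sql.shuffle.partitions, so there are two ways to get 1000 partitions; a minimal sketch (the second option is what my job does):

print(spark.conf.get('spark.sql.shuffle.partitions'))  # '200' by default

# Option 1: raise the default for every shuffle in the session
spark.conf.set('spark.sql.shuffle.partitions', 1000)

# Option 2: repartition explicitly by the grouping keys (what I do above)
df = df.repartition(1000, 'store_id', 'product_id')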
The job details on the Spark web UI looked like this:
As the output is only 44 MB, I tried to use coalesce to avoid too many small files slowing down HDFS. All I did was add .coalesce(20) before .write.parquet(output_path, mode='overwrite'):
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.read.parquet(input_path)

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    ...
    return pdf

df = df.repartition(1000, 'store_id', 'product_id')
df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1 = df1.withColumnRenamed('y', 'yhat')
print('Partition number: %s' % df.rdd.getNumPartitions())  # 1000 here
df1.coalesce(20).write.parquet(output_path, mode='overwrite')
Then the Spark web UI showed that only 20 tasks were running.
With repartition(1000), the parallelism depended on my number of vcores, 36 here, and I could track the progress intuitively (the progress bar had size 1000).
After adding coalesce(20), the earlier repartition(1000) no longer had any effect: the parallelism dropped to 20, and that intuition was lost as well.
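The partition counts themselves confirm this; a quick check (the counts in the comments are what I see):

print(df1.rdd.getNumPartitions())               # 1000, from the explicit repartition
print(df1.coalesce(20).rdd.getNumPartitions())  # 20; coalesce narrows the existing stage
                                                # rather than adding a shuffle, so the
                                                # grouped-map UDF also runs in only 20 tasks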
Adding coalesce(20) also caused the whole job to get stuck and fail without any clear error.
Changing coalesce(20) to repartition(20) works, but according to the documentation, coalesce(20) should be much more efficient and should not cause such a problem.
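In other words, only this variant of the last line runs reliably for me, presumably because repartition adds a shuffle boundary and leaves the 1000-task UDF stage untouched:

# works, but pays for an extra shuffle before the write
df1.repartition(20).write.parquet(output_path, mode='overwrite')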
I want high parallelism for the computation, and only the result coalesced to 20 partitions. What is the correct way to do this?