I'm seeing strange performance results when comparing the two APIs in PySpark 3.2.1 that provide the ability to run a pandas UDF on grouped results of a Spark DataFrame:
- df.groupBy().applyInPandas()
- ps_df.groupby().apply() - the new way of applying a function, introduced in the Pandas API on Spark (AKA Koalas)
First I run the following input generator code in local spark mode (Spark 3.2.1):
```python
import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", True) \
    .getOrCreate()

ps.set_option("compute.default_index_type", "distributed")

spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
    .write.parquet('/tmp/sample_input', mode='overwrite')
```
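As a side note (not part of the timed runs), the generated input can be sanity-checked like this: 1,000,000 rows, and since `group` is `id` divided by 10 and truncated, there are 100,000 groups of 10 rows each.

```python
# Optional check of the generated input, outside the benchmark itself.
check = spark.read.parquet('/tmp/sample_input')
print(check.count())                               # expected: 1000000
print(check.select('group').distinct().count())    # expected: 100000
```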
Then I test applyInPandas:
```python
def getsum(pdf):
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
    df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, schema=output_schema) \
    .write.parquet('/tmp/schematest', mode='overwrite')
```
And the code executes in under 30 seconds (on an i7-9750H CPU).
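For reference, the numbers I quote are rough wall-clock measurements taken around the write call, along these lines (illustrative only, not part of the snippets above):

```python
import time

# Rough wall-clock timing of the applyInPandas path defined above.
start = time.perf_counter()
df.groupBy('group').applyInPandas(getsum, schema=output_schema) \
    .write.parquet('/tmp/schematest', mode='overwrite')
print(f"applyInPandas run took {time.perf_counter() - start:.1f}s")
```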
Then I try the new API and, while I really appreciate how clean the code looks:
```python
def getsum(pdf) -> ps.DataFrame["id": int, "group": int, "sum_in_group": int]:
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = ps.read_parquet(f'/tmp/sample_input')
df.groupby('group').apply(getsum) \
    .to_parquet('/tmp/schematest', mode='overwrite')
```
... every time the execution takes at least 1m 40s on the same CPU, so it's more than 3x slower for this simple operation.
I am aware that adding sum_in_group can be done far more efficiently with no pandas involvement (for example with a window aggregation, as sketched below), but this is just meant as a small minimal example. Any other operation is also at least 3 times slower.
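For completeness, this is roughly what I mean by the pandas-free version; it is shown only for context and is not part of the comparison:

```python
from pyspark.sql import Window
import pyspark.sql.functions as F

# Same sum_in_group column, computed with a native window aggregation
# instead of a pandas UDF.
df = spark.read.parquet('/tmp/sample_input')
df.withColumn('sum_in_group', F.sum('id').over(Window.partitionBy('group'))) \
    .write.parquet('/tmp/schematest', mode='overwrite')
```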
Do you know what could be the reason for this slowdown? Maybe I'm missing some config parameter that would make these execute in a similar time?