
I'm seeing strange performance results when comparing the two APIs in PySpark 3.2.1 that provide the ability to run a pandas UDF on the grouped results of a Spark DataFrame.

First, I run the following input generator code in local Spark mode (Spark 3.2.1):

import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", True) \
    .getOrCreate()

ps.set_option("compute.default_index_type", "distributed")

spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
    .write.parquet('/tmp/sample_input', mode='overwrite')
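
This produces 1,000,000 rows split into 100,000 groups of 10 rows each. A quick sanity check of the generated input (not part of the benchmark itself, just a sketch reusing the same session):

# Optional check of the generated input: 1,000,000 rows, 100,000 groups
sample = spark.read.parquet('/tmp/sample_input')
print(sample.count())                             # expected: 1000000
print(sample.select('group').distinct().count())  # expected: 100000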

Then I test applyInPandas:

def getsum(pdf):
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = spark.read.parquet('/tmp/sample_input')
output_schema = types.StructType(
    df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, schema=output_schema) \
    .write.parquet('/tmp/schematest', mode='overwrite')

The code executes in under 30 seconds (on an i7-9750H CPU).
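
For reference, a minimal way to take a wall-clock measurement of such a run (a sketch only, not necessarily how the figures quoted here were produced):

import time

start = time.perf_counter()
df.groupBy('group').applyInPandas(getsum, schema=output_schema) \
    .write.parquet('/tmp/schematest', mode='overwrite')
print(f'applyInPandas took {time.perf_counter() - start:.1f}s')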

Then I try the new API, and, while I really appreciate how nice the code looks:

def getsum(pdf) -> ps.DataFrame["id": int, "group": int, "sum_in_group": int]:
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = ps.read_parquet('/tmp/sample_input')
df.groupby('group').apply(getsum) \
    .to_parquet('/tmp/schematest', mode='overwrite')

... every time the execution time is at least 1m 40s on the same CPU, so more than 3x slower for this simple operation.

I am aware that adding sum_in_group can be done far more efficiently with no pandas involvement, but this is just a small, minimal example. Any other operation is also at least 3 times slower.
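
For comparison, the pandas-free version mentioned above could be a plain window aggregation, e.g. (a sketch, reusing the parquet input from the generator step):

from pyspark.sql import functions as F, Window

# Same per-group sum without any pandas UDF
w = Window.partitionBy('group')
spark.read.parquet('/tmp/sample_input') \
    .withColumn('sum_in_group', F.sum('id').over(w)) \
    .write.parquet('/tmp/schematest', mode='overwrite')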

Do you know what the reason for this slowdown could be? Maybe I'm missing some configuration parameter that would make these execute in a similar time?
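
For anyone who wants to dig in, the physical plans of the two variants can be printed like this (a sketch, assuming the getsum and output_schema definitions from the respective snippets above; .spark.explain() exposes the underlying Spark plan of a pandas-on-Spark DataFrame):

# applyInPandas variant (uses getsum and output_schema from the first test)
spark.read.parquet('/tmp/sample_input') \
    .groupBy('group').applyInPandas(getsum, schema=output_schema) \
    .explain(True)

# pandas API on Spark variant (uses the annotated getsum from the second test)
ps.read_parquet('/tmp/sample_input') \
    .groupby('group').apply(getsum) \
    .spark.explain(True)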

Mariusz
  • We encountered the same thing when doing benchmarks for our project (Fugue) [here](https://fugue-tutorials.readthedocs.io/tutorials/appendix/fugue_not_pandas.html). Just look at the images on that page. You will notice PySpark Pandas is consistently slow across the board. I personally think this has to do with the index that they add on top of the Spark DataFrame for compatibility with the Pandas API. The index adds a lot of overhead in the distributed setting because it essentially keeps a global order, which is often unnecessary. – Kevin Kho Feb 14 '22 at 04:27
  • Thanks @kvnkho, your benchmark results are super useful. I also have a gut feeling that it's somehow related to the index they add on top of the Spark DF. However, if a schema is set, the index should not be retained (see: https://github.com/apache/spark/blob/v3.2.1/python/pyspark/pandas/groupby.py#L1303). I'm forcing the `distributed` index, which should be almost instant, as it's based on `monotonically_increasing_id()` - unfortunately, the Spark execution plans are truncated, so it's hard to debug... – Mariusz Feb 14 '22 at 12:29
  • Using the `distributed` index already seems very good to me. I have no idea what else you can tune. I totally understand what you are saying; it is pretty confusing that the performance gap is that big. – Kevin Kho Feb 14 '22 at 16:23

0 Answers