6

I have a question regarding the time difference when filtering pandas and pyspark dataframes:

import time
import numpy as np
import pandas as pd
from random import shuffle

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame(np.random.randint(1000000, size=400000).reshape(-1, 2))
list_filter = list(range(10000))
shuffle(list_filter)

# pandas is fast 
t0 = time.time()
df_filtered = df[df[0].isin(list_filter)]
print(time.time() - t0)
# 0.0072

df_spark = spark.createDataFrame(df)

# pyspark is slow
t0 = time.time()
df_spark_filtered = df_spark[df_spark[0].isin(list_filter)]
print(time.time() - t0)
# 3.1232

If I increase the length of list_filter to 10000 then the execution times are 0.01353 and 17.6768 seconds. The pandas implementation of isin seems to be computationally efficient. Can you explain why filtering a pyspark dataframe is so slow, and how I can perform such filtering faster?

Konstantin

2 Answers

14

You need to use a join in place of a filter with an isin clause to speed up the filter operation in pyspark:

import time
import numpy as np
import pandas as pd
from random import shuffle
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame(np.random.randint(1000000, size=400000).reshape(-1, 2))

df_spark = spark.createDataFrame(df)

list_filter = list(range(10000))
list_filter_df = spark.createDataFrame([[x] for x in list_filter], df_spark.columns[:1])
shuffle(list_filter)

# pandas is fast because everything is in memory
t0 = time.time()
df_filtered = df[df[0].isin(list_filter)]
print(time.time() - t0)
# 0.0227580165863
# 0.0127580165863

# pyspark is slower because of overhead, but a broadcast join makes it fast compared to isin with a list
t0 = time.time()
df_spark_filtered = df_spark.join(F.broadcast(list_filter_df), df_spark.columns[:1])
print(time.time() - t0)
# 0.0571971035004
# 0.0471971035004
Rahul Gupta
  • Thank you for your reply! I think that for a fair comparison we need to add `df_spark_filtered.collect()` and it'll be more like 2 seconds than 50 ms. – Konstantin Dec 12 '18 at 13:46
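As the comment above points out, the join call only builds a lazy query plan, so the timings in the answer mostly measure plan construction. A minimal sketch of a fairer comparison that forces execution with count(), reusing df_spark, list_filter and list_filter_df from the answer's code (absolute times will vary by machine):

import time
import pyspark.sql.functions as F

# Force execution with an action so the timing includes the actual filtering,
# not just the construction of Spark's lazy query plan.
t0 = time.time()
n_join = df_spark.join(F.broadcast(list_filter_df), df_spark.columns[:1]).count()
print("broadcast join:", time.time() - t0)

t0 = time.time()
n_isin = df_spark[df_spark[0].isin(list_filter)].count()
print("isin filter:", time.time() - t0)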
4

Spark is designed to be used with huge amounts of data. If the data fits in a pandas dataframe, pandas will always be quicker. The thing is, for huge data, pandas will fail and Spark will do the job (quicker than MapReduce, for example).

Spark is normally slower in those cases because it needs to build a DAG of the operations to be performed, like an execution plan, and try to optimize it.
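As an illustration, here is a minimal sketch of how to inspect that plan with explain(), assuming the df_spark and list_filter variables from the question:

# Building the filtered DataFrame is lazy: no data is touched yet,
# Spark only records the operation in its query plan.
df_spark_filtered = df_spark[df_spark[0].isin(list_filter)]

# explain(True) prints the parsed, analyzed, optimized logical and
# physical plans Spark constructs before any rows are actually filtered.
df_spark_filtered.explain(True)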

So, you should only consider using Spark when the data is really big; otherwise use pandas, which will be quicker.

You can check this article to see comparisons between pandas and Spark speed; pandas is always quicker until the data is so big that it fails.

Manrique
  • Thank you for your reply. Overhead is expected, but it's too substantial + pyspark `isin` scales very badly with `list_filter`. – Konstantin Dec 12 '18 at 11:55
  • try setting `spark.sql.shuffle.partitions` and `spark.default.parallelism` to 1, to match pandas. – Kushagra Verma Jun 19 '21 at 00:49
  • Consider however, that PySpark uses multi-threading by default, while Pandas is single-threaded, so PySpark may achieve speedup even with moderately sized datasets. – G. Cohen Apr 05 '22 at 18:43
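For reference, a hedged sketch of how the settings suggested in the comments could be applied when creating the session for a small local benchmark (the local[1] master and the value 1 are illustrative choices, not requirements):

from pyspark.sql import SparkSession

# Illustrative configuration for a tiny local benchmark: avoid the default
# 200 shuffle partitions, which add overhead on data that fits in memory.
spark = (
    SparkSession.builder
    .master("local[1]")                           # single local thread, closer to pandas
    .config("spark.sql.shuffle.partitions", "1")  # partitions used after shuffles/joins
    .config("spark.default.parallelism", "1")     # only takes effect when the context is created
    .getOrCreate()
)

Note that if a SparkSession already exists in the same process, getOrCreate() will return it and these settings may not take effect until the existing session is stopped.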