I have a pyspark.sql.DataFrame with various columns, and I perform a series of transformation steps on its columns. Only at the very end is there an action (a write to parquet).
I have been noticing that Spark gets slower and slower as the "stack" of transformations builds up, and eventually I also run into an OutOfMemoryError.
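For illustration, the overall structure is roughly the following (a minimal sketch with placeholder paths and column names, not my actual pipeline):
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('/path/to/input')      # placeholder input

# many lazy transformation steps, e.g. like the examples below ...
df = df.where(F.col('col').isNotNull())
df = df.dropDuplicates(['col1', 'col2'])
# ... and so on, many more steps ...

# the only action: the whole lineage above is executed here
df.write.parquet('/path/to/output')            # placeholder output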
To circumvent this I have tried:
- persist: calling persist() in between the transformation steps did not help.
- temporary parquet files: curiously, this DID work. By writing a parquet file after a number of transformations, deleting the df with del, and reading the parquet file back in, the speed was back to "normal" (see the sketch after this list).
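The temporary-parquet workaround looks roughly like this (a minimal sketch; the path is a placeholder and spark is assumed to be a SparkSession):
# ... after a batch of transformation steps ...
df.write.parquet('/tmp/intermediate.parquet')          # placeholder path for the intermediate result
del df                                                 # drop the reference to the old DataFrame and its long lineage
df = spark.read.parquet('/tmp/intermediate.parquet')   # re-read: the new DataFrame starts with a fresh, short lineage
# ... continue with the next batch of transformations ...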
Thus my question(s):
- Am I doing something wrong?
- Has anybody encountered similar problems? How did you solve them?
- Is it "good practice" to save intermediate results (temporarily) to disk and load them back in? (I thought not, because executors can spill to disk if necessary.)
EDIT 2017-05-04
sorry, here is some code:
Example 1: value-based row filtering
from pyspark.sql import functions as F
df = df.where((F.col('col') > max_value) | (F.col('col') < min_value))
Example 2: dropping duplicates based on certain columns
list_of_cols = ['col1', 'col2', ...]
df = df.dropDuplicates(list_of_cols)
Example 3: replace values below threshold with 0
df = df.withColumn('col', F.when(F.col('col') < threshold, 0).otherwise(F.col('col')))
Example 4: detect decreasing parts of a column that should be strictly increasing
(a strictly increasing id column permits this check)
from pyspark.sql import Window

id_win = (Window
          .partitionBy('group_col')
          .orderBy('id')
          )
inc_win = (Window
           .partitionBy('group_col')
           .orderBy('strictly_inc_col')
           )

# number the rows per group by id and by the supposedly increasing column;
# wherever the two orderings disagree, the column has decreased
bad_rows = (df
            .select('group_col', 'id',
                    F.row_number().over(id_win).alias('order_id'),
                    F.row_number().over(inc_win).alias('order_inc'))
            .where('order_inc < order_id')
            .select('id')
            .toDF('id')
            )

# keep only the rows whose id does not appear in bad_rows
df = (df
      .join(bad_rows, 'id', 'left_outer')
      .where(bad_rows['id'].isNull())
      .select(df.columns)
      )
Example 5: Check for lots of missing values
see the OutOfMemoryError post for the code, as it is quite long...
EDIT 2017-05-18
I found out about Spark's pyspark.sql.DataFrame.checkpoint method. It works like persist, but it also truncates the DataFrame's lineage, which helps to circumvent the issues mentioned above.
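For reference, a minimal usage sketch (the checkpoint directory is a placeholder, spark is assumed to be a SparkSession, and DataFrame.checkpoint requires Spark 2.1+):
spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # placeholder directory, must be set before checkpointing

# ... a batch of transformation steps ...
df = df.checkpoint(eager=True)  # materialises the result and returns a DataFrame with a truncated lineage
# ... subsequent transformations now start from a short lineage ...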