
I have a pyspark.sql.DataFrame with various columns, and I perform different transformation steps on its columns. Only at the end is there an action (a write to Parquet).

I have been noticing that Spark gets slower and slower as the "stack" of transformations builds up. Eventually I also run into an OutOfMemoryError.

To circumvent this I have tried:

  • persist: calling persist() in between the transformation steps did not help.
  • temporary parquet files: curiously, this DID work: by writing the DataFrame to Parquet after a number of transformations, deleting it and reading the Parquet files back in, the speed was back to "normal" (see the sketch after this list).
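
A minimal sketch of the temporary-Parquet workaround, assuming a SparkSession named spark and a hypothetical scratch path:

    # write the intermediate result out, drop the reference and read it
    # back in; the re-read DataFrame starts with a fresh, short lineage
    tmp_path = '/tmp/intermediate.parquet'  # hypothetical scratch location
    df.write.mode('overwrite').parquet(tmp_path)
    del df
    df = spark.read.parquet(tmp_path)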

Thus my question(s):

  • Am I doing something wrong?
  • Has anybody encountered similar problems? How did you solve it?
  • Is it "good practice" to save intermediary results (temporarily) to disk and load them up again (I thought not b/c executors can spill to disk if necessary)?

EDIT 2017-05-04

Sorry, here is some code:

Example 1: value-based row filtering

from pyspark.sql import functions as F
df = df.where((F.col('col') > max_value ) | (F.col('col') < min_value ))

Example 2: dropping duplicates based on certain columns

list_of_cols = ['col1', 'col2', ...]
df = df.dropDuplicates(list_of_cols)

Example 3: replace values below threshold with 0

df = df.withColumn('col', F.when(F.col('col') < threshold, 0).otherwise(F.col('col')))

Example 4: detect decreasing parts of a column that should be strictly increasing

(a strictly increasing id column permits this check)

    from pyspark.sql import Window

    # one window ordered by the id column, one ordered by the column
    # that should be strictly increasing
    id_win = (Window
              .partitionBy('group_col')
              .orderBy('id')
              )
    inc_win = (Window
               .partitionBy('group_col')
               .orderBy('strictly_inc_col')
               )

    # rows whose rank by 'strictly_inc_col' falls behind their rank by
    # 'id' belong to a decreasing part of the column
    bad_rows = (df
                .select('group_col', 'id',
                        F.row_number().over(id_win).alias('order_id'),
                        F.row_number().over(inc_win).alias('order_inc'))
                .where('order_inc < order_id')
                .select('id')
                .toDF('id')
                )

    # drop the offending rows via a left outer join and a null filter
    df = (df
          .join(bad_rows, 'id', 'left_outer')
          .where(bad_rows['id'].isNull())
          .select(df.columns)
          )

Example 5: Check for lots of missing values

see OutOfMemoryError Post for the code, as it is quite long...

EDIT 2017-05-18

I found out about Spark's pyspark.sql.DataFrame.checkpoint method. It works like persist but also discards the DataFrame's lineage, and thus helps to circumvent the issues described above.
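
A minimal sketch of this pattern, assuming a SparkSession named spark and a hypothetical checkpoint directory (checkpointing requires one to be set):

    # checkpointing needs a directory to write the materialized data to
    spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # hypothetical path

    # ... a number of transformation steps on df ...

    # checkpoint() materializes the DataFrame and returns a new one whose
    # plan starts from the checkpointed data, so the lineage stays short
    df = df.checkpoint(eager=True)

    # ... further transformations, and finally df.write.parquet(...)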

