1

I have a DataFrame containing 752 (id,date and 750 feature columns) columns and around 1.5 million rows and I need to apply cumulative sum on all 750 feature columns partition by id and order by date.

Below is the approach I am following currently:

# putting all 750 feature columns in a list
required_columns = ['ts_1','ts_2'....,'ts_750']

# defining window
sumwindow = Window.partitionBy('id').orderBy('date')

# Applying window to calculate cumulative of each individual feature column

for current_col in required_columns:
    new_col_name = "sum_{0}".format(current_col)
    df=df.withColumn(new_col_name,sum(col(current_col)).over(sumwindow))

# Saving the result into parquet file    
df.write.format('parquet').save(output_path)

I am getting below error while running this current approach

py4j.protocol.Py4JJavaError: An error occurred while calling o2428.save.
: java.lang.StackOverflowError

Please let me know alternate solution for the same. seems like cumulative sum is bit tricky with large amount of data. Please suggest any alternate approach or any spark configurations which I can tune to make it work.

pault
  • 41,343
  • 15
  • 107
  • 149
Shrashti
  • 140
  • 5
  • 13
  • Your current window won't do the cumulative sum- it will just be the sum for all rows. Have a look at [this post](https://stackoverflow.com/a/45946350/5858851) for an example of how to do cumulative sum. – pault Dec 18 '18 at 19:10
  • hey pault actually I want to group cumulative sum of every column by id (its not unique) . window is working fine I have tested with smaller dataset. it just giving me stackoverflow error with large dataset – Shrashti Dec 18 '18 at 19:13

1 Answers1

2

I expect you have the issue of too large of a lineage. Take a look at your explain plan after you re-assign the dataframe so many times.

The standard solution for this is to checkpoint your dataframe every so often to truncate the explain plan. This is sort of like caching but for the plan rather than the data and is often needed for iterative algorithms that modify dataframes.

Here is a nice pyspark explanation of caching and checkpointing

I suggest df.checkpoint() every 5-10 modifications to start with

Let us know how it goes

Michael West
  • 1,636
  • 16
  • 23
  • You might try some of these techniques to reduce time. Not sure if they fit your use case. https://stackoverflow.com/questions/33882894/sparksql-apply-aggregate-functions-to-a-list-of-column – Michael West Dec 19 '18 at 12:41