2

I am facing an out of memory error when trying to persist a dataframe and I don't really understand why. I have a dataframe of roughly 20Gb with 2.5 millions rows and around 20 columns. After filtering this dataframe, I have 4 columns and 0.5 million rows.

Now my problem is that when I persist the filtered dataframe I get an out of memory error (exceeds 25.4Gb of 20 Gb physical memory used). I have tried persisting at different storage levels

df = spark.read.parquet(path) # 20 Gb
df_filter = df.select('a', 'b', 'c', 'd').where(df.a == something) # a few Gb
df_filter.persist(StorageLevel.MEMORY_AND_DISK) 
df_filter.count()

My cluster has 8 nodes with 30Gb of memory each.

Do you have any idea where that OOM could come from ?

confused_pandas
  • 336
  • 5
  • 15
  • Wild guess: is it possible the `df_filter` is initially just a view of df, but then internally `persist` calls a `.copy()` (why would it do that, I don't know, but it's still a possibility) which then causes your OOM? – GPhilo May 09 '19 at 09:47
  • same error without `persist` ? – Steven May 09 '19 at 10:05
  • Thank you for your answers. No, I actually don't get any error without ```persist```. – confused_pandas May 09 '19 at 10:08
  • Try df_filter.persist(StorageLevel.MEMORY_AND_DISK_SER).count() – Will May 09 '19 at 10:29
  • Do you have the same problem with `.persist(StorageLevel.DISK_ONLY)`? – Pablo López Gallego May 09 '19 at 10:29
  • Just tried both MEMORY_AND_DISK_SER and DISK_ONLY, I get the same error. One thing I notice is that one of my workers always uses half of its physical memory even when nothing is running(20Gb out of 40Gb, I increased memory from 30 to 52Gb). And it seems like it tries to persist only on that particular node. I'm pretty new to Spark so everything is not clear to me but ```persist``` should, ideally, try to distribute partitions evenly between workers, right? – confused_pandas May 09 '19 at 10:34
  • Try `org.apache.spark.sql.Dataset.repartition` to distribute your data across nodes before `persist`. Expensive operation, but better than Memory error. – C.S.Reddy Gadipally May 09 '19 at 13:36

1 Answers1

1

Just some suggestions to help identify root cause ...

You probably have either (or a combo) of ...

  1. skewed source data partition split sizes which is tough to deal with and cause garbage collection, OOM, etc. (these methods have helped me, but there may be better approaches per use case)
# to check num partitions
df_filter.rdd.getNumPartitions()

# to repartition (**does cause shuffle**) to increase parallelism and help with data skew
df_filter.repartition(...) # monitor/debug performance in spark ui after setting
  1. too little/too many executors/ram/cores set in config
# check via
spark.sparkContext.getConf().getAll()

# these are the ones you want to watch out for
'''
--num-executors
--executor-cores
--executor-memory
'''
  1. wide transformation shuffles size too little/too many => try general debug checks to view transformations that will be triggered when persisting + find their # of output partitions to disk
# debug directed acyclic graph [dag]
df_filter.explain() # also "babysit" in spark UI to examine performance of each node/partitions to get specs when you are persisting

# check output partitions if shuffle occurs
spark.conf.get("spark.sql.shuffle.partitions")
thePurplePython
  • 2,621
  • 1
  • 13
  • 34
  • Thanks thePurplePython. It seem that playing with the number of partitions did the job. Especially using the rule of thumb ```NumPartitions = NumCPUs * 4``` as explained in this post https://stackoverflow.com/questions/39381041/determining-optimal-number-of-spark-partitions-based-on-workers-cores-and-dataf – confused_pandas May 10 '19 at 09:39
  • you're welcome ... thanks for sharing that post too. – thePurplePython May 10 '19 at 13:18