
My program works like this (a rough sketch in code follows the list):

  1. Read in a lot of files as DataFrames. Among those files there is a group of about 60 files with 5k rows each; I create a separate DataFrame for each of them, do some simple processing, and then union them all into one DataFrame which is used for further joins.
  2. I perform a number of joins and column calculations on a number of DataFrames, which finally results in a target DataFrame.
  3. I save the target DataFrame as a Parquet file.
  4. In the same Spark application, I load that Parquet file and do some heavy aggregation followed by multiple self-joins on that DataFrame.
  5. I save the second DataFrame as another Parquet file.
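
For reference, a minimal sketch of that pipeline; the file paths, column names and join keys are made up purely for illustration and the real program will differ:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("pipeline-sketch").getOrCreate()

// step 1: read the ~60 small files, do some simple per-file processing, union them
val groupPaths = (1 to 60).map(i => s"/data/group/file_$i.csv")           // hypothetical paths
val unioned: DataFrame = groupPaths
  .map(p => spark.read.option("header", "true").csv(p).withColumn("source_file", lit(p)))
  .reduce(_ union _)

// step 2: joins and column calculations that produce the target DataFrame
val other = spark.read.parquet("/data/other")                             // hypothetical second input
val target = unioned
  .join(other, Seq("id"))
  .withColumn("total", col("amount") * col("rate"))                       // hypothetical columns

// step 3: save the target DataFrame as Parquet
target.write.mode("overwrite").parquet("/data/target.parquet")

// step 4: reload the Parquet file, aggregate heavily, then self-join
val reloaded = spark.read.parquet("/data/target.parquet")
val aggregated = reloaded.groupBy("id").agg(sum("total").as("sum_total"))
val second = aggregated.as("l")
  .join(aggregated.as("r"), col("l.sum_total") === col("r.sum_total") && col("l.id") =!= col("r.id"))

// step 5: save the second DataFrame as another Parquet file
second.write.mode("overwrite").parquet("/data/second.parquet")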

The Problem

If I have just one file instead of 60 in the group of files I mentioned above, everything works with the driver having 8g of memory. With 60 files, the first 3 steps work fine, but the driver runs out of memory when preparing the second file. Things improve only when I increase the driver's memory to 20g.
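As an aside, a quick way to confirm how much memory the driver actually received (a small sketch, assuming an existing SparkSession named spark):

// prints the configured driver memory and the JVM's own view of its heap
println("spark.driver.memory = " +
  spark.sparkContext.getConf.get("spark.driver.memory", "not set (default 1g)"))
println("driver max heap = " + Runtime.getRuntime.maxMemory / (1024 * 1024) + " MB")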

The Question

Why is that? When calculating the second file I do not use the DataFrames that were used to calculate the first file, so their number and content should not really matter as long as the size of the first Parquet file remains constant, should they? Do those 60 DataFrames get cached somehow and occupy the driver's memory? I don't do any caching myself, and I never collect anything. I don't understand why 8g of memory would not be sufficient for the Spark driver.
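One way to check the suspicion about hidden caching directly from the driver (a sketch, assuming an existing SparkSession named spark; the Storage tab of the Spark UI shows the same information):

// lists every RDD currently marked as persistent; cached DataFrames appear here
// through their underlying RDDs, so an empty map means nothing is cached
val cached = spark.sparkContext.getPersistentRDDs
cached.foreach { case (id, rdd) => println(s"RDD $id persisted at ${rdd.getStorageLevel}") }

// drops anything that was cached implicitly (e.g. via spark.catalog.cacheTable)
spark.catalog.clearCache()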

Tom
  • Hi, I have the same problem. I don't use persist or cache, yet it looks like the Parquet files stay in driver memory. Did you find a solution without adding memory? – Rolintocour Jul 10 '18 at 13:10
  • Good to know I am not alone. No, I'm afraid I have not found any proper solution yet, other than increasing driver memory and reading in far fewer files. Luckily I was able to limit them in my case. But that is not really a good long-term solution (the number of files is going to grow with time anyway), and I still do not quite understand whether this is caused by some bug in Spark, or by its peculiar handling of memory and my not tuning it properly... – Tom Jul 10 '18 at 15:11

2 Answers

0
import org.apache.spark.storage.StorageLevel

// Kryo serialization is recommended when you use a serialized storage level
// such as MEMORY_AND_DISK_SER
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")


val rdd1 = sc.textFile("some data")
rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER)   // marks rdd1 as persistent

val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd3.persist(StorageLevel.MEMORY_AND_DISK_SER)

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

// release the cached blocks once they are no longer needed
rdd1.unpersist()
rdd2.unpersist()
rdd3.unpersist()
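
The same pattern expressed with DataFrames, which is what the question actually uses (a sketch, assuming an existing SparkSession named spark; the path and column are made up):

import org.apache.spark.storage.StorageLevel

val df = spark.read.parquet("/data/target.parquet")     // hypothetical input
df.persist(StorageLevel.MEMORY_AND_DISK_SER)            // spill partitions to disk rather than fail

// reuse the persisted DataFrame in one or more actions without recomputing it
df.groupBy("id").count().write.mode("overwrite").parquet("/data/counts.parquet")

df.unpersist()                                          // release the cached blocks when done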

For tuning your code, follow this link.

-1

Caching or persistence are optimisation techniques for (iterative and interactive) Spark computations. They help save interim partial results so they can be reused in subsequent stages. These interim results, as RDDs, are thus kept in memory (the default) or on more durable storage such as disk, and/or replicated. RDDs can be cached using the cache operation. They can also be persisted using the persist operation. The difference between cache and persist is purely syntactic: cache is a synonym of persist or persist(MEMORY_ONLY), i.e. cache is merely persist with the default storage level MEMORY_ONLY.
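In code, that equivalence looks like this (a minimal sketch, assuming an existing SparkContext named sc):

import org.apache.spark.storage.StorageLevel

val rddA = sc.parallelize(1 to 1000)
val rddB = sc.parallelize(1 to 1000)
val rddC = sc.parallelize(1 to 1000)

rddA.cache()                              // shorthand for persist()
rddB.persist()                            // same as persist(StorageLevel.MEMORY_ONLY)
rddC.persist(StorageLevel.MEMORY_ONLY)    // the default storage level spelled out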

Refer to the use of persist and unpersist.

  • Sorry, that is not very useful or relevant to my problem. – Tom Jul 10 '18 at 15:06
  • You have a lot of files to process, and after a few steps it goes out of memory. You need to use memory management: use persist and your problem will be solved. @Tom – Ravi Anand Vicky Jul 11 '18 at 07:50
  • What is your reasoning behind it? I am not sure why persisting would solve my problem as it uses a lot of memory itself. In fact, when persisting the dataframe (MEMORY_AND_DISK_SER) instead of saving in step 3 (and then saving both dataframes at the end) I was getting OOM errors with much lower volumes of data. – Tom Jul 11 '18 at 13:06
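
For clarity, the two variants contrasted in the last comment would look roughly like this (a sketch with a stand-in DataFrame; paths and columns are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val target = Seq((1, 10.0), (2, 20.0)).toDF("id", "total")   // stand-in for the real target DataFrame

// variant A: write to Parquet and read it back (steps 3 and 4 of the question)
target.write.mode("overwrite").parquet("/tmp/target.parquet")
val reloaded = spark.read.parquet("/tmp/target.parquet")
reloaded.groupBy("id").sum("total").write.mode("overwrite").parquet("/tmp/second_a.parquet")

// variant B: persist instead of writing and reading back (the variant from the comment)
target.persist(StorageLevel.MEMORY_AND_DISK_SER)
target.groupBy("id").sum("total").write.mode("overwrite").parquet("/tmp/second_b.parquet")
target.unpersist()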