
I want to use Spark to read a large (51GB) XML file (on an external HDD) into a DataFrame (using the spark-xml plugin), do simple mapping/filtering and reordering, and then write it back to disk as a CSV file.

But I always get a java.lang.OutOfMemoryError: Java heap space, no matter how I tweak this.

I want to understand why increasing the number of partitions doesn't stop the OOM error.

Shouldn't it split the task into more parts so that each individual part is smaller and doesn't cause memory problems?

(Spark can't possibly be trying to stuff everything into memory and crashing if it doesn't fit, right??)

Things I've tried (there's a config sketch after this list):

  • repartitioning/coalescing the dataframe to 5,000 and 10,000 partitions when reading and when writing (the initial value is 1,604)
  • using a smaller number of executors (6, 4; even with 2 executors I get an OOM error!)
  • decreasing the size of split files (the default looks like it's 33MB)
  • giving it tons of RAM (all I have)
  • increasing spark.memory.fraction to 0.8 (the default is 0.6)
  • decreasing spark.memory.storageFraction to 0.2 (the default is 0.5)
  • setting spark.default.parallelism to 30 and 40 (the default is 8 for me)
  • setting spark.files.maxPartitionBytes to 64M (the default is 128M)
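
For reference, here is roughly how those settings fit together when building the session (a sketch, not my exact launch code; the app name is a placeholder, and driver/executor memory were passed on the command line, since heap sizes must be fixed before the JVM starts):

import org.apache.spark.sql.SparkSession

// Sketch of the settings listed above, gathered in one place.
// Note: for DataFrame file sources, the analogous split-size knob is
// spark.sql.files.maxPartitionBytes; spark.files.maxPartitionBytes only
// affects RDD-based file reads.
val spark = SparkSession.builder()
  .appName("xml-to-csv") // placeholder name
  .config("spark.memory.fraction", "0.8")
  .config("spark.memory.storageFraction", "0.2")
  .config("spark.default.parallelism", "40")
  .config("spark.files.maxPartitionBytes", 64L * 1024 * 1024)
  .getOrCreate()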

All my code is here (notice I'm not caching anything):

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// I pass `numPartitions` as a CLI argument
val df2 = df.coalesce(numPartitions)

// filter and select only the cols I'm interested in
val dsout = df2
  .where(df2.col("_TypeId") === "1")
  .select(
    df2("_Id").as("id"),
    df2("_Title").as("title"),
    df2("_Body").as("body"),
    df2("_Tags").as("tags") // needed so the 4-field Post pattern below matches
  ).as[Post]

// regexes to clean the text (see the worked example after this listing)
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r


// more mapping
dsout
  .map {
    case Post(id, title, body, tags) =>
      val body1 = tagPat.replaceAllIn(body, "")
      val body2 = whitespacePat.replaceAllIn(body1, " ")
      Post(id, title.toLowerCase, body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
  }
  .orderBy(rand(SEED)) // random sort
  .write // write it back to disk
  .option("quoteAll", true)
  .mode(SaveMode.Overwrite)
  .csv(output)
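
To illustrate the cleaning step, here is what those regexes do to a single made-up row (the sample values below are mine, not actual rows from the data):

// Hypothetical sample row, run through the same regexes as above:
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r

val body = "<p>Hello   <b>world</b></p>"
val tags = "<scala><apache-spark>"

val cleanBody = whitespacePat.replaceAllIn(tagPat.replaceAllIn(body, ""), " ")
// cleanBody == "Hello world"

val tagCsv = tags.split(angularBracketsPat).mkString(",")
// tagCsv == ",scala,apache-spark" -- the "<" at position 0 produces a leading
// empty token; a .filter(_.nonEmpty) before mkString would drop it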

NOTES

  • the input splits are really small (33MB each), so why can't I have 8 threads each processing one split? It really shouldn't blow my memory (I've se

UPDATE: I've written a shorter version of the code that just reads the file and then calls foreachPartition(println).

I get the same OOM error:

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")

df
  .where(df.col("_PostTypeId") === "1")
  .select(
   df("_Id").as("id"),
   df("_Title").as("title"),
   df("_Body").as("body"),
   df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase)
  }
  .foreachPartition { it => // note: this is an Iterator over rows, not an RDD
    if (it.nonEmpty) {
      println(s"HI! I'm a partition and I have ${it.size} elements!")
    }
  }
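
As an aside, per-partition row counts can be checked without collecting anything to the driver; a minimal sketch:

import org.apache.spark.sql.functions.spark_partition_id

// group by the physical partition id to see how many rows each partition holds
df.groupBy(spark_partition_id().as("partition"))
  .count()
  .orderBy("partition")
  .show(50, truncate = false)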

P.S.: I'm using Spark v2.1.0. My machine has 8 cores and 16 GB of RAM.

Felipe
  • Have you inspected the size of the created partitions in the Spark UI? – Khozzy May 05 '17 at 09:37
  • @Khozzy This is what I got when I ran the app with 1604 partitions for the read DF and 50 partitions for the DF to be written: [screenshot-spark-ui](http://i.imgur.com/a5LjEmc.png) – Felipe May 05 '17 at 16:30
  • Yes, but look at the UI during job execution. You will see how long each task takes and how your CPU is utilized (maybe there are stragglers). – Khozzy May 05 '17 at 18:47
  • I can't get a single task to complete; see the UI just before the OOM error: [screenshot](http://i.imgur.com/O3u6Rhu.png). Also, I noted that there was a large spike in disk IO just before it crashed. – Felipe May 05 '17 at 19:03
  • Actually, I do get some of the 8 initial tasks done, but when the 9th task starts, some of the tasks in the earlier group fail. – Felipe May 05 '17 at 19:23
  • @Felipe Almeida I am facing a similar issue. Did you manage to solve this memory issue? – Ayat Khairy Aug 09 '17 at 11:18
  • Did you find any solution to this problem? – Aman Tandon Aug 28 '18 at 06:43
  • @AmanTandon I don't think so. I ended up doing something else. Can't remember, sorry. – Felipe Aug 31 '18 at 02:46

3 Answers


I was getting this error when running spark-shell, so I increased the driver memory to a high number. Then I was able to load the XML:

spark-shell --driver-memory 6G

Source: https://github.com/lintool/warcbase/issues/246#issuecomment-249272263
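
The same flag applies when submitting a standalone application instead of using the shell (the class and jar names below are placeholders). Setting spark.driver.memory from code generally has no effect in client mode, because the driver JVM is already running by then, so it belongs on the command line or in spark-defaults.conf:

spark-submit --driver-memory 6G --class com.example.XmlToCsv xml-to-csv.jar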

joydeep bhattacharjee

This is because you are storing your RDD twice. Change your logic like this, or filter with Spark SQL:

val df: DataFrame = SparkFactory.spark.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .coalesce(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r

// filter and select only the cols I'm interested in
df
  .where(df.col("_TypeId") === "1")
  .select(
    df("_Id").as("id"),
    df("_Title").as("title"),
    df("_Body").as("body"),
    df("_Tags").as("tags") // needed so the 4-field Post pattern below matches
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      val body1 = tagPat.replaceAllIn(body, "")
      val body2 = whitespacePat.replaceAllIn(body1, " ")
      Post(id, title.toLowerCase, body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
  }
  .orderBy(rand(SEED)) // random sort
  .write // write it back to disk
  .option("quoteAll", true)
  .mode(SaveMode.Overwrite)
  .csv(output)
Muhunthan
  • Making it all a single DF didn't really help. I still get `java.lang.OutOfMemoryError: Java heap space`. – Felipe May 05 '17 at 16:46

You can change the heap size by adding the following to your environment variables:

  1. Environment variable name: _JAVA_OPTIONS
  2. Environment variable value: -Xmx512M -Xms512M
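
For example, in a Unix-like shell (on Windows, this goes in the Environment Variables dialog instead):

export _JAVA_OPTIONS="-Xmx512M -Xms512M"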