
I am trying to load a directory of files into a Spark RDD, and I need to append the originating file name to each line.

I was not able to find a way to do this with a regular RDD via sc.textFile, so I am now attempting to use the wholeTextFiles method to load each file along with its file name.

I am using this code:

val lines =
  sc.wholeTextFiles(logDir).flatMap { case (filename, content) =>
    val hash = modFiles.md5(filename)
    content.split("\n")
      .filter(line => <filter conditions>)
      .map(line => line + hash)
  }

However, this code gives me a Java out-of-heap-memory error. I guess it is trying to load all of the files at once?

Is there a way to solve this without using wholeTextFiles, and/or is there a way to avoid loading all the files at once when using wholeTextFiles?

  • Does `val lines = sc.wholeTextFiles(logDir); lines.first()` work fine, without the Java out-of-heap-memory error? – WoodChopper Sep 29 '15 at 04:40
  • Yes, that works; I guess that points to the flatMap. The files are not that large, ~50 MB to ~100 MB. – 328d95 Sep 29 '15 at 05:23
  • How many tasks run at once? Try setting numPartitions so that they don't all run at once. – DaunnC Sep 29 '15 at 06:00
  • @DaunnC Setting numPartitions doesn't work. I tried 2, 10, 50, 100, 500. The job fails on a shuffle request submitting 67 tasks. – 328d95 Sep 29 '15 at 07:23
  • A very similar question has a good solution here: http://stackoverflow.com/questions/29686573/spark-obtaining-file-name-in-rdds; see the link to http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ – mattinbits Sep 29 '15 at 07:55
  • @mattinbits I saw that solution, but I was wary of it because it would mean I would have to recompute the file name hash for every single line. Maybe I am approaching this incorrectly and I should just pre-compute the hash and rename the files, but in doing so I would lose information about the logs. Ideally I would just append the whole file name, but currently they are ~80 characters long each and I need to join on that field later; the string comparison would be very slow. – 328d95 Sep 29 '15 at 08:21
  • Watch out for premature optimisation. Do the filenames have a large number of identical characters at the start of the string? Can you show an example of the filenames? – mattinbits Sep 29 '15 at 08:30
  • @mattinbits BAD-FILE-93811-2014-02-23T21-12-25.266-LOG-ROTATE-2014-02-23T21-12-23.705.nglog BAD-FILE-9814-2014-02-11T19-24-52.284-LOG-ROTATE-2014-02-11T19-24-49.598.nglog – 328d95 Sep 29 '15 at 08:37
  • Since the file names start to differ quite early, I believe the string comparisons will not necessarily be a big source of slowdown. However, if you are concerned, you could separately get a list of the filenames, compute the hashes, then broadcast this as a map of filename -> hash and replace the filename with the hash that way (see the sketch after these comments). – mattinbits Sep 29 '15 at 09:17
  • @mattinbits Thanks, I will try the solution from the other question again. – 328d95 Sep 29 '15 at 10:11
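
A minimal sketch of the broadcast approach mattinbits describes above, assuming the question's modFiles.md5 helper (or any other stable string hash): compute each file's hash once on the driver, then broadcast a filename -> hash map so the executors only pay a map lookup per input split.

    import org.apache.hadoop.fs.{FileSystem, Path}

    // List the input files and hash each name once, on the driver
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val fileHashes = fs.listStatus(new Path(logDir))
      .map(_.getPath.toString)
      .map(name => name -> modFiles.md5(name))
      .toMap

    // Broadcast the filename -> hash map to the executors
    val fileHashesB = sc.broadcast(fileHashes)

    // Inside mapPartitionsWithInputSplit (see the answer below), the
    // per-split hash computation then becomes a lookup:
    //   val fileHash = fileHashesB.value(inputSplit.asInstanceOf[FileSplit].getPath.toString)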

1 Answer


The solution is to apply the code from this page: http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Create the text RDD, keeping access to the underlying Hadoop input format
val text = sc.hadoopFile(logDir,
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)

// Cast to a HadoopRDD to get at the input splits
val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
val linesRaw = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
  // Get the file name hash once per split - you need to define your own hash function
  val fileHash = hash(inputSplit.asInstanceOf[FileSplit].getPath.toString)
  // The input split is in _1 and the line is in _2
  iterator.map(splitAndLine => splitAndLine._2.toString + fileHash)
}
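
The hash function above is deliberately left undefined; as one possible implementation, a hex-encoded MD5 digest of the file path (a sketch mirroring the modFiles.md5 call from the question) could look like:

    import java.security.MessageDigest

    // One possible hash(): hex-encoded MD5 digest of the file path
    def hash(path: String): String =
      MessageDigest.getInstance("MD5")
        .digest(path.getBytes("UTF-8"))
        .map("%02x".format(_))
        .mkString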

Using this code incurred a ~10% performance hit compared to using sc.textFile.
