I'm going over a list of files on HDFS one by one, opening each as text and saving it back to HDFS in another location. The data is parsed, then the part files are merged and saved under the original name with a BZIP2 suffix. However, it's rather slow: it takes ~3s per file, and I have over 10,000 of them per folder. I go file by file because I'm unsure how else to keep the file name information. I need the name to be able to do an MD5 and "confirm" that no information loss has happened.
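
A minimal sketch of the kind of MD5 check meant here, under stated assumptions: `md5Hex` is a hypothetical helper name, and it only needs an `InputStream`. On HDFS that stream would presumably come from `hdfs.open(path)` for the original, and from the same call wrapped in the BZip2 codec's decompressing stream for the compressed copy. The demo below uses in-memory bytes so it runs without a cluster.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.security.MessageDigest

// Hypothetical helper: streams the input through MD5 in fixed-size chunks,
// so the whole file never has to fit in memory.
def md5Hex(in: InputStream): String = {
  val digest = MessageDigest.getInstance("MD5")
  val buffer = new Array[Byte](8192)
  var read = in.read(buffer)
  while (read != -1) {
    digest.update(buffer, 0, read)
    read = in.read(buffer)
  }
  // Render each digest byte as two lowercase hex characters.
  digest.digest().map(b => "%02x".format(b)).mkString
}

// Stand-ins for "original file" and "decompressed copy of the new file".
val original = "hello".getBytes("UTF-8")
val roundTripped = "hello".getBytes("UTF-8")

val originalMd5 = md5Hex(new ByteArrayInputStream(original))
val roundTrippedMd5 = md5Hex(new ByteArrayInputStream(roundTripped))
val noLoss = originalMd5 == roundTrippedMd5
```

The comparison only works per file, which is why the output has to stay tied to the original file name.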

Here's my code:

import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions.broadcast 
import org.apache.spark.sql.types._ 
import org.apache.spark.{SparkConf, SparkContext} 

/* NB: sc.getConf returns a copy of the SparkConf, so these set calls do not
   change the running context; the codec passed to saveAsTextFile below is
   what actually enables compression */
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", 
               "org.apache.hadoop.io.compress.BZip2Codec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)
val sourcePath = new Path("/source/*20180801*") 

hdfs.globStatus(sourcePath).foreach( fileStatus => {
  val fileName = fileStatus.getPath().getName()
  val filePathName = fileStatus.getPath().toString
  if (fileName.contains(".done")) {
    /* open, then save compressed */
    val myFile = sc.textFile(filePathName)
    val compressedBasePath = "/destination/compressed/"
    /* use tmp_ to store folder w/ parts in it */
    val compressedPath = compressedBasePath + "tmp_/" + fileName
    myFile.saveAsTextFile(compressedPath, 
                          classOf[org.apache.hadoop.io.compress.BZip2Codec])
    /* merge part* -> old_name.bzip */
    FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs, 
                       new Path(compressedBasePath + "/" + fileName + ".bzip2"), 
                       true, hdfsConf, null)
    myFile.unpersist()
  }
})

Before I realized I needed these checks, I used something like this:

val myFile = sc.textFile("/source/*20180801*")
myFile.saveAsTextFile(compressedPath, 
                      classOf[org.apache.hadoop.io.compress.BZip2Codec])

But then I cannot do the renaming part, and I do need the names. Any ideas what I could do?

UPDATE: Thanks to the suggestions in the comments, and the question linked there, I was able to fix the issue using parallel collections. The only real changes were importing scala.collection.parallel.immutable.ParVector and adding a par method call before the foreach.

Full article about parallel collections: https://docs.scala-lang.org/overviews/parallel-collections/overview.html

Thx

hummingBird
  • Checking for loss of data. I did that 30 years ago. Is that time not past us, that we still need to do that? Curious. – thebluephantom Aug 15 '18 at 21:09
  • I get it, but data is of significance... user records... and i just have to somehow give "proof" all is clear before I actually start removing it :) – hummingBird Aug 15 '18 at 21:20
  • Why not store in an RDD the list of files, repartition it and then convert logic to run as part of foreachPartition or mapPartitions? – thebluephantom Aug 15 '18 at 21:29
  • It's a little contrived and not quite in the spirit of Spark parallel processing. But it will work. What about a Scala program with Futures? Bzip2 thing ... – thebluephantom Aug 15 '18 at 21:56
  • Let us know how you go – thebluephantom Aug 16 '18 at 06:58
  • Sorry for the _silence time_, but I had to take my beauty sleep :)... I've been sitting on this for quite some time. I'm a bit puzzled on the approach you explained. Have some code samples? – hummingBird Aug 16 '18 at 08:04
  • Look at mapPartitions example, you make iterators and can execute things in parallel, by taking part of an RDD. As I said this is not a standard case. But you are now getting sequential file by file processing. Unless I am wrong on foreach – thebluephantom Aug 16 '18 at 08:11
  • Yes, and that's why it's slow as a snail uphill. Okay, I'll try. – hummingBird Aug 16 '18 at 08:15
  • https://stackoverflow.com/questions/38069239/parallelize-avoid-foreach-loop-in-spark – thebluephantom Aug 16 '18 at 08:15
  • This did the trick. Actually, I parallelized using parallel collections, as described in second answer and it worked like a charm. – hummingBird Aug 16 '18 at 09:36
  • Perhaps share the code for others. I am interested in contrasting as well. The RDD foreach would also work, but... I.e. answer your own question, with help from others admittedly. Right, the .par looks interesting. I was thinking about Futures, looks quite easy. What is the speed-up in comparison? – thebluephantom Aug 16 '18 at 09:44
  • I'm not at liberty of modifying yarn setup, so it was a blocker for me sort of. I also added an answer. – hummingBird Aug 16 '18 at 10:08
  • Good on you. Will help others – thebluephantom Aug 16 '18 at 10:25

1 Answer

There were two potential solutions in the original question's comments:

TBH, I only tested the second approach, as it was the quicker one (with fewer warnings). The final solution required only minimal changes: importing the appropriate lib and parallelizing the Array that the hdfs.globStatus(sourcePath) call returns. Here's the final code, with comments removed and two comments added for easier spotting of changes.

import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions.broadcast 
import org.apache.spark.sql.types._ 
import org.apache.spark.{SparkConf, SparkContext} 
import scala.collection.parallel.immutable.ParVector /* added */

sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", 
               "org.apache.hadoop.io.compress.BZip2Codec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)
val sourcePath = new Path("/source/*20180801*") 

/* note the par method call below */
hdfs.globStatus(sourcePath).par.foreach( fileStatus => {
  val fileName = fileStatus.getPath().getName()
  val filePathName = fileStatus.getPath().toString
  if (fileName.contains(".done")) {
    val myFile = sc.textFile(filePathName)
    val compressedBasePath = "/destination/compressed/"
    val compressedPath = compressedBasePath + "tmp_/" + fileName
    myFile.saveAsTextFile(compressedPath, 
                          classOf[org.apache.hadoop.io.compress.BZip2Codec])
    FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs, 
                       new Path(compressedBasePath + "/" + 
                                fileName.replace(".done", ".done.bz2")), 
                       true, hdfsConf, null)
    myFile.unpersist()
  }
})
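
Futures were also floated in the comments but were not tested for this answer. A rough sketch of that alternative, under stated assumptions: `processAll` and the `parallelism` value are hypothetical names chosen for illustration, and in the real job the body of `work` would be the textFile / saveAsTextFile / copyMerge steps above. Here it is replaced by a plain string rename so the snippet runs without a cluster. The one thing it adds over `.par` is an explicit cap on how many jobs the driver submits at once.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper: runs `work` for every item on a fixed-size thread pool
// and blocks until all of them are done. Result order matches input order.
def processAll[A, B](items: Seq[A], parallelism: Int)(work: A => B): Seq[B] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    val futures = items.map(item => Future(work(item)))
    // A failure in any single item surfaces here as an exception.
    Await.result(Future.sequence(futures), Duration.Inf)
  } finally {
    pool.shutdown()
  }
}

// Stand-in for the per-file Spark work: only the rename is shown.
val names = Seq("a.done", "b.done", "c.done")
val results = processAll(names, parallelism = 2) { name =>
  name.replace(".done", ".done.bz2")
}
```

Submitting Spark jobs from several driver threads is supported by Spark's scheduler, which is what both this sketch and the `.par` version rely on.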
hummingBird
  • Only point to make is - without losing info in title - how did you see that? – thebluephantom Aug 16 '18 at 10:55
  • I'm not sure I follow - how did I see *what* – hummingBird Aug 16 '18 at 11:03
  • The title says: How to paralelize spark etl more w/out losing info. What does w/out losing info signify? may be I am missing something – thebluephantom Aug 16 '18 at 11:07
  • Oh ... OK... If you do a simple `val myFile = sc.textFile("/source/*20180801*"); myFile.saveAsTextFile(compressedPath, classOf[org.apache.hadoop.io.compress.BZip2Codec])`, the job will also be done, but you'll have part-00001, ... However, I'm then losing file names and ability to verify results. – hummingBird Aug 16 '18 at 11:37
  • I guessed that, but it's semantically not correct in the title and could cause confusion. I really like this question. We do most file i/o with LINUX and IMPALA stuff and SPARK for clear use cases. It actually is improving my thoughts that most things can be done in SPARK / SCALA. Although mapping business rules seems clearer in IMPALA QL. – thebluephantom Aug 16 '18 at 11:41
  • Found one more interesting thing :). If you don't use .bz2 as a file extension, Spark will not be able to read it correctly using `sc.textFile(...)`. Updating the answer – hummingBird Aug 16 '18 at 12:59
  • Regarding your Impala QL comment... Unfortunately, I had to go through all this because these were just temp files for processing into Hive. However, this processing took a while and I have no more space :) – hummingBird Aug 16 '18 at 13:02
  • Interesting. Success – thebluephantom Aug 16 '18 at 13:20