I'm going over a list of files on HDFS one by one, opening each as text and saving it back to HDFS in another location. The data is parsed, then the part files are merged and saved under the same name as the original, with a BZIP2 suffix. However, it's rather slow: it takes ~3s per file, and I have over 10,000 files per folder. I process file by file because I don't know how else to keep the file name information, and I need the name to compute an MD5 and "confirm" that no information loss has happened.
Here's my code:
import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.BZip2Codec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
val hdfsConf = sc.hadoopConfiguration
val hdfs = FileSystem.get(hdfsConf)
val sourcePath = new Path("/source/*20180801*")
hdfs.globStatus(sourcePath).foreach( fileStatus => {
  val fileName = fileStatus.getPath().getName()
  val filePathName = fileStatus.getPath().toString
  if (fileName.contains(".done")) {
    /* open, then save compressed */
    val myFile = sc.textFile(filePathName)
    val compressedBasePath = "/destination/compressed/"
    /* use tmp_ to hold the folder with the part files in it */
    val compressedPath = compressedBasePath + "tmp_/" + fileName
    myFile.saveAsTextFile(compressedPath,
      classOf[org.apache.hadoop.io.compress.BZip2Codec])
    /* merge part* -> old_name.bzip2; deleteSource = true cleans up tmp_ */
    FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs,
      new Path(compressedBasePath + "/" + fileName + ".bzip2"),
      true, hdfsConf, null)
    myFile.unpersist() // no-op here, since the RDD was never cached
  }
})
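For the MD5 "no information loss" check mentioned at the top, here is a minimal sketch of a streaming digest helper (md5Of is a hypothetical name, not something from the job above); to compare against the compressed output, you would first wrap the destination stream in the BZip2 codec's decompressing input stream:

import java.security.{DigestInputStream, MessageDigest}

/* Hypothetical helper: streams an HDFS file through an MD5 digest
   without loading it into memory. */
def md5Of(fs: FileSystem, path: Path): String = {
  val md = MessageDigest.getInstance("MD5")
  val in = new DigestInputStream(fs.open(path), md)
  try {
    val buf = new Array[Byte](8192)
    while (in.read(buf) != -1) {} // reading through the stream updates md
  } finally in.close()
  md.digest().map("%02x".format(_)).mkString
}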
Before I realized I needed the MD5 checks, I used something like this:
val myFile = sc.textFile("/source/*20180801*")
myFile.saveAsTextFile(compressedPath,
classOf[org.apache.hadoop.io.compress.BZip2Codec])
But then I cannot do the renaming part, and I do need the names. Any ideas what I could do?
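(For reference, sc.wholeTextFiles keeps the source path with each record, at the cost of reading every file fully into memory, so a sketch of keeping the names in a bulk read could look like this:)

/* Sketch only: wholeTextFiles returns (path, content) pairs, so the file
   name survives a bulk read - but each file is loaded whole into memory. */
val pairs = sc.wholeTextFiles("/source/*20180801*")
val named = pairs.map { case (path, content) =>
  (new Path(path).getName, content) // keep just the original file name
}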
UPDATE:
Thanks to suggestions in the comments, and this particular question, I was able to fix the issue using parallel collections. The only real changes were importing scala.collection.parallel.immutable.ParVector and adding a .par method call before the foreach.
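A minimal sketch of that change, assuming the same hdfs and sourcePath as above:

import scala.collection.parallel.immutable.ParVector

/* .toVector.par wraps the glob results in a ParVector, so several files
   are processed concurrently; the per-file body stays exactly the same. */
hdfs.globStatus(sourcePath).toVector.par.foreach( fileStatus => {
  /* ... same open / saveAsTextFile / copyMerge as above ... */
})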
Full article about parallel collections: https://docs.scala-lang.org/overviews/parallel-collections/overview.html
Thx