For those working with a larger dataset:
rdd.collect()
should not be used in this case, as it collects all the data as an Array in the driver, which is the easiest way to run out of memory.
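(For completeness, a minimal sketch of the collect-based approach, only viable when the data is known to fit in driver memory; the local output path here is purely illustrative:)

    import java.io.PrintWriter

    // Only for small datasets: materializes every element in driver memory,
    // then writes a plain local file from the driver.
    val lines: Array[String] = rdd.collect()
    val writer = new PrintWriter("local/path/to/file.txt")
    try lines.foreach(line => writer.println(line)) finally writer.close()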
rdd.coalesce(1).saveAsTextFile()
should also not be used: without a shuffle, the coalesce to a single partition is pushed upstream, so the parallelism of upstream stages is lost and all the processing runs on the single node from which the data is then written.
rdd.coalesce(1, shuffle = true).saveAsTextFile()
is the best simple option, as it keeps the processing of upstream tasks parallel and only then shuffles the results to a single node (rdd.repartition(1).saveAsTextFile()
is an exact synonym).
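(The synonym claim holds because, in Spark's own RDD source, repartition is defined as a call to coalesce with the shuffle flag enabled, roughly:)

    // Paraphrased from org.apache.spark.rdd.RDD: repartition simply delegates to
    // coalesce with shuffle = true, which is why the two calls above are equivalent.
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
      coalesce(numPartitions, shuffle = true)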
rdd.saveAsSingleTextFile()
as provided below additionally allows one to store the rdd in a single file with a specific name, while keeping the parallelism properties of rdd.coalesce(1, shuffle = true).saveAsTextFile()
.
Something that can be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")
is that it actually produces a file whose path is path/to/file.txt/part-00000
and not path/to/file.txt
.
The following solution rdd.saveAsSingleTextFile("path/to/file.txt")
will actually produce a file whose path is path/to/file.txt
:
package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // RDD[String] and be called like this: rdd.saveAsSingleTextFile("path/to/file.txt")
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
      path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}
which can be used this way:
import com.whatever.package.SparkHelper.RDDExtensions
rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
This snippet first stores the rdd with a classic rdd.saveAsTextFile in a temporary folder, path/to/file.txt.tmp, as if we didn't want to store the data in one file, which keeps the processing of upstream tasks parallel. Only then, using the Hadoop file system API, does it merge (FileUtil.copyMerge()) the resulting part files into the final single output file, path/to/file.txt.
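As noted in the snippet, FileUtil.copyMerge was removed in Hadoop 3 (see the linked answer). A minimal sketch of an equivalent manual merge using only the Hadoop FileSystem API could look like the following; the mergeParts name is an illustrative helper, not part of the snippet above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils

    object Hadoop3MergeSketch {

      // Concatenates the part files found under srcDir into a single dstFile,
      // then deletes srcDir, mirroring what FileUtil.copyMerge used to do.
      def mergeParts(hdfs: FileSystem, srcDir: Path, dstFile: Path, conf: Configuration): Unit = {
        val out = hdfs.create(dstFile)
        try {
          // listStatus gives no ordering guarantee, so sort by name to keep
          // part-00000, part-00001, ... in sequence.
          hdfs.listStatus(srcDir)
            .filter(_.getPath.getName.startsWith("part-"))
            .sortBy(_.getPath.getName)
            .foreach { status =>
              val in = hdfs.open(status.getPath)
              try IOUtils.copyBytes(in, out, conf, false) finally in.close()
            }
        } finally out.close()
        hdfs.delete(srcDir, true)
      }
    }

Such a helper would then take the place of the FileUtil.copyMerge call in saveAsSingleTextFileInternal.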