82

When using Scala in Spark, whenever I dump the results out using saveAsTextFile, it seems to split the output into multiple parts. I'm just passing a parameter (path) to it.

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. Does the number of outputs correspond to the number of reducers it uses?
  2. Does this mean the output is compressed?
  3. I know I can combine the output together using bash, but is there an option to store the output in a single text file, without splitting? I looked at the API docs, but it doesn't say much about this.
sfjac
  • 7,119
  • 5
  • 45
  • 69
user2773013
  • 3,102
  • 8
  • 38
  • 58
  • 2
    It's generally bad practice to only use one file in Big Data if that file is large. – samthebest Jun 24 '14 at 07:40
  • What's the best practice then if the output was, say, a sorted file? Keep it as a collection of files and make the many output file names be some sort of index (i.e. something like first file is named "aa", middle ones would be like "fg", last one "zzy")? – Rdesmond Jun 22 '16 at 23:04
  • It's often the case that a heavy spark job only generates a very small output (aggregation, kpis, popularities, ...) which is produced on hdfs, but most likely to be later used by applications unrelated to big data. Cleaner and easier in this case to have a well named single file for transfers and consumption. – Xavier Guihot Feb 10 '18 at 06:56

9 Answers

103

It saves multiple files because the computation is distributed. If the output is small enough that you think it can fit on one machine, you can end your program with

val arr = year.collect()

And then save the resulting array as a file. Another way would be to use a custom partitioner with partitionBy and make everything go to one partition, though that isn't advisable because you won't get any parallelization.
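
For illustration, a minimal sketch of that last step, assuming the question's `year` RDD of (count, word) pairs and an arbitrary local file name:

import java.io.PrintWriter

// Bring the (small) result to the driver and write it to a local file.
val arr = year.collect()
val writer = new PrintWriter("year.txt") // hypothetical local path
try {
  arr.foreach { case (count, word) => writer.println(s"$count,$word") }
} finally {
  writer.close()
}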

If you require the file to be saved with saveAsTextFile, you can use coalesce(1,true).saveAsTextFile(). This basically means do the computation, then coalesce to 1 partition. You can also use repartition(1), which is just a wrapper for coalesce with the shuffle argument set to true. I figured most of this out by looking through the source of RDD.scala; it's worth a look.
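
As a rough sketch against the question's `year` RDD (the output path is a placeholder):

// Do the computation in parallel, then coalesce to a single partition for the write.
year.coalesce(1, shuffle = true).saveAsTextFile("year_single")

// Equivalent, since repartition(1) is coalesce(1, shuffle = true) under the hood:
// year.repartition(1).saveAsTextFile("year_single")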

Community
  • 1
  • 1
aaronman
  • 18,343
  • 7
  • 63
  • 78
  • 3
    how do you save an array as text file?? there is no saveAsTextFile function for an array. just for RDD. – user2773013 Jun 23 '14 at 23:40
  • 5
    @user2773013 well the approach for that would be `coalesce` or the `partition` approach I suggested, but there is really no point in storing on hdfs if it's only on 1 node which is why using collect is really the correct way to go – aaronman Jun 24 '14 at 00:19
  • Very useful answer.... Haven't seen partitionBy or coalesce in the tutorials I've read... –  Nov 13 '16 at 16:59
44

For those working with a larger dataset:

  • rdd.collect() should not be used in this case as it will collect all data as an Array in the driver, which is the easiest way to run out of memory.

  • rdd.coalesce(1).saveAsTextFile() should also not be used, as the parallelism of upstream stages would be lost: they would all be performed on the single node the data is written from.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() is the best simple option as it will keep the processing of upstream tasks parallel and then only perform the shuffle to one node (rdd.repartition(1).saveAsTextFile() is an exact synonym).

  • rdd.saveAsSingleTextFile() as provided below additionally allows one to store the rdd in a single file with a specific name while keeping the parallelism properties of rdd.coalesce(1, shuffle = true).saveAsTextFile().


Something that can be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") is that it actually produces a file whose path is path/to/file.txt/part-00000 and not path/to/file.txt.

The following solution rdd.saveAsSingleTextFile("path/to/file.txt") will actually produce a file whose path is path/to/file.txt:

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // RDD[String] and be called like this: rdd.saveAsSingleTextFile(...)
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

which can be used this way:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

This snippet:

  • First stores the rdd with rdd.saveAsTextFile("path/to/file.txt.tmp") in a temporary folder, as if we didn't want to store the data in one file (which keeps the processing of upstream tasks parallel)

  • Only then, using the hadoop file system api, does it proceed with the merge (FileUtil.copyMerge()) of the different output files to create the final single output file path/to/file.txt.

Xavier Guihot
  • 54,987
  • 21
  • 291
  • 190
23

You could call coalesce(1) and then saveAsTextFile() - but it might be a bad idea if you have a lot of data. Separate files per split are generated just like in Hadoop in order to let separate mappers and reducers write to different files. Having a single output file is only a good idea if you have very little data, in which case you could do collect() as well, as @aaronman said.
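
A minimal sketch using the question's `year` RDD (the output directory name is arbitrary):

// Collapse to one partition so a single part file is written; fine for small outputs.
year.coalesce(1).saveAsTextFile("year_single_file")

// For very small results, collecting to the driver works too:
val results = year.collect()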

David Arenburg
  • 91,361
  • 17
  • 137
  • 196
marekinfo
  • 1,434
  • 1
  • 12
  • 12
  • Nice, didn't think of `coalesce`, cleaner than messing around with the partitioner. That being said, I still think if your goal is to get it into one file, `collect` is probably the right way to do it – aaronman Jun 23 '14 at 19:45
  • 1
    this works. But, if you use coalesce, that means you are only using 1 reducer. Wouldn't this slow the process because only 1 reducer is used? – user2773013 Jun 23 '14 at 23:45
  • 1
    Yes, but that is what you are asking for. Spark outputs one file per partition. On the other hand, why do you care about the number of files? When reading files in spark you can just specify the parent directory and all of the partitions are read as a single RDD – David Jun 01 '16 at 20:46
  • 1
    Do not `coalesce(1)` please, [unless you know what you are doing](http://stackoverflow.com/questions/38986776/nullpointerexception-in-spark-rdd-map-when-submitted-as-a-spark-job/38986964#38986964). – gsamaras Sep 03 '16 at 03:11
4

As others have mentioned, you can collect or coalesce your data set to force Spark to produce a single file. But this also limits the number of Spark tasks that can work on your dataset in parallel. I prefer to let it create a hundred files in the output HDFS directory, then use hadoop fs -getmerge /hdfs/dir /local/file.txt to extract the results into a single file in the local filesystem. This makes the most sense when your output is a relatively small report, of course.
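
A sketch of that workflow; the paths are placeholders, and the getmerge step runs outside Spark on the command line:

// Let Spark write its usual part-xxxxx files to an HDFS directory, in parallel.
year.saveAsTextFile("/hdfs/dir")

// Then, from a shell:
//   hadoop fs -getmerge /hdfs/dir /local/file.txt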

Matt
  • 1,284
  • 14
  • 22
2

In Spark 1.6.1 the format is as shown below. It creates a single output file. It is best to use this only if the output is small enough to handle. Basically, it returns a new RDD that is reduced into numPartitions partitions. If you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you'd like (e.g. one node in the case of numPartitions = 1).

pair_result.coalesce(1).saveAsTextFile("/app/data/")
Aravind Krishnakumar
  • 2,727
  • 1
  • 28
  • 25
2

You can call repartition(1) and do it this way:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

val repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")


Bhaskar Das
  • 652
  • 1
  • 9
  • 28
1

You will be able to do it in the next version of Spark; in the current version, 1.0.0, it's not possible unless you do it manually somehow, for example, as you mentioned, with a bash script call.

gprivitera
  • 933
  • 1
  • 8
  • 22
1

I also want to mention that the documentation clearly states that users should be careful when calling coalesce with a really small number of partitions. This can cause upstream partitions to inherit this number of partitions.

I would not recommend using coalesce(1) unless really required.
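
If a single output file really is required, one sketch (echoing the earlier answers; `rdd` and the path are placeholders) is to pass shuffle = true so that upstream stages keep their parallelism:

// The shuffle inserts a stage boundary: everything before it still runs in
// parallel, and only the write after the coalesce is single-partition.
rdd.coalesce(1, shuffle = true).saveAsTextFile("output/path")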

JavaPlanet
  • 83
  • 9
0

Here's my answer to output a single file. I just added coalesce(1).

val year = sc.textFile("apat63_99.txt")
              .map(_.split(",")(1))
              .flatMap(_.split(","))
              .map((_,1))
              .reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

Code:

year.coalesce(1).saveAsTextFile("year")