69

How can you write to multiple outputs dependent on the key using Spark in a single Job.

Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job

E.g.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

would ensure cat prefix/1 is

a
b

and cat prefix/2 would be

c

EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.

Abel
  • 56,041
  • 24
  • 146
  • 247
samthebest
  • 30,803
  • 25
  • 102
  • 142
  • Could you add an example that illustrates what you're after? Maybe an input collection and what you expect as process/output? – maasg Jun 02 '14 at 14:28
  • No, "done" as in completed maasg's request @fengyun. We have a use case for this, in particular currently we have a Scalding job that uses `TemplatedTsv` which we want to replace with a Spark job. But the refactor has been sitting in our backlog ... when I finally write out a full hdfs based solution I'll be sure to post it here. – samthebest Aug 06 '14 at 07:51
  • Does [this example](https://sites.google.com/site/hadoopandhive/home/how-to-write-output-to-multiple-named-files-in-hadoop-using-multipletextoutputformat) help at all? I'm trying to figure out how to do this myself. – Nick Chammas Sep 12 '14 at 22:37
  • 5
    FYI: I've opened [SPARK-3533](https://issues.apache.org/jira/browse/SPARK-3533) to request that a more straightforward way of doing this be added to Spark. – Nick Chammas Sep 15 '14 at 18:31

10 Answers10

127

If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)

If you're starting out with an RDD, you'll first need to convert it to a DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

In Python, this same code is:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:

people_df.write.partitionBy("number").text("people")

And you can easily use other output formats if you want:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
Nick Chammas
  • 11,843
  • 8
  • 56
  • 115
  • Can you add the equivalent `Dataset`s code in Scala? and I'll accept as best answer. Yes some people don't care about types and like running their entire application every few minutes to find out if they have any bugs, but some of us like to catch typos like 'nubmer' the instant we typed it :) Seriously though, good answer. – samthebest May 12 '16 at 17:28
  • @samthebest - Done. In Spark 2.0, `DataFrame` will simply be an alias for `Dataset[Row]`, so I think my Scala example should hold the same. – Nick Chammas May 12 '16 at 22:08
  • Yeah, looks like there isn't a 100% typesafe way to do it, but then the original accepted answer wasn't either. I imagine in Spark 2.0 there will be a purely type safe way to do it (i.e. passing a function into `partitionBy` rather than a string). Anyway, just tweaked a couple things and accepted. – samthebest May 14 '16 at 11:22
  • 77
    @samthebest - Just FYI, I rolled back your edit because it had a few problems: It didn't fit my style of writing; I don't know much about Datasets, so the note about `Dataset[SomeCaseClass]` is more appropriate as a comment; finally, Python doesn't have a `makeRDD()` method. – Nick Chammas May 14 '16 at 16:23
  • 9
    Note that if you had `Dataset[SomeCaseClass]` then you can just call `.toDF()` and the column labels will match up to `SomeCaseClass`es fields. This gives a little more type safety. – samthebest May 18 '16 at 14:10
  • 1
    Is there any way to force this method to write only one file/part per partition? – moustachio May 21 '16 at 18:06
  • 3
    @moustachio - Good question. I think you can force that by coalescing the DataFrame into one partition before the `partitionBy()`. For example: `people_df.coalesce(1).write.partitionBy("number").text("people")` This may limit Spark's parallelism when writing out the data, though, depending on your data and cluster configuration. – Nick Chammas May 23 '16 at 01:25
  • Is there an easy way to read these partitioned RDDs back into another spark job? Something with ```wholeTextFiles```? – rsmith54 Jul 11 '17 at 15:21
  • @rsmith54 - You can read each partition into a DataFrame (remember, these are DataFrames, not plain RDDs) and then union all the DataFrames to get the original DataFrame back. I dunno if there is an easier way to do it, but if there is it probably does not involve `wholeTextFiles()`, which is meant for small, unstructured files. – Nick Chammas Jul 11 '17 at 17:34
  • If `people` already exists then the save will fail, right? What if you want to add more entires to the `people` directory/prefix? – volni Jun 05 '18 at 01:43
  • @volni - Yes, the save will fail if the destination path already exists, unless you pass `mode="overwrite"` as an option. To append new entries to an existing directory, you can try `mode="append"`. You can pass this either as an argument to the various methods like `json()`, `parquet()`, etc. or as [top-level write option](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.mode). – Nick Chammas Jun 12 '18 at 14:14
82

I would do it like this which is scalable

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

Just saw similar answer above, but actually we don't need customized partitions. The MultipleTextOutputFormat will create file for each key. It is ok that multiple record with same keys fall into the same partition.

new HashPartitioner(num), where the num is the partition number you want. In case you have a big number of different keys, you can set number to big. In this case, each partition will not open too many hdfs file handlers.

CubeJockey
  • 2,209
  • 8
  • 24
  • 31
zhang zhan
  • 1,596
  • 13
  • 10
  • 2
    Please could you add all necessary import statements? I haven't tested this, but accepting the answer as it appears to be what I want. What is the point in `partitionBy(new Hashpartitioner(num))`?? isn't this the same as `repartition(num)`?? – samthebest Sep 27 '14 at 12:48
  • 4
    It is different. hash partition will ensure that all records with the same key to go to the same partition. As I remembered repartition does not have this functionality. – zhang zhan Sep 28 '14 at 00:57
  • Thanks a lot for this very good solution. I was just wondering the following: how should I modify your code in order to have the output on each file sorted on the values v? – Yiannis Gkoufas Jan 31 '15 at 20:31
  • 3
    I was looking for writing multiple parquet outputs, and this a solution along these lines looks promising (only subclassing MultipleOutputFormat directly, not using MultipleTextOutputFormat). Unfortunately MutlipleOutputFormat only exists in old API MR1/mapred, whereas the AvroParquetOutputFormat and ParquetOutputFormat (supporting parquet) are written against the new API MR2/mapreduce, so it seems the same path is not open... – Silas Davis Jul 03 '15 at 15:45
  • @zhangzhan This would create a single file for every key. In my case there could be potentially millions of lines for a key in which case I would ideally like a directory with output parts inside it. How does this solution enable that? – Sohaib Jul 04 '15 at 06:04
  • @zhangzhan I removed the hash partitioner as I don't really want to limit my partitions (number of keys is small. No of values in each key is large) and added the following in the RDD Multiout `key.asInstanceOf[String] + Path.SEPARATOR + name`. I do not know about the performance hits thought. – Sohaib Jul 04 '15 at 06:41
  • Guys, I am getting this error (posted by someone else): http://stackoverflow.com/questions/25996822/spark-scala-saveashadoopfile-throwing-error any suggestions? – zengr Nov 14 '15 at 00:23
  • 3
    Looks great! Is there a python equivalent? – NDavis Feb 24 '16 at 00:03
  • what is the different by removed the .partitionBy(new HashPartitioner(num))? – Linlin Apr 12 '16 at 15:06
  • @NDavis - There is a different approach to accomplishing this that is available in Python. See [my answer](http://stackoverflow.com/a/37150604/877069). – Nick Chammas May 11 '16 at 00:23
  • @zhangzhan Hey, I am getting following error while using this : java.lang.RuntimeException: java.lang.NoSuchMethodException : $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$RDDMultipleTextOutputFormat.() – aks Jul 17 '17 at 08:12
16

If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(Replace PrintWriter with your choice of distributed filesystem operation.)

This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.

samthebest
  • 30,803
  • 25
  • 102
  • 142
Daniel Darabos
  • 26,991
  • 10
  • 102
  • 114
  • Thanks. If we where to use HDFS instead of local filesystem as we will essentially be implementing the shuffle part by hand ourselves right? Also, what happens when multiple partitions contain pairs that have the same key? Both tasks may try to write to the same file, and therefore we need some kind of synchronized file management system to keep track of creating part-XXXXX. I'm afraid this solution feels very dirty given that I'm sure a solution using `MultipleOutputFormat` exists. – samthebest Jun 21 '14 at 16:02
  • You are right that it is kind of implementing shuffle. But there is no bottleneck, I think. There is no single node which is receiving all records with a key. There is no problem with the same key coming from multiple partition, and there is no need for synchronization either. The file name is `output//`. So each partition writes to different files. (The partition index goes to the `suffix` in the example.) – Daniel Darabos Jun 21 '14 at 17:33
  • `MultipleOutputFormat` sounds perfect for the job, and would work by the same idea. I've just never used it. I think you would just rewrite my `MultiWriter` to use `MultipleOutputFormat` instead of rolling its own key->file mapping. But the `mapPartitionsWithIndex` bit would be mostly unchanged. – Daniel Darabos Jun 21 '14 at 17:36
  • Sorry, I misunderstood your solution (tbh I skim read). Thanks for the clarification. Yes I think with some playing around and replacing the writer code with HDFS this would work (and no bottleneck either). Thanks for your answer. – samthebest Jun 22 '14 at 12:38
  • I'm concerned that when we use `mapPartitionsWithIndex` and manually write to HDFS, then that particular partition will not necessarily output to the desired location of that partition. Therefore the additional shuffle is unnecessary and can be avoided. – samthebest Aug 12 '14 at 10:42
6

This includes the codec as requested, necessary imports, and pimp as requested.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

One subtle difference to the OP is that it will prefix <keyName>= to the directory names. E.g.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Would give:

prefix/key=1/part-00000
prefix/key=2/part-00000

where prefix/my_number=1/part-00000 would contain the lines a and b, and prefix/my_number=2/part-00000 would contain the line c.

And

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Would give:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

It should be clear how to edit for parquet.

Finally below is an example for Dataset, which is perhaps nicer that using Tuples.

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}
samthebest
  • 30,803
  • 25
  • 102
  • 142
  • not sure it does not have +100 upvote, and actually had zero upvote. Very helpful, thanks! – Aliostad Dec 20 '17 at 20:49
  • 4
    @Aliostad, look at the dates, this was posted one-and-a-half year later. Also, it is _not_ customary (and sometimes considered rude) at SO to post an answer to your own question (after it already has one or more valid ones) and accept it. Sometimes a situation warrants multiple answers, but then you typically keep the original answer accepted (unless it turns out to be wrong, or a new answer from another user is just so much better, but that's not the case here, the OP clearly considered the original answer correct). I can only assume the OP wasn't aware of the guidelines in this case. – Abel Dec 15 '18 at 20:55
  • 2
    @Abel I'm aware of the guidelines, but I felt it necessary to post my own answer as mine "is just so much better" than all here because it's the only answer that: 1. Includes how to specify the compression codec (as requested in the OP), 2. includes how to add it as a pimp/extension method (as requested in the OP), 3. actually compiles! (includes necessary imports), 4. uses correct Scala style and formatting. It's sad that it's nearly 2019 and not everyone can even write code that compiles nor is even correct style. – samthebest Dec 17 '18 at 08:30
  • 5
    The top answer is actually the best, it appears you basically copied his. – JP Silvashy Nov 17 '19 at 17:33
  • @JPSilvashy I did try to edit the answer so that it 1. Includes how to specify the compression codec (as requested in the OP), 2. includes how to add it as a pimp/extension method (as requested in the OP), 3. actually compiles! (includes necessary imports), 4. uses correct Scala style and formatting. The poster rejected my edits, so I created a new answer. At least dozen people have found my answer more helpful than the top answer. – samthebest Jan 23 '20 at 09:02
4

I have a similar need and found an way. But it has one drawback (which is not a problem for my case): you need to re-partition you data with one partition per output file.

To partition in this way it generally requires to know beforehand how many files the job will output and find a function that will map each key to each partition.

First let's create our MultipleTextOutputFormat-based class:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

With this class Spark will get a key from a partition (the first/last, I guess) and name the file with this key, so it's not good to mix multiple keys on the same partition.

For your example, you will require a custom partitioner. This will do the job:

import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

Now let's put everything together:

val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

This will generate 3 files under prefix (named 1, 2 and 7), processing everything in one pass.

As you can see, you need some knowledge about your keys to be able to use this solution.

For me it was easier because I needed one output file for each key hash and the number of files was under my control, so I could use the stock HashPartitioner to do the trick.

douglaz
  • 1,306
  • 2
  • 13
  • 17
  • 2
    This is certainly the nicest solution so far and seems to nearly do the trick. I'm a bit concerned that this will result in one file per key, which will cause problems for large data sets. If you could modify your answer so that it the number of output files per key is configurable I'd be very grateful. – samthebest Jun 20 '14 at 09:09
  • @samthebest, I can do that but it will be a very specific solution. Could you update the question to say you want multiple output files per key? By the way, are you really using integer keys on your job? – douglaz Jun 20 '14 at 21:39
  • Well, any key that makes sense to partition on - so something that is reasonable when we call `toString` on it. I'm not sure I need to update my answer as it's well known bad practice to produce large files on HDFS because it limits the types of compression you can use. If we have very large files and we have to pick a splittable compression algo, which might not be best for the job at hand. Furthermore Spark cannot currently read bzip2 (my fav splittable compression) due to a bug in Hadoop. Nevertheless I'll update my answer to be explicit. Again, many thanks. – samthebest Jun 21 '14 at 11:13
  • 2
    This solution puts all the data through one node, if they all have the same key, correct? Seems like a detriment to its general scalability. – Daniel Darabos Jun 21 '14 at 13:14
  • @DanielDarabos point is correct. Surely it's possible to tweak the `IdentityIntPartitioner` so that for each possible key there are several partitions, say M, where one is chosen at random. We'd need to use a hash function and modulo the result by `numPartitions`, though there is then a problem - different keys could end up in the same partition, which I'm assuming will break the `saveAsHadoopFile` ? It's a non-trivial problem. – samthebest Jun 21 '14 at 16:40
  • what is the generateActualKey mean? why it always return null? – user2848932 May 09 '15 at 07:27
  • @user2848932 this a hadoop interface. For hadoop, generateActualKey may generate the key based on the data. For Spark and for purpose of this problem, I guess that generateActualKey won't be called so we can return null there just to satisfy the interface. – douglaz May 09 '15 at 18:03
  • @douglaz by the way, when I runs your code, it says: java.lang.RuntimeException: java.lang.NoSuchMethodException: $iwC$$iwC$KeyBasedOutput.() Maybe, you should add a initial function for the KeyBasedOutput class? – user2848932 May 11 '15 at 02:48
4

I was in need of the same thing in Java. Posting my translation of Zhang Zhan's Scala answer to Spark Java API users:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}
Community
  • 1
  • 1
Thamme Gowda
  • 11,249
  • 5
  • 50
  • 57
3

saveAsText() and saveAsHadoop(...) are implemented based on the RDD data, specifically by the method: PairRDD.saveAsHadoopDataset which takes the data from the PairRdd where it's executed. I see two possible options: If your data is relatively small in size, you could save some implementation time by grouping over the RDD, creating a new RDD from each collection and using that RDD to write the data. Something like this:

val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}

Note that it will not work for large datasets b/c the materialization of the iterator at v.toSeq might not fit in memory.

The other option I see, and actually the one I'd recommend in this case is: roll your own, by directly calling the hadoop/hdfs api.

Here's a discussion I started while researching this question: How to create RDDs from another RDD?

maasg
  • 37,100
  • 11
  • 88
  • 115
  • 3
    Yes, I'd like to use the hadoop/hdfs api - i.e. use `MultipleOutputFormat`, but I would like to know *how* to do that. – samthebest Jun 04 '14 at 10:30
  • You can't make an RDD inside another RDD (your 2nd line). See this ppt http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs?related=1 – Adrian Mar 25 '15 at 14:07
  • @Adrian you're right. I was missing a collect there. – maasg Mar 25 '15 at 14:18
3

I had a similar use case where I split the input file on Hadoop HDFS into multiple files based on a key (1 file per key). Here is my scala code for spark

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

@serializable object processGroup {
    def apply(groupName:String, records:Iterable[String]): Unit = {
        val outFileStream = fs.create(new Path("/output_dir/"+groupName))
        for( line <- records ) {
                outFileStream.writeUTF(line+"\n")
            }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))

I have grouped the records based on key. The values for each key is written to separate file.

Lightness Races in Orbit
  • 378,754
  • 76
  • 643
  • 1,055
shanmuga
  • 4,329
  • 2
  • 21
  • 35
  • this looks like a great solution, specially because it deals with the result iterables, I'm getting a org.apache.spark.SparkException: Task not serializable, do you think the fs instance is causing this problem? – perrohunter Dec 17 '15 at 19:34
  • I like this solution, as it doesn't use DataFrames. It works for me. I worry that it only writes 1 file per group which could be troublesome for large datasets right? For example my groups are about 150MB which is fine... – eggie5 Aug 21 '19 at 17:02
  • I think this solution doesn't work for huge amount of data in each key – Ayoub Omari Oct 27 '20 at 15:29
1

good news for python user in the case you have multi columns and you want to save all the other columns not partitioned in csv format which will failed if you use "text" method as Nick Chammas' suggestion .

people_df.write.partitionBy("number").text("people") 

error message is "AnalysisException: u'Text data source supports only a single column, and you have 2 columns.;'"

In spark 2.0.0 (my test enviroment is hdp's spark 2.0.0) package "com.databricks.spark.csv" is now integrated , and it allow us save text file partitioned by only one column, see the example blow:

people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie

In my spark 1.6.1 enviroment ,the code didn't throw any error,however ther is only one file generated. it's not partitioned by two folders.

Hope this can help .

dalin qin
  • 126
  • 2
  • 10
1

I had a similar use case. I resolved it in Java by writing two custom classes implemeting MultipleTextOutputFormat and RecordWriter.

My input was a JavaPairRDD<String, List<String>> and I wanted to store it in a file named by its key, with all the lines contained in its value.

Here is the code for my MultipleTextOutputFormat implementation

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
    final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 

Here is the code for my RecordWriter implementation.

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

Most of the code is exactly the same than in FileOutputFormat. The only difference is those few lines

List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

These lines allowed me to write each line of my input List<String> on the file. The first argument of the write function is set to null in order to avoid writting the key on each line.

To finish, I only need to do this call to write my files

javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);
jeanr
  • 1,031
  • 8
  • 15