I am working on a project in Apache Spark, and the requirement is to write the processed output from Spark in a specific format: Header -> Data -> Trailer. For writing to HDFS I am using the .saveAsHadoopFile method and writing the data to multiple files, using the key as the file name. But the issue is that the sequence of the data is not maintained: files are written as Data -> Header -> Trailer, or some other combination of the three. Is there anything I am missing with the RDD transformations?

1 Answer
OK, so after reading Stack Overflow questions, blogs, and mailing-list archives from Google, I found out how exactly .union() and other transformations work and how partitioning is managed. When we use .union(), the resulting RDD loses the partitioner and also the ordering, and that's why my output sequence was not being maintained.
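You can see the partitioner being dropped by inspecting the union result (a minimal sketch, assuming a live SparkContext sc):

import org.apache.spark.HashPartitioner
val a = sc.parallelize(Seq(("k1", "v1"))).partitionBy(new HashPartitioner(4))
val b = sc.parallelize(Seq(("k2", "v2"))) // no partitioner
println(a.partitioner)          // Some(org.apache.spark.HashPartitioner@...)
println(a.union(b).partitioner) // None: the union carries no partitioner

(If every input shares the exact same partitioner, newer Spark versions can preserve it, but header, data, and footer here are not co-partitioned.)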
What I did to overcome the issue is to number the records:
Header = 1, Body = 2, and Footer = 3
Then, using sortBy on the RDD that is the union of all three, I sorted by this order number into a single partition. After that, to write to multiple files with the key as the file name, I used a HashPartitioner so that each key's data goes into its own file.
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val header: RDD[(String, (String, Int))] = ... // this is my header RDD
val data:   RDD[(String, (String, Int))] = ... // this is my data RDD
val footer: RDD[(String, (String, Int))] = ... // this is my footer RDD
// sort the union by the order number (1 = header, 2 = body, 3 = footer) into one partition, then drop it
val finalRDD: RDD[(String, String)] =
  header.union(data).union(footer).sortBy(x => x._2._2, true, 1).map(x => (x._1, x._2._1))
// repartition by key so that all records for the same key land in the same file
val output: RDD[(String, String)] = finalRDD.partitionBy(new HashPartitioner(num)) // num = number of distinct keys
output.saveAsHadoopFile ... // and using MultipleTextOutputFormat, save to multiple files with the key as the file name
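The key-as-filename part uses the standard MultipleTextOutputFormat trick from the Stack Overflow question linked in the references; here is a minimal sketch (the class name KeyAsFileNameFormat and the output path are placeholders of mine):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// write each record to a file named after its key, keeping only the value as file content
class KeyAsFileNameFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

output.saveAsHadoopFile("/output/path", classOf[String], classOf[String], classOf[KeyAsFileNameFormat])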
This might not be the final or most efficient solution, but it worked. I am still trying to find other ways to maintain the output sequence as Header -> Body -> Footer. I also tried .coalesce(1) on all three RDDs before doing the union, but that just adds three more transformations, and since .sortBy already takes a partition count I expected the result to be the same; still, coalescing the RDDs first also worked, as sketched below. If anyone has another approach, please let me know or add to this; it would be really helpful, as I am new to Spark.
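For reference, the coalesce variant looks roughly like this (same sort and map as above):

// force each piece into a single partition before the union
val combined: RDD[(String, String)] =
  header.coalesce(1)
    .union(data.coalesce(1))
    .union(footer.coalesce(1))
    .sortBy(x => x._2._2, true, 1)
    .map(x => (x._1, x._2._1))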
References:
- Write to multiple outputs by key Spark - one Spark job (Stack Overflow)
- http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-2-RDD-s-only-returns-the-first-one-td766.html (this one helped a lot)