13

I have some spark code to process a csv file. It does some transformation on it. I now want to save this RDD as a csv file and add a header. Each line of this RDD is already formatted correctly.

I am not sure how to do it. I wanted to do a union with the header string and my RDD but the header string is not an RDD so it does not work.

Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
poiuytrez
  • 21,330
  • 35
  • 113
  • 172

5 Answers5

10

You can make an RDD out of your header line and then union it, yes:

val rdd: RDD[String] = ...
val header: RDD[String] = sc.parallelize(Array("my,header,row"))
header.union(rdd).saveAsTextFile(...)

Then you end up with a bunch of part-xxxxx files that you merge.

The problem is that I don't think you're guaranteed that the header will be the first partition and therefore end up in part-00000 and at the top of your file. In practice, I'm pretty sure it will.

More reliable would be to use Hadoop commands like hdfs to merge the part-xxxxx files, and as part of the command, just throw in the header line from a file.

Sean Owen
  • 66,182
  • 23
  • 141
  • 173
  • 3
    In Spark 1.6.2 running in distributed mode, union did not put header on top for me. Here is my code snippet :- `val header = sc.parallelize(Array('col1','col2'), 1) header.union( rdd.map(_.toString)) .repartition(1).saveAsTextFile(outputLocation)` – haltTm Oct 25 '16 at 21:52
  • same issue for me... `union()` is not guaranteed to preserve order. looking for a workaround now, looks like sorting the RDD might help – Tajh Taylor Sep 04 '19 at 14:26
0

Some help on writing it without Union(Supplied the header at the time of merge)

val fileHeader ="This is header"
val fileHeaderStream: InputStream = new  ByteArrayInputStream(fileHeader.getBytes(StandardCharsets.UTF_8));
val output = IOUtils.copyBytes(fileHeaderStream,out,conf,false)

Now loop over you file parts to write the complete file using

val in: DataInputStream = ...<data input stream from file >
 IOUtils.copyBytes(in, output, conf, false)

This made sure for me that header always comes as first line even when you use coalasec/repartition for efficient writing

SriA
  • 1
  • 2
0
def addHeaderToRdd(sparkCtx: SparkContext, lines: RDD[String], header: String): RDD[String] = {

    val headerRDD = sparkCtx.parallelize(List((-1L, header)))     // We index the header with -1, so that the sort will put it on top.

    val pairRDD = lines.zipWithIndex()

    val pairRDD2 = pairRDD.map(t => (t._2, t._1))

    val allRDD = pairRDD2.union(headerRDD)

    val allSortedRDD = allRDD.sortByKey()

    return allSortedRDD.values
}
0

Slightly diff approach with Spark SQL

From Question: I now want to save this RDD as a CSV file and add a header. Each line of this RDD is already formatted correctly.

With Spark 2.x you have several options to convert RDD to DataFrame

val rdd = .... //Assume rdd properly formatted with case class or tuple
val df = spark.createDataFrame(rdd).toDF("col1", "col2", ... "coln")

df.write
  .format("csv")
  .option("header", "true")  //adds header to file
  .save("hdfs://location/to/save/csv")

Now we can even use Spark SQL DataFrame to load, transform and save CSV file

Community
  • 1
  • 1
mrsrinivas
  • 34,112
  • 13
  • 125
  • 125
0
spark.sparkContext.parallelize(Seq(SqlHelper.getARow(temRet.columns, 
temRet.columns.length))).union(temRet.rdd).map(x => 
x.mkString("\x01")).coalesce(1, true).saveAsTextFile(retPath)


object SqlHelper {
//create one row
def getARow(x: Array[String], size: Int): Row = {
var columnArray = new Array[String](size)
for (i <- 0 to (size - 1)) {
  columnArray(i) = x(i).toString()
}
Row.fromSeq(columnArray)
}
}
berylqliu
  • 39
  • 3