
I want to add a new column with a row id to a DataFrame in Spark (Scala). This is the approach I took: I create a new Row with the index id prepended, and a new StructType with an extra StructField included.

    val rdd = df.rdd.zipWithIndex().map(indexedRow => Row.fromSeq(indexedRow._2.toString ++ indexedRow._1.toSeq))
    val list = StructType(Seq(StructField("Row Number", StringType, true)).++(df.schema.fields))
    sqlContext.createDataFrame(rdd, list).show() // fails

I am getting the following exception when I run it:

scala.MatchError: 0 (of class java.lang.Character)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)

But the StructType and RDD have the expected output. Can anyone help me with this, please?

I have tried this with Spark 1.6.0 and 1.6.1 (Scala 2.10).

NehaM

2 Answers


You have just one small mistake, in how you prepend the string value to the sequence of fields. Instead of:

indexedRow._2.toString ++ indexedRow._1.toSeq

You should use:

indexedRow._2.toString +: indexedRow._1.toSeq

The first implementation actually converts the string into a Seq[Char] and then concatenates the two sequences, so you end up with something like Seq('1', '2', "f1Val", "f2Val") instead of Seq("12", "f1Val", "f2Val"). The exception you see is Spark trying to convert the first Char to a StringType and failing, hence the MatchError on java.lang.Character.
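You can see the difference without Spark at all, using plain Scala collections (the field values below are made-up stand-ins for a Row's contents):

```scala
// Demonstrates `++` vs `+:` when combining a String with a Seq[Any].
object PrependVsConcat {
  def main(args: Array[String]): Unit = {
    val rowValues: Seq[Any] = Seq("f1Val", "f2Val") // stand-ins for Row.toSeq
    val index = 12L

    // `++` treats the String "12" as a collection of Chars and splices them in:
    val concatenated = index.toString ++ rowValues
    println(concatenated.toList) // List(1, 2, f1Val, f2Val) -- '1' and '2' are Chars

    // `+:` prepends the whole String as a single element:
    val prepended = index.toString +: rowValues
    println(prepended.toList) // List(12, f1Val, f2Val)
  }
}
```

The `MatchError: 0 (of class java.lang.Character)` in the question is exactly this: the row index starts at 0, so the first element Spark tries to convert is the Char '0' rather than the String "0".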

Tzach Zohar

Take a look at this answer for a better way of assigning unique IDs to RDD rows (RDD.zipWithUniqueId).

Community