For the following code, in which a DataFrame is converted to an RDD[Row] and data for a new column is appended via mapPartitions:

// df is a DataFrame
import org.apache.spark.sql.Row

val dfRdd = df.rdd.mapPartitions {
  // the broadcast is created on the driver when this block is evaluated
  val bfMap = df.rdd.sparkContext.broadcast(factorsMap)
  iter =>
    val locMap = bfMap.value
    iter.map { r =>
      // append the looked-up value for this row's inColName as a new element
      val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
      Row(newseq)
    }
}
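
For reference, factorsMap and inColName are not defined in the snippet; a hypothetical setup consistent with the output below might look like:

// hypothetical definitions, for illustration only -- not shown in the question
val inColName = "user"
val factorsMap: Map[String, Int] = Map("0021BEC286CC" -> 148818, "0021BEE7C556" -> 26908)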

The output appears correct for the RDD[Row], with the extra column appended:

println("**dfrdd\n" + dfRdd.take(5).mkString("\n"))

**dfrdd
[ArrayBuffer(0021BEC286CC, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 148818)]
[ArrayBuffer(0021BEE7C556, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 26908)]
[ArrayBuffer(8C7F3BFD4B82, 4, Series, series, bc514da3e0d534da8207e3aab231d1cb, livetv, 99942)]
[ArrayBuffer(0021BEC8F8B8, 1, Series, series, 0d2debc63efa3790a444c7959249712b, livetv, 53994)]
[ArrayBuffer(10EA59F10C8B, 1, Series, series, 0d2debc63efa3790a444c7959249712b, livetv, 1427)]

Let us try to convert the RDD[Row] back to a DataFrame:

import org.apache.spark.sql.types.{IntegerType, StructField}

val newSchema = df.schema.add(StructField("userf", IntegerType))

Now let us create the updated DataFrame:

val df2 = df.sqlContext.createDataFrame(dfRdd, newSchema)

Does the new schema look correct?

newSchema.printTreeString()

root
 |-- user: string (nullable = true)
 |-- score: long (nullable = true)
 |-- programType: string (nullable = true)
 |-- source: string (nullable = true)
 |-- item: string (nullable = true)
 |-- playType: string (nullable = true)
 |-- userf: integer (nullable = true)

Notice that we do see the new userf column.

However, it does not work:

println("df2: " + df2.take(1))

Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, 
most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost, executor driver): java.lang.RuntimeException: Error while encoding: 

java.lang.RuntimeException: scala.collection.mutable.ArrayBuffer is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, user), StringType), true) AS user#28
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, user), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null

So: what detail is missing here?

Note: I am not interested in different approaches (e.g. withColumn or Datasets). Let us please consider only this approach:

  • convert to RDD
  • add new data element to each row
  • update the schema for the new column
  • convert the new RDD+schema back to DataFrame
WestCoastProjects

1 Answer

There seems to be a small mistake in the call to Row's constructor:

val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
Row(newseq)

The signature of this "constructor" (apply method, actually) is:

def apply(values: Any*): Row

When you pass a Seq[Any], it is treated as a single value of type Seq[Any]. You want to pass the elements of this sequence, so you should use:

val newseq = r.toSeq :+ locMap(r.getAs[String](inColName))
Row(newseq: _*)
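
To see the difference concretely, here is a minimal sketch (the sample values are illustrative only):

import org.apache.spark.sql.Row

val seq = Seq("0021BEC286CC", 4L, 148818)
Row(seq).length     // 1 - the whole Seq is treated as a single field
Row(seq: _*).length // 3 - each element of the Seq becomes its own field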

Once this is fixed, the Rows will match the schema you built, and you'll get the expected result.
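
Alternatively, the Row companion object provides def fromSeq(values: Seq[Any]): Row, which builds a Row from a sequence directly, without the varargs expansion:

Row.fromSeq(newseq)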

Tzach Zohar
  • ur correct! now you can pull ahead of me in the rep ! – WestCoastProjects Jan 26 '17 at 08:51
  • Thank you so much! I am quite new to scala and spark and this took me quite long to find out. [Here](https://stackoverflow.com/questions/7938585/what-does-param-mean-in-scala) is also some more information about the `: _*` notation – Alex Nov 03 '17 at 12:49
  • I just ran into this Q&A - saw that I had upvoted it .. and then .. that *I* was the asker ! Sorry I can't upvote yet again – WestCoastProjects Jun 01 '18 at 19:50