So I have this code:

    val expanededDf = io.readInputs().mapPartitions { (iter: Iterator[Row]) =>
        iter.map { (item: Row) =>
            val myNewColumn = getUdf($"someColumnOriginal")
            Row.fromSeq(item.toSeq :+ myNewColumn)
        }
    }

I am getting an exception: "Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases." My imports are:

import spark.implicits._
import org.apache.spark.sql._

I have to use a UDF because the function is very complex and makes some REST calls. Basically, the code tries to add a new column to each Row using a particular column's value and then return a DataFrame. I have tried using withColumn, but since I am dealing with petabytes of data here it is extremely slow. I am a newbie to Spark and Scala, so I apologise in advance if my question is extremely lame.

1 Answer

First of all, withColumn is the way to go. If it's slow, it's probably because your job needs tuning, and I think switching to RDDs won't make it any faster.
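
For reference, a minimal sketch of what the withColumn route could look like, assuming a hypothetical callRestService helper that wraps the complex REST logic (your getUdf would go there):

    import org.apache.spark.sql.functions.{col, udf}

    // Hypothetical wrapper around the complex REST-calling logic.
    def callRestService(value: String): String = ???

    val callRestUdf = udf((value: String) => callRestService(value))

    // Adds the new column without leaving the DataFrame API.
    val expandedDf = io.readInputs()
      .withColumn("myNewColumn", callRestUdf(col("someColumnOriginal")))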

But anyway...you are not supposed to refer to a DataFrame within the function that is called on every single row of an RDD.

To better understand what's happening: when you run a Spark program, there is a Driver, which is the master, and there are Executors, which are the slaves. The slaves don't know about DataFrames; only the driver does.

There is another important point: when you write code that runs on an executor, you must be careful when referencing variables that are in the driver's scope. If you do, Spark will try to serialize them and send them to the executors. That's fine if it's what you want, if those objects are small, and if Spark knows how to serialize them.
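
For illustration (a hedged sketch, with lookupTable as a made-up name), a small driver-side value serializes into the closure just fine:

    // A small Map built on the driver: Spark serializes it and ships it
    // to the executors together with the closure.
    val lookupTable: Map[String, String] = Map("a" -> "1", "b" -> "2")

    val mapped = io.readInputs().rdd.map { row =>
      // lookupTable is captured from the driver's scope; that's fine because
      // it is small and a plain Scala Map is serializable.
      lookupTable.getOrElse(row.getString(0), "unknown")
    }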

In this case, Spark is trying to serialize $"someColumnOriginal", which is an object of class Column, but it doesn't know how, so it fails. To make it work, you have to know at what position the field you want is; let's say it's at position 2, then you would write

    Row.fromSeq(item.toSeq :+ item.get(2))

You can get the position by looking at the schema, if it's available (item.schema, rdd.schema); since the position is just an Int, the lookup can be done outside the loop and Spark will have no trouble serializing it. You can read this article http://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error for more about serialization.
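
Putting it together, a hedged sketch of the fix (assuming your getUdf can take the raw column value instead of a Column):

    import org.apache.spark.sql.Row

    val df = io.readInputs()
    // fieldIndex runs on the driver and returns a plain Int,
    // which Spark has no trouble serializing into the closure.
    val colIdx = df.schema.fieldIndex("someColumnOriginal")

    val expandedRdd = df.rdd.mapPartitions { iter =>
      iter.map { item =>
        // Pass the field's value, not a Column, to the complex function.
        val myNewColumn = getUdf(item.get(colIdx))
        Row.fromSeq(item.toSeq :+ myNewColumn)
      }
    }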

Roberto Congiu