3

I ran into a little problem with my Spark Scala script. Basically I have raw data which I am doing aggregations on and after grouping and counting etc I want to save the output to a specific JSON format.

EDIT:

I tried to simplify the question and rewrote it:

When I select data from the source dataframe with an Array[org.apache.spark.sql.Column] where the column names have aliases, then using column names (or indeed indices) as variables when trying to map the rows to a case class, then I get a "Task not serializable" exception.

var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")

val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name:String)
val r = dm2.map(row => Result(row.getAs(n))).toDF

And the second part or the question, I actually need the final schema to be an array of these Result class objects. I still haven't figured out, how to do this as well. The expected result should have a schema like that:

    case class Test(var FilteredStatistics: Array[Result])
    val t = Test(Array(Result("Anna"), Result("James")))

    val t2 = sc.parallelize(Seq(t)).toDF

    scala> t2.printSchema
    root
     |-- FilteredStatistics: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- Name: string (nullable = true)

TL;DR:

  1. How to map dataframe rows to a case class object when dataframe columns have aliases and variables are used for column names?

  2. How to add these case class objects to an array?

V. Samma
  • 2,558
  • 8
  • 30
  • 34
  • the serialization issue doesn't reproduce - I copied all your code and it works well for me. Looks like somewhere in your code (not pasted here?) you are using a `org.apache.spark.sql.Column` object within a case class used in a DataFrame or in a transformation that is serialized and sent to workers... – Tzach Zohar Dec 20 '16 at 15:12
  • 2
    BTW - one of us is probably lost in the details of this very elaborate question... try to _minimize_ it (a lot) - find the simplest example that reproduces the issue (and ask the other question separately after similar minimalization) – Tzach Zohar Dec 20 '16 at 15:15
  • one fix to try for your Serialization problem... `class Result(???) extends Serializable; object Result { def apply(r: Row): Result = r match { ??? } }` then use pattern matching on r to deal with the variety of formats you might have in your DF. it's often a problem when you try to apply a class to part of a row, but if you create a class that you can map the entire row... `DF.map(Result)` may then work. – kmh Dec 20 '16 at 18:54
  • see [this question](http://stackoverflow.com/questions/22592811) for more help understanding Task Not Serializable issues. – kmh Dec 20 '16 at 19:00
  • @TzachZohar Maybe it's an issue of enviroment? I am using Spark on AWS EMR clusters, currently started a clean emr-4.6.0 with Spark 1.6.1 and copied only the code I shared here and I still got the serialization error. And that was me already trying to minimize it, actual implementation is even more complex. The only place currently where the `Column` is used, is creating a map in step 3. But I can't use a string Seq there, because with strings, groupBy works only like this: `(col1: String,cols: String*)`. But I edited the question with the simplest version of the question. – V. Samma Dec 21 '16 at 09:36
  • Thanks for rewriting the question - much better now! I can reproduce the issue now, await an answer... – Tzach Zohar Dec 21 '16 at 10:12
  • As for your second question - it's better to make it a separate post – Tzach Zohar Dec 21 '16 at 10:18
  • @TzachZohar Thanks! I will get around to trying your solution soon. As for the second part, I thought about making a separate question about it as well. – V. Samma Dec 21 '16 at 10:46
  • @kmh would this solution work with n amount of columns? And I would just need to handle the primitive types like int, double, string, long etc? Or should I create a new dataframe with the specific columns I need? – V. Samma Dec 21 '16 at 10:50

1 Answers1

0

Serialization Issue: the problem here is the val n = "Name": it is used inside an anonymous function passed into an RDD transformation (dm2.map(...)), which makes Spark close over that variable and the scope containing it, which also includes cl2 which has the type Array[Column], hence it isn't serializable.

The solution is simple - either inline n (to get dm2.map(row => Result(row.getAs("Name")))), or place it in a Serializable context (an object or a class that doesn't contain any non-serializable members).

Tzach Zohar
  • 37,442
  • 3
  • 79
  • 85
  • Okay, I am still having some difficulties. I don't want to do it inline as I declare the column names in one place and then use them throughout the code. I don't want to duplicate them because then I would have to change them in multiple places. But did you mean that I should create an object or a class for holding the necessary variables? I tried `object Params extends Serializable { val name = "Name" }` and `case class Params(name:String)` (case class was supposed to be serializable by default) and using `Params.name` inside the getAs and that didn't help. Sorry if I missed something obvious – V. Samma Dec 22 '16 at 11:33
  • If the object isn't nested in any other class - it should work (works for me...). Note that when you create objects in *spark-shell* they are actually nested in the object that encapsulates the entire shell code, so it might not work there (I think) – Tzach Zohar Dec 22 '16 at 15:39