I ran into a problem with my Spark Scala script. I have raw data that I aggregate (grouping, counting, etc.), and I want to save the output in a specific JSON format.
EDIT:
I tried to simplify the question and rewrote it:
When I select data from the source DataFrame with an Array[org.apache.spark.sql.Column] in which the columns are aliased, and then refer to a column through a variable holding its name (or its index) while mapping the rows to a case class, I get a "Task not serializable" exception.
import org.apache.spark.sql.functions.col
import sqlContext.implicits._ // needed for toDF outside the spark-shell

// Build a small test DataFrame and alias every column to its capitalized name.
val dm = sqlContext.createDataFrame(Seq((1, "James"), (2, "Anna"))).toDF("id", "name")
val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2: _*)

// Map the rows of the aliased DataFrame to a case class,
// looking the column up via the variable n.
val n = "Name"
case class Result(Name: String)
val r = dm2.map(row => Result(row.getAs[String](n))).toDF // throws "Task not serializable"
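For comparison, this is the shape I would have expected to behave the same way; I have not confirmed whether hard-coding the alias (instead of going through the variable n) changes the serialization behaviour, so treat this only as a sketch of what I have been comparing against:

// Same mapping, but with the aliased column name written as a literal
// instead of being captured through the variable n.
val rLiteral = dm2.map(row => Result(row.getAs[String]("Name"))).toDF

// The index-based variant mentioned above, which fails the same way for me.
val idx = 0
val rByIndex = dm2.map(row => Result(row.getString(idx))).toDF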
For the second part of the question: I actually need the final schema to be an array of these Result objects, and I haven't figured out how to do that either. The expected result should have a schema like this:
case class Test(var FilteredStatistics: Array[Result])

// Hand-built example of the structure I am after:
val t = Test(Array(Result("Anna"), Result("James")))
val t2 = sc.parallelize(Seq(t)).toDF
scala> t2.printSchema
root
|-- FilteredStatistics: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
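The only way I can see to get that shape at the moment is to collect to the driver and wrap the result by hand, which I would rather avoid for the real data; this is just a sketch of that workaround, assuming the mapping above worked:

// Naive workaround sketch: collect the mapped rows to the driver and wrap
// them into a single Test row (fine for toy data, not for the real set).
val results = dm2.map(row => Result(row.getAs[String](n))).collect()
val wrapped = sc.parallelize(Seq(Test(results))).toDF
wrapped.printSchema // should match the schema shown above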
TL;DR:
How do I map DataFrame rows to a case class when the DataFrame columns are aliased and the column names come from variables?
How do I gather these case class objects into an array column (the Test schema above)?