I am having trouble converting a DataFrame to a Dataset in Spark 2.2.1 (Scala 2.11.8).
Basically, I am attempting an aggregation that collects the right-hand side of a join into a list. I do this step all over the place with various case classes and tuples, and I don't want to rewrite the same routine over and over again, so I decided to refactor it into this method:
    def collectUsingGenerics[L <: Product : Encoder, R <: Product : Encoder](
        left: Dataset[L],
        right: Dataset[R],
        joinCol: Column,
        groupCol: Column): Dataset[(L, List[R])] = {
      import left.sparkSession.implicits._
      import org.apache.spark.sql.functions._

      val result = left
        .join(right, joinCol)
        .select(
          groupCol.as("groupCol"),
          struct(left("*")).as("_1"),
          struct(right("*")).as("_2"))
        .groupBy($"groupCol")
        .agg(
          first($"_1").as("_1"),
          collect_list($"_2").as("_2")
        )
        .drop($"groupCol")

      // This does not work!!!
      result.as[(L, List[R])]
    }
The unit test:
    "collectUsingGenerics" should "collect the right-side Dataset" in {
      val left = spark.createDataset(Seq(
        (1, "Left 1"),
        (2, "Left 2")
      ))
      val right = spark.createDataset(Seq(
        (101, 1, "Right 1"),
        (102, 1, "Right 2"),
        (103, 2, "Right 3")
      ))

      val collectedDataset = Transformations
        .collectUsingGenerics[(Int, String), (Int, Int, String)](
          left, right, left("_1") === right("_2"), left("_1"))
        .collect()
        .sortBy(_._1._1)

      val data1 = collectedDataset(0)
      data1._1 should be (1, "Left 1")
      data1._2 should contain only ((101, 1, "Right 1"), (102, 1, "Right 2"))
    }
The problem is that this does not compile, due to a missing encoder:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] result.as[(L,List[R])]
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
I was under the impression that importing spark.implicits._ is enough to generate encoders for tuples and case classes, as well as primitive types. Did I miss something?
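For what it's worth, here is a dependency-free sketch of what I suspect is going on. The hypothetical `Enc` typeclass below stands in for Spark's Encoder (the names `Enc`, `collectLike`, and `EncDemo` are made up for illustration). Inside a generic method, the only instances in scope are the ones demanded in the signature; an instance for the composite type `(L, List[R])` cannot be conjured there, but it resolves fine at the call site where `L` and `R` are concrete, so demanding it directly in the signature works:

```scala
object EncDemo {
  // Stand-in for Spark's Encoder typeclass (hypothetical, illustration only).
  trait Enc[T] { def describe: String }

  object Enc {
    implicit val intEnc: Enc[Int]    = new Enc[Int]    { def describe = "Int" }
    implicit val strEnc: Enc[String] = new Enc[String] { def describe = "String" }
    // Composite instances are derived from the instances of their parts.
    implicit def pairEnc[A, B](implicit a: Enc[A], b: Enc[B]): Enc[(A, B)] =
      new Enc[(A, B)] { def describe = s"(${a.describe}, ${b.describe})" }
    implicit def listEnc[A](implicit a: Enc[A]): Enc[List[A]] =
      new Enc[List[A]] { def describe = s"List[${a.describe}]" }
  }

  // Demand the composite instance directly in the signature. At the call
  // site L and R are concrete, so the implicit can actually be derived there.
  def collectLike[L, R](implicit enc: Enc[(L, List[R])]): String = enc.describe

  def main(args: Array[String]): Unit =
    println(collectLike[Int, String]) // prints "(Int, List[String])"
}
```

If that intuition carries over to Spark, the analogous change would be to take an `implicit enc: Encoder[(L, List[R])]` parameter (or TypeTag context bounds) on collectUsingGenerics instead of relying on the imports inside the method body, since spark.implicits._ derives Product encoders from TypeTags, which the abstract L and R don't have. I'm not sure whether that is the intended approach, though.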