0

I am having trouble converting my DataFrame to a Dataset in Spark (2.2.1) with Scala (2.11.8).

Basically, I am attempting an aggregation that collects the rows of the right dataset into a list for each row of the left dataset. I am doing this step all over the place using case classes and tuples. I don't want to rewrite the same routine over and over again, so I decided to refactor the step into this method:

 /**
   * Joins `left` with `right` on `joinCol`, groups the joined rows by `groupCol`,
   * and for each group keeps the first left-hand row together with a list of the
   * right-hand rows.
   *
   * NOTE: as posted, this version does not compile; the final `.as[...]` call
   * fails with "Unable to find encoder for type stored in a Dataset".
   *
   * @param left     dataset whose rows become the `_1` element of each result
   * @param right    dataset whose rows are collected into the `_2` list
   * @param joinCol  join condition passed to `left.join`
   * @param groupCol column the joined rows are grouped by
   */
 def collectUsingGenerics[L <: Product : Encoder,R <: Product : Encoder](
                              left: Dataset[L],
                              right: Dataset[R],
                              joinCol: Column,
                              groupCol: Column): Dataset[(L,List[R])] = {

import left.sparkSession.implicits._
import org.apache.spark.sql.functions._

// Pack each side's columns into one struct column, named `_1` / `_2` to match
// the field names of the target tuple type.
val result = left
  .join(right, joinCol)
  .select(
    groupCol.as("groupCol"),
    struct(left("*")).as("_1"),
    struct(right("*")).as("_2"))
  .groupBy($"groupCol")
  .agg(
    first($"_1").as("_1"),
    collect_list($"_2").as("_2")
  )
  // The grouping key was only needed for the aggregation; remove it so that
  // only `_1` and `_2` remain.
  .drop($"groupCol")

// Does not compile: the compiler cannot find an implicit Encoder[(L, List[R])] here.
  result.as[(L,List[R])]
}

The Unit Test:

// Verifies that every right-hand row ends up in the list attached to its
// matching left-hand row, for each group key.
"collectUsingGenerics" should "collect the right-side Dataset" in {
  val left = spark.createDataset(Seq(
    (1, "Left 1"),
    (2, "Left 2")
  ))

  val right = spark.createDataset(Seq(
    (101, 1, "Right 1"),
    (102, 1, "Right 2"),
    (103, 2, "Right 3")
  ))

  // Join the left key to the right-hand second column, group by the left key,
  // then sort by that key so the indexed assertions below see a stable order.
  val collectedDataset = Transformations
    .collectUsingGenerics[(Int, String), (Int, Int, String)](
      left, right, left("_1") === right("_2"), left("_1"))
    .collect()
    .sortBy(_._1._1)

  // One result row per left-hand key.
  collectedDataset.length should be (2)

  // Expected tuples are wrapped in their own parentheses so the matcher receives
  // a single tuple argument instead of relying on argument auto-tupling.
  val data1 = collectedDataset(0)
  data1._1 should be ((1, "Left 1"))
  data1._2 should contain only ((101, 1, "Right 1"), (102, 1, "Right 2"))

  val data2 = collectedDataset(1)
  data2._1 should be ((2, "Left 2"))
  data2._2 should be (List((103, 2, "Right 3")))
}

The problem is, I can't compile this due to missing encoders:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]     result.as[(L,List[R])]
[error]              ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed

I was under the impression that importing spark.implicits._ is enough to generate encoders for tuple and case classes, as well as primitive types. Did I miss something?

CodeFactory.DEV
  • 470
  • 8
  • 20

1 Answer

3

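You also need an implicit TypeTag for each of those types. The product encoder supplied by spark.implicits._ is derived from a TypeTag, and for the generic parameters L and R the compiler can only obtain one if the method requests it from its caller. See the original question for details: "Scala generic encoder for Spark case class".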

/**
  * Joins `left` with `right` on `joinCol`, groups the joined rows by `groupCol`,
  * and returns, for each group, the first left-hand row paired with the list of
  * right-hand rows in that group.
  *
  * The `TypeTag` context bounds let the implicits imported from the session
  * supply an encoder for `(L, List[R])`. `TypeTag` itself comes from
  * `scala.reflect.runtime.universe`.
  *
  * @param left     dataset whose rows become the first element of each pair
  * @param right    dataset whose rows are collected into the list
  * @param joinCol  join condition between the two datasets
  * @param groupCol column the joined rows are grouped by
  * @tparam L row type of `left`
  * @tparam R row type of `right`
  * @return one `(L, List[R])` pair per group
  */
def collectUsingGenerics[L <: Product : Encoder : TypeTag, R <: Product : Encoder : TypeTag](
    left: Dataset[L],
    right: Dataset[R],
    joinCol: Column,
    groupCol: Column): Dataset[(L, List[R])] = {

  import left.sparkSession.implicits._
  import org.apache.spark.sql.functions._

  // Wrap each side's columns in a single struct so the row shape mirrors the
  // `_1` / `_2` fields of the target tuple.
  val leftStruct = struct(left("*")).as("_1")
  val rightStruct = struct(right("*")).as("_2")

  val paired = left
    .join(right, joinCol)
    .select(groupCol.as("groupCol"), leftStruct, rightStruct)

  // Within a group, keep the first left-hand row and gather the right-hand rows.
  val collected = paired
    .groupBy($"groupCol")
    .agg(
      first($"_1").as("_1"),
      collect_list($"_2").as("_2"))

  // The grouping key is not part of the result type; remove it before the
  // typed conversion.
  collected
    .drop($"groupCol")
    .as[(L, List[R])]
}
Steve Robinson
  • 452
  • 5
  • 9