
I'm working on a library for some design patterns I run into over and over while programming with Spark. One I'm trying to generalize is where you group a Dataset by some key, do some collation within each group, and then return the original type. A simple example:

case class Counter(id: String, count: Long)

// Let's say I have some Dataset...
val counters: Dataset[Counter]

// The operation I find myself doing quite often:
import sqlContext.implicits._
counters.groupByKey(_.id)
  .reduceGroups((a, b) => Counter(a.id, a.count + b.count))
  .map(_._2)

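For reference, the same collapse can be modeled on plain Scala collections, which makes the intended semantics easy to check without a SparkSession. This is just a sketch: `groupBy` and `reduce` here stand in for Spark's `groupByKey` and `reduceGroups`.

```scala
case class Counter(id: String, count: Long)

// Plain-collection analogue of groupByKey(_.id).reduceGroups(...).map(_._2)
def collapse(counters: Seq[Counter]): Seq[Counter] =
  counters
    .groupBy(_.id)   // like groupByKey(_.id)
    .values          // drop the keys, keep each group
    .map(_.reduce((a, b) => Counter(a.id, a.count + b.count))) // like reduceGroups
    .toSeq
```

So `collapse(Seq(Counter("a", 1), Counter("a", 2), Counter("b", 5)))` yields `Counter("a", 3)` and `Counter("b", 5)` (in some order).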
So to generalize this, I add a new type:

trait KeyedData[K <: Product, T <: KeyedData[K, T] with Product] { self: T =>
  def key: K
  def merge(other: T): T
}

Then I change the type definition of Counter to this:

case class Counter(id: String, count: Long) extends KeyedData[String, Counter] {
  override def key: String = id
  override def merge(other: Counter): Counter = Counter(id, count + other.count)
}
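A quick sanity check of this design outside Spark (a self-contained sketch with stand-in names; I've dropped the `Product` bounds here, since `String`, used as the key type, is not itself a `Product`):

```scala
// Stand-in for KeyedData, without the Product bounds
trait Keyed[K, T <: Keyed[K, T]] { self: T =>
  def key: K
  def merge(other: T): T
}

case class Counter(id: String, count: Long) extends Keyed[String, Counter] {
  override def key: String = id
  override def merge(other: Counter): Counter = Counter(id, count + other.count)
}
```

With this, `Counter("a", 1).merge(Counter("a", 2))` gives `Counter("a", 3)` as expected.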

Then I made the following implicit class to add functionality to a Dataset:

implicit class KeyedDataDatasetWrapper[K <: Product, T <: KeyedData[K, T] with Product](ds: Dataset[T]) {
  def collapse(implicit sqlContext: SQLContext): Dataset[T] = {
    import sqlContext.implicits._

    ds.groupByKey(_.key).reduceGroups(_.merge(_)).map(_._2)
  }
}

Every time I compile, though, I get this:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]       ds.groupByKey(_.key).reduceGroups(_.merge(_)).map(_._2)
[error]                    ^
[error] one error found

Clearly something is not getting recognized as a Product type, so something must be wrong with my type parameters somewhere, but I'm not sure what. Any ideas?
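For what it's worth, the same extension-method pattern does compile and run on plain collections if the key type is left as a wildcard, so the compiler only has to infer `T` (a sketch; `Keyed` and `KeyedSeqOps` are stand-ins, not the Spark version):

```scala
trait Keyed[K, T <: Keyed[K, T]] { self: T =>
  def key: K
  def merge(other: T): T
}

case class Counter(id: String, count: Long) extends Keyed[String, Counter] {
  override def key: String = id
  override def merge(other: Counter): Counter = Counter(id, count + other.count)
}

// Only T is inferred; the key type stays existential, so Seq[Counter] matches.
implicit class KeyedSeqOps[T <: Keyed[_, T]](xs: Seq[T]) {
  def collapse: Seq[T] =
    xs.groupBy(_.key).values.map(_.reduce((a, b) => a.merge(b))).toSeq
}
```

Usage: `Seq(Counter("a", 1), Counter("a", 2), Counter("b", 5)).collapse` gives `Counter("a", 3)` and `Counter("b", 5)`.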

UPDATE

I changed my implicit class to the following:

implicit class KeyedDataDatasetWrapper[K <: Product : TypeTag,
                                       T <: KeyedData[K, T] with Product : TypeTag](ds: Dataset[T]) {
  def merge(implicit sqlContext: SQLContext): Dataset[T] = {
    implicit val encK: Encoder[K] = Encoders.product[K]
    implicit val encT: Encoder[T] = Encoders.product[T]

    ds.groupByKey(_.key).reduceGroups(_.merge(_)).map(_._2)
  }
}

However, now when I try to compile this:

val ds: Dataset[Counter] = ...
val merged = ds.merge

I get this compile error; it seems Dataset[Counter] is not matching Dataset[T] in the implicit class definition:

: value merge is not a member of org.apache.spark.sql.Dataset[Counter]
[error]     ds.merge
[error]        ^
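One way I can narrow this down on plain collections: the same two-type-parameter shape works when the wrapper is instantiated explicitly, which suggests implicit resolution is failing because `K` only appears in `T`'s bound and so can't be inferred from `Dataset[T]`/`Seq[T]` alone (a sketch with stand-in names, not the Spark code):

```scala
trait Keyed[K, T <: Keyed[K, T]] { self: T =>
  def key: K
  def merge(other: T): T
}

case class Counter(id: String, count: Long) extends Keyed[String, Counter] {
  override def key: String = id
  override def merge(other: Counter): Counter = Counter(id, count + other.count)
}

// Same two-type-parameter shape as the Spark wrapper above.
implicit class KeyedOps[K, T <: Keyed[K, T]](xs: Seq[T]) {
  def collapseAll: Seq[T] =
    xs.groupBy(_.key).values.map(_.reduce((a, b) => a.merge(b))).toSeq
}

// Explicit instantiation supplies K, so this compiles and runs:
val merged = new KeyedOps[String, Counter](Seq(Counter("a", 1), Counter("a", 2))).collapseAll
```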
kapunga
    Closely related to [Encode an ADT / sealed trait hierarchy into Spark DataSet column](https://stackoverflow.com/q/41030073/6910411) – zero323 Apr 24 '18 at 17:53

0 Answers