I'm working on libraries for design patterns I keep running into while programming with Spark. One I'm trying to generalize is where you group a Dataset
by some key, do some collation within each group, and then return the original element type. A simple example:
case class Counter(id: String, count: Long)
// Let's say I have some Dataset...
val counters: Dataset[Counter]
// The operation I find myself doing quite often:
import sqlContext.implicits._
counters.groupByKey(_.id)
  .reduceGroups((a, b) => Counter(a.id, a.count + b.count))
  .map(_._2)
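For concreteness, here's what that does on some sample data (a minimal sketch; the sample Seq is just for illustration, assuming the same sqlContext is in scope):

import sqlContext.implicits._

val sample: Dataset[Counter] = Seq(
  Counter("a", 1L),
  Counter("a", 2L),
  Counter("b", 5L)
).toDS()

// Groups by id and sums the counts within each group, yielding
// a Dataset of the original element type:
// Counter("a", 3), Counter("b", 5)
sample.groupByKey(_.id)
  .reduceGroups((a, b) => Counter(a.id, a.count + b.count))
  .map(_._2)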
So to generalize this, I add a new type:
trait KeyedData[K <: Product, T <: KeyedData[K, T] with Product] { self: T =>
  def key: K
  def merge(other: T): T
}
Then I change the definition of Counter to this:
case class Counter(id: String, count: Long) extends KeyedData[String, Counter] {
  override def key: String = id
  override def merge(other: Counter): Counter = Counter(id, count + other.count)
}
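As a quick sanity check (plain Scala, no Spark involved), merge reproduces the reduce function from the original snippet:

// Equivalent to (a, b) => Counter(a.id, a.count + b.count)
assert(Counter("a", 1L).merge(Counter("a", 2L)) == Counter("a", 3L))
assert(Counter("a", 1L).key == "a")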
Then I made the following implicit class to add functionality to a Dataset:
implicit class KeyedDataDatasetWrapper[K <: Product, T <: KeyedData[K, T] with Product](ds: Dataset[T]) {
  def collapse(implicit sqlContext: SQLContext): Dataset[T] = {
    import sqlContext.implicits._
    ds.groupByKey(_.key).reduceGroups(_.merge(_)).map(_._2)
  }
}
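The goal at the call site is to replace the whole groupByKey/reduceGroups/map chain with a single call, i.e. something like this (assuming an implicit SQLContext is in scope):

// What I'd like to be able to write:
val collapsed: Dataset[Counter] = counters.collapse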
Every time I compile, though, I get this:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] ds.groupByKey(_.key).reduceGroups(_.merge(_)).map(_._2)
[error] ^
[error] one error found
Clearly something is not getting recognized as a Product
type, so something must be wrong with my type parameters somewhere, but I'm not sure what. Any ideas?
UPDATE
I changed my implicit class to the following:
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Encoder, Encoders, SQLContext}

implicit class KeyedDataDatasetWrapper[K <: Product : TypeTag,
    T <: KeyedData[K, T] with Product : TypeTag](ds: Dataset[T]) {
  def merge(implicit sqlContext: SQLContext): Dataset[T] = {
    implicit val encK: Encoder[K] = Encoders.product[K]
    implicit val encT: Encoder[T] = Encoders.product[T]
    ds.groupByKey(_.key).reduceGroups(_.merge(_)).map(_._2)
  }
}
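For reference, those two context bounds are just shorthand for an extra implicit parameter list, so (if I have the standard desugaring right) the class above is equivalent to:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.Dataset

implicit class KeyedDataDatasetWrapper[K <: Product, T <: KeyedData[K, T] with Product](
    ds: Dataset[T])(implicit kTag: TypeTag[K], tTag: TypeTag[T]) {
  // Encoders.product[K] and Encoders.product[T] consume this TypeTag
  // evidence, so both tags must be resolvable at the call site for
  // the implicit conversion to apply.
  // ... same merge body as above ...
}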
However, now when I compile this:
val ds: Dataset[Counter] = ...
val merged = ds.merge
I now get this compile error; it seems Dataset[Counter] is not matching Dataset[T] in the implicit class definition:
value merge is not a member of org.apache.spark.sql.Dataset[Counter]
[error] ds.merge
[error] ^