
I'm working on AWS EMR with Spark version 2.4.7-amzn-1, using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302).

I wanted to reduce a dataset of my custom case class Item by key, where the key itself is a custom case class. However, reduceByKey did not work as I expected.

Here are the two classes:

case class Key(
  name: String,
  color: String
)

case class Item(
  name: String,
  color: String,
  count: Int
) {
  def key: Key = Key(name, color)
}

To aggregate, I defined a custom combine function in Item's companion object that just adds up the counts:

object Item {
  def combine(i1: Item, i2: Item): Item = i1.copy(count = i1.count + i2.count)
}

Here's my aggregate function:

import org.apache.spark.sql.Dataset
import spark.implicits._

def aggregate(items: Dataset[Item]): Dataset[Item] = items
  .rdd
  .keyBy(_.key)
  .reduceByKey(Item.combine)
  .map(_._2)
  .toDS

Now if I try to aggregate...

val items: Dataset[Item] = spark.sparkContext.parallelize(
  Seq(
    Item("Square", "green", 8),
    Item("Triangle", "blue", 3),
    Item("Square", "green", 5),
    Item("Triangle", "blue", 7)
  )
).toDS

val aggregated: Dataset[Item] = aggregate(items)

aggregated.show

...the output shows that the dataset has not been reduced:

+--------+-----+-----+
|    name|color|count|
+--------+-----+-----+
|  Square|green|    8|
|  Square|green|    5|
|Triangle| blue|    3|
|Triangle| blue|    7|
+--------+-----+-----+

However, I observed that the aggregation did work when I changed the order of the four items in the sequence, so the outcome is not consistent.

If I change the key from being a case class instance

def key: Key = Key(name, color)

into being a tuple

def key: Tuple2[String, String] = (name, color)

the aggregation works as expected, giving this output:

+--------+-----+-----+
|    name|color|count|
+--------+-----+-----+
|  Square|green|   13|
|Triangle| blue|   10|
+--------+-----+-----+

So, does reduceByKey in general not (reliably) work with case classes? Is this the expected behavior? Or does this have nothing to do with case class vs. tuple, and the real cause lies hidden somewhere else? My Key class seems quite simple to me, so I guess it's not a hashing or comparing issue. (I could be wrong.)
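
For what it's worth, a quick local check of structural equality and hash codes on two separately constructed keys gives exactly what I would expect:

val k1 = Key("Square", "green")
val k2 = Key("Square", "green")

k1 == k2                    // true: case classes compare by their fields
k1.hashCode == k2.hashCode  // true: hashCode is derived from the same fields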

I also looked at the question *reduceByKey using Scala object as key*, but there the cause turned out to be a typo, and chrisbtk explicitly stated: "Spark knows how to compare two object even if they do not implement Ordered."

Do I always have to use tuples as keys?

  • I have to add that I tried this in Zeppelin and in the Spark shell. Seems to be a problem of these interactive environments: [Why case class used as key in reduceByKey doesn't work in spark?](https://stackoverflow.com/questions/39805652/why-case-class-used-as-key-in-reducebykey-doesnt-work-in-spark), [Spark Issue](https://issues.apache.org/jira/browse/SPARK-2620) – TheGreenHeptagon Nov 26 '21 at 14:48
  • That post is about a much older version – gatear Nov 26 '21 at 16:49

1 Answer


Try using the Dataset API directly:

Given these imports and an encoder for Key in scope:

import sparkSession.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

implicit val keyEncoder: Encoder[Key] = Encoders.product[Key]

You can do:

items
  .groupByKey(_.key)
  .reduceGroups(Item.combine)
  .map(_._2)
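
Putting it together, here is a sketch of the full aggregation written against the Dataset API (assuming the SparkSession value is named sparkSession as above, and that Item, Key and Item.combine are defined exactly as in the question):

import org.apache.spark.sql.Dataset
import sparkSession.implicits._  // assumption: the session value is called sparkSession

def aggregate(items: Dataset[Item]): Dataset[Item] = items
  .groupByKey(_.key)          // typed grouping on the Key case class
  .reduceGroups(Item.combine) // combine the Items within each group
  .map(_._2)                  // keep the reduced Item, drop the Key

aggregate(items).show
// should print the reduced table from the question:
// Square/green -> 13, Triangle/blue -> 10

This stays in the Dataset API end to end, so there is no detour through an RDD and back.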
  • Does this scale well, i.e. does it reduce each partition separately before shuffling, or does it shuffle all data first and then reduce? (I once learned to avoid groupByKey in favor of reduceByKey, for groupByKey introduced unnecessary shuffling, but maybe that has changed in newer versions.) – TheGreenHeptagon Nov 27 '21 at 12:56
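
As far as I know, the typed reduceGroups is planned as a regular Spark SQL aggregation, which should include a partial (map-side) aggregation step before the shuffle, unlike the RDD groupByKey. One way to verify this on a concrete job (a sketch, reusing the items dataset from the question) is to print the physical plan:

items
  .groupByKey(_.key)
  .reduceGroups(Item.combine)
  .map(_._2)
  .explain()
// In the printed physical plan, look for an aggregate operator in "partial"
// mode before the Exchange (shuffle) node: that is the per-partition reduce
// happening before any data is moved, analogous to reduceByKey's map-side combine.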