I have a dataframe like this:

|-----+-----+-------+---------|
| foo | bar | fox   | cow     |
|-----+-----+-------+---------|
|   1 |   2 | red   | blue    | // row 0
|   1 |   2 | red   | yellow  | // row 1
|   2 |   2 | brown | green   | // row 2
|   3 |   4 | taupe | fuschia | // row 3
|   3 |   4 | red   | orange  | // row 4
|-----+-----+-------+---------|

I need to group the records by "foo" and "bar" and then perform some magical computation on "fox" and "cow" to produce "badger", which may insert or delete records:

|-----+-----+-------+---------+---------|
| foo | bar | fox   | cow     | badger  |
|-----+-----+-------+---------+---------|
|   1 |   2 | red   | blue    | zebra   |
|   1 |   2 | red   | blue    | chicken |
|   1 |   2 | red   | yellow  | cougar  |
|   2 |   2 | brown | green   | duck    |
|   3 |   4 | red   | orange  | peacock |
|-----+-----+-------+---------+---------|

(In this example, row 0 has been split into two "badger" values, and row 3 has been deleted from the final output.)
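To make the intended semantics concrete, here is a plain-Scala (non-Spark) sketch: group the rows by (foo, bar), then let a per-group function emit zero, one, or many output rows. The splitting/dropping rules below are hypothetical, chosen only to mirror the tables above.

```scala
case class In(foo: Int, bar: Int, fox: String, cow: String)
case class Out(foo: Int, bar: Int, fox: String, cow: String, badger: String)

// Hypothetical per-group rule: split one row, drop another, pass the rest through.
def makeBadgers(group: Seq[In]): Seq[Out] = group.flatMap {
  case In(1, 2, "red", "blue") =>        // row 0: split into two badger rows
    Seq(Out(1, 2, "red", "blue", "zebra"), Out(1, 2, "red", "blue", "chicken"))
  case In(3, 4, "taupe", _) => Seq.empty // row 3: deleted from the output
  case In(f, b, fox, cow)   => Seq(Out(f, b, fox, cow, "badger-for-" + fox))
}

val input = Seq(
  In(1, 2, "red", "blue"), In(1, 2, "red", "yellow"),
  In(2, 2, "brown", "green"), In(3, 4, "taupe", "fuschia"), In(3, 4, "red", "orange"))

// Group by the key, apply the per-group function, flatten back into rows.
val output = input.groupBy(r => (r.foo, r.bar)).values.flatMap(makeBadgers).toSeq
```

The Spark question is how to express exactly this shape of computation (a group-to-variable-number-of-rows function) on a distributed DataFrame.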

My best approach so far looks like this:

val groups = df.select("foo", "bar").distinct
groups.flatMap(row => {
  val (foo, bar) = (row(0), row(1))
  val group: DataFrame = df.where(s"foo == '$foo' AND bar == '$bar'")
  val rowsWithBadgers: List[Row] = makeBadgersFor(group)
  rowsWithBadgers
})

This approach has a few problems:

  1. It's clumsy to match on foo and bar individually. (A utility method can clean that up, so not a big deal.)
  2. It throws an "Invalid tree: null\nnull" error because of the nested operation: I refer to df from inside groups.flatMap, and Spark doesn't allow one distributed operation to be nested inside another. Don't know how to get around that one yet.
  3. I'm not sure whether this mapping and filtering actually leverages Spark distributed computation correctly.

Is there a more performant and/or elegant approach to this problem?

This question is very similar to Spark DataFrame: operate on groups, but I'm including it here because 1) it's not clear if that question requires addition and deletion of records, and 2) the answers in that question are out-of-date and lacking detail.

I don't see a way to accomplish this with groupBy and a user-defined aggregate function, because an aggregation function aggregates to a single row. In other words,

udf(<records with foo == 'foo' && bar == 'bar'>) => [foo,bar,aggregatedValue]

I need to possibly return two or more different rows, or zero rows after analyzing my group. I don't see a way for aggregation functions to do this -- if you have an example, please share.

Sasgorilla

1 Answer

A user-defined aggregator can be used here. Although an aggregator returns a single value per group, that value can be a list; you can then explode the list into multiple rows and reconstruct the columns.

The aggregator:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class StuffIn(foo: BigInt, bar: BigInt, fox: String, cow: String)
case class StuffOut(foo: BigInt, bar: BigInt, fox: String, cow: String, badger: String)
object StuffOut {
  def apply(stuffIn: StuffIn): StuffOut =
    new StuffOut(stuffIn.foo, stuffIn.bar, stuffIn.fox, stuffIn.cow, "dummy")
}

object MultiLineAggregator extends Aggregator[StuffIn, Seq[StuffOut], Seq[StuffOut]] {
  def zero: Seq[StuffOut] = Seq[StuffOut]()
  def reduce(buffer: Seq[StuffOut], stuff: StuffIn): Seq[StuffOut] = {
    // Your per-record "badger" logic goes here; it may append zero, one,
    // or several StuffOut rows to the buffer for each incoming StuffIn.
    makeBadgersForDummy(buffer, stuff)
  }
  def merge(b1: Seq[StuffOut], b2: Seq[StuffOut]): Seq[StuffOut] = b1 ++: b2
  def finish(reduction: Seq[StuffOut]): Seq[StuffOut] = reduction
  // Note: a kryo[Seq[StuffOut]] encoder would serialize the buffer into a
  // single binary column, which explode() below cannot handle. An
  // ExpressionEncoder keeps it as array<struct>, so it can be exploded.
  def bufferEncoder: Encoder[Seq[StuffOut]] = ExpressionEncoder()
  def outputEncoder: Encoder[Seq[StuffOut]] = ExpressionEncoder()
}
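For intuition, the zero/reduce/merge/finish contract above behaves like a fold: Spark reduces records within each partition, merges the partial buffers across partitions, then calls finish once per key. A minimal non-Spark model (the per-record rule, doubling each value, is made up; in the answer it is makeBadgersForDummy):

```scala
// Non-Spark model of the Aggregator contract with Seq-valued buffers.
def zero: Seq[Int] = Seq.empty
def reduce(buf: Seq[Int], x: Int): Seq[Int] = buf :+ (x * 2) // hypothetical rule
def merge(a: Seq[Int], b: Seq[Int]): Seq[Int] = a ++ b
def finish(r: Seq[Int]): Seq[Int] = r

// One group's records, split across two partitions:
val partitions = Seq(Seq(1, 2), Seq(3))
val result = finish(partitions.map(_.foldLeft(zero)(reduce)).reduce(merge))
// result == Seq(2, 4, 6)
```

Because the buffer is a Seq, each group can legitimately end up with zero, one, or many output rows, which is exactly what the question needs.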

The call:

val badgerAggregator: TypedColumn[StuffIn, Seq[StuffOut]] = MultiLineAggregator.toColumn

// ds is the input as a typed Dataset, e.g. ds = df.as[StuffIn]
val res: DataFrame =
  ds.groupByKey(x => (x.foo, x.bar))
    .agg(badgerAggregator)
    .map(_._2)                               // keep only the Seq[StuffOut] per group
    .withColumn("value", explode($"value"))  // one row per StuffOut
    .withColumn("foo", $"value.foo")
    .withColumn("bar", $"value.bar")
    .withColumn("fox", $"value.fox")
    .withColumn("cow", $"value.cow")
    .withColumn("badger", $"value.badger")
    .drop("value")
étienne