I have a dataframe like this:
|-----+-----+-------+---------|
| foo | bar | fox   | cow     |
|-----+-----+-------+---------|
|   1 |   2 | red   | blue    | // row 0
|   1 |   2 | red   | yellow  | // row 1
|   2 |   2 | brown | green   | // row 2
|   3 |   4 | taupe | fuschia | // row 3
|   3 |   4 | red   | orange  | // row 4
|-----+-----+-------+---------|
I need to group the records by "foo" and "bar" and then perform some magical computation on "fox" and "cow" to produce "badger", which may insert or delete records:
|-----+-----+-------+---------+---------|
| foo | bar | fox   | cow     | badger  |
|-----+-----+-------+---------+---------|
|   1 |   2 | red   | blue    | zebra   |
|   1 |   2 | red   | blue    | chicken |
|   1 |   2 | red   | yellow  | cougar  |
|   2 |   2 | brown | green   | duck    |
|   3 |   4 | red   | orange  | peacock |
|-----+-----+-------+---------+---------|
(In this example, row 0 has been split into two "badger" values, and row 3 has been deleted from the final output.)
My best approach so far looks like this:
val groups = df.select("foo", "bar").distinct
groups.flatMap(row => {
  val (foo, bar): (String, String) = (row(0), row(1))
  val group: DataFrame = df.where(s"foo == '$foo' AND bar == '$bar'")
  val rowsWithBadgers: List[Row] = makeBadgersFor(group)
  rowsWithBadgers
})
This approach has a few problems:
- It's clumsy to match on "foo" and "bar" individually. (A utility method can clean that up, so not a big deal.)
- It throws an "Invalid tree: null\nnull" error because of the nested operation in which I try to refer to df from inside groups.flatMap. I don't know how to get around that one yet.
- I'm not sure whether this mapping and filtering actually leverages Spark's distributed computation correctly.
Is there a more performant and/or elegant approach to this problem?
This question is very similar to Spark DataFrame: operate on groups, but I'm including it here because 1) it's not clear if that question requires addition and deletion of records, and 2) the answers in that question are out-of-date and lacking detail.
I don't see a way to accomplish this with groupBy and a user-defined aggregate function, because an aggregate function reduces each group to a single row. In other words,
udf(<records with foo == 'foo' && bar == 'bar'>) => [foo,bar,aggregatedValue]
I may need to return two or more different rows, or zero rows, after analyzing a group. I don't see a way for aggregate functions to do this; if you have an example, please share.
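To make the contract I'm after concrete, here is a minimal local sketch using plain Scala collections rather than Spark. Everything in it is an assumption for illustration: the Record and Badgered case classes, the BadgerSketch object, and the lookup table standing in for the "magical computation" (it only reproduces the example output above). The real makeBadgersFor would inspect the whole group, not each row on its own.

```scala
// Local, non-Spark sketch of the desired semantics:
// group rows by (foo, bar), then let each group emit zero, one, or many rows.
object BadgerSketch {
  final case class Record(foo: Int, bar: Int, fox: String, cow: String)
  final case class Badgered(foo: Int, bar: Int, fox: String, cow: String, badger: String)

  // Hypothetical stand-in for the real computation: a fixed lookup that
  // happens to reproduce the example output from the question.
  private val badgersByColors: Map[(String, String), List[String]] = Map(
    ("red", "blue")    -> List("zebra", "chicken"), // one input row becomes two
    ("red", "yellow")  -> List("cougar"),
    ("brown", "green") -> List("duck"),
    ("red", "orange")  -> List("peacock")
    // ("taupe", "fuschia") is absent, so that row is dropped from the output
  )

  // Takes a whole group and returns any number of output rows (possibly none).
  def makeBadgersFor(group: Seq[Record]): Seq[Badgered] =
    for {
      r <- group
      b <- badgersByColors.getOrElse((r.fox, r.cow), Nil)
    } yield Badgered(r.foo, r.bar, r.fox, r.cow, b)

  // groupBy + flatMap: the shape I would like to express on a DataFrame.
  // Note that the order of groups in the result is not guaranteed.
  def badgerAll(rows: Seq[Record]): Seq[Badgered] =
    rows.groupBy(r => (r.foo, r.bar)).values.flatMap(makeBadgersFor).toSeq

  val input: Seq[Record] = Seq(
    Record(1, 2, "red", "blue"),
    Record(1, 2, "red", "yellow"),
    Record(2, 2, "brown", "green"),
    Record(3, 4, "taupe", "fuschia"),
    Record(3, 4, "red", "orange")
  )
}
```

Calling BadgerSketch.badgerAll(BadgerSketch.input) yields five rows: the (red, blue) row appears twice with different badgers, and the taupe row is gone. What I'm looking for is the distributed equivalent of that groupBy followed by flatMap.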