
I've got a DataFrame I'm operating on, and I want to group by a set of columns and operate per-group on the rest of the columns. In regular RDD-land I think it would look something like this:

rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
  groupByKey().
  foreachPartition( iter => doSomeJob(iter) )

In DataFrame-land I'd start like this:

df.groupBy("col1", "col2", "col3")  // Reference by name

but then I'm not sure how to operate on the groups if my operations are more complicated than the mean/min/max/count offered by GroupedData.

For example, I want to build a single MongoDB document per ("col1", "col2", "col3") group (by iterating through the associated Rows in the group), scale down to N partitions, then insert the docs into a MongoDB database. The N limit is the max number of simultaneous connections I want.

Any advice?

Ken Williams
  • Best way: write a UDAF (not yet supported; see SPARK-4233 and SPARK-3947). Until then, use `df.rdd` to access RDD methods such as aggregateByKey to build what you want. – ayan guha May 21 '15 at 05:20
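
The `df.rdd` route from this comment could look roughly like the sketch below. It uses groupByKey, as in the question, rather than aggregateByKey. Assumptions that are not from the thread: col1 to col3 are non-null strings, `GroupDoc` is an illustrative stand-in for a MongoDB document, and `insertDocs` is a hypothetical placeholder for the real driver calls.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Illustrative stand-in for a MongoDB document (not a real driver type).
case class GroupDoc(key: (String, String, String), rows: Seq[Map[String, Any]])

object GroupedInsert {
  // Build one document per (col1, col2, col3) group.
  // Assumes the three key columns hold non-null strings.
  def buildDocs(df: DataFrame): RDD[GroupDoc] = {
    // Every column that is not part of the grouping key.
    val valueCols = df.columns.filterNot(Set("col1", "col2", "col3")).toSeq
    df.rdd
      .map { row =>
        val key = (
          row.getAs[String]("col1"),
          row.getAs[String]("col2"),
          row.getAs[String]("col3"))
        key -> row.getValuesMap[Any](valueCols)
      }
      .groupByKey() // all rows of one group end up together in memory
      .map { case (key, rows) => GroupDoc(key, rows.toSeq) }
  }

  // Hypothetical sink: replace the body with real MongoDB driver calls.
  def insertDocs(docs: Iterator[GroupDoc]): Unit =
    docs.foreach(doc => println(s"would insert $doc"))

  // At most n partitions means at most n concurrent writers.
  def run(df: DataFrame, n: Int): Unit =
    buildDocs(df).coalesce(n).foreachPartition(insertDocs _)
}
```

Calling `GroupedInsert.run(df, n)` caps the number of partitions, and so the number of simultaneous connections, at n. Note that groupByKey needs each whole group to fit in one executor's memory, so very large groups may call for aggregateByKey instead.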

1 Answer

You can do a self-join. First get the groups:

val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3")

Then you can join this back to the original DataFrame:

val joinedDF = groups
  .select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3")
  .join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3")

On its own, this gets you exactly the same data you had originally (plus 3 additional, redundant columns), but you could do another join to add a column holding the MongoDB document ID for the (col1, col2, col3) group associated with each row.

At any rate, in my experience joins and self-joins are the way you handle complicated stuff in DataFrames.

David Griffin
  • I'm not sure how to take it from there - what would let me iterate through all the `$"col4"` & `$"col5"` values associated with a particular combination `($"col1", $"col2", $"col3")`? – Ken Williams May 20 '15 at 21:20
  • The way DataFrames work, you only have two options. Either you do something like toArray on groups, then do a foreach and, inside the loop, create a DataFrame by filtering on col1 to col3. Or you have to do it all in a single DataFrame using complicated joins, like I was trying to allude to. – David Griffin May 20 '15 at 21:45
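
The first option in this comment might be sketched as follows. This is an illustration under assumptions, not the commenter's code: `forEachGroup` and `handle` are hypothetical names, and `collect()` is used to bring the distinct keys to the driver.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col

object PerGroupFilter {
  // Pull the distinct (col1, col2, col3) keys to the driver, then
  // filter the original DataFrame once per key and hand the result
  // to a caller-supplied function.
  def forEachGroup(df: DataFrame)(handle: (Row, DataFrame) => Unit): Unit = {
    val keys = df.select("col1", "col2", "col3").distinct().collect()
    keys.foreach { key =>
      // <=> is null-safe equality, matching the join in the answer.
      val group = df.filter(
        col("col1") <=> key.get(0) &&
        col("col2") <=> key.get(1) &&
        col("col3") <=> key.get(2))
      handle(key, group)
    }
  }
}
```

Inside `handle`, the per-group DataFrame can be iterated with, for example, `group.select("col4", "col5").collect()`. Each group triggers its own Spark job, so this is only reasonable when the number of distinct groups is small.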