This question is about the duality between DataFrame and RDD when it comes to aggregation operations. In Spark SQL one can use table-generating UDFs for custom aggregations, but creating one is typically much less user-friendly than using the aggregation functions available for RDDs, especially if table output is not required.

Is there an efficient way to apply pair RDD operations such as aggregateByKey to a DataFrame which has been grouped using GROUP BY or ordered using ORDER BY?

Normally, one would need an explicit map step to create key-value tuples, e.g., dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row)).aggregateByKey(...). Can this be avoided?

Sim

1 Answer

Not really. While DataFrames can be converted to RDDs and vice versa, this is a relatively complex operation, and methods like DataFrame.groupBy don't have the same semantics as their counterparts on RDD.

The closest thing you can get is the new Dataset API introduced in Spark 1.6.0. It integrates much more closely with DataFrames and provides a GroupedDataset class with its own set of methods, including reduce, cogroup and mapGroups:

case class Record(id: Long, key: String, value: Double)

val df = sc.parallelize(Seq(
    (1L, "foo", 3.0), (2L, "bar", 5.6),
    (3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")

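// Spark 1.6 API: on a typed Dataset, groupBy returns a GroupedDataset.
val ds = df.as[Record]
// Keep the record with the smallest id in each group.
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show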

// +-----+-----------+
// |   _1|         _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+
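
On Spark 2.x and later, Dataset.groupBy returns a RelationalGroupedDataset, which has no reduce method. The typed grouping goes through groupByKey and reduceGroups instead. The following is a minimal sketch of the same example, assuming Spark 2.x or later on the classpath and a script or spark-shell session; the local SparkSession setup, the app name and the names firstPerKey and result are illustrative and not part of the original answer:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.x+; run as a script or pasted into spark-shell.
case class Record(id: Long, key: String, value: Double)

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("reduce-groups-sketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

val ds = Seq(
  Record(1L, "foo", 3.0), Record(2L, "bar", 5.6),
  Record(3L, "foo", -1.0), Record(4L, "bar", 10.0)
).toDS()

// groupByKey takes a typed key function and returns a KeyValueGroupedDataset,
// which exposes reduceGroups, mapGroups, cogroup and similar typed operations.
val firstPerKey = ds
  .groupByKey(_.key)
  .reduceGroups((x, y) => if (x.id < y.id) x else y)

// Collect the (key, record) pairs into a local Map for inspection.
val result = firstPerKey.collect().toMap
```

Here result maps each key to the record with the smallest id in its group, which mirrors what the 1.6 reduce call computes.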

In some specific cases it is possible to leverage the ordering semantics of structs or arrays to group and process data. You'll find an example in "SPARK DataFrame: select the first row of each group".
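
As a rough illustration of the struct-ordering idea, the sketch below picks the row with the smallest id per key by taking the minimum of a struct whose first field is id. Structs compare field by field from left to right, so the field order inside struct determines what "smallest" means. It assumes Spark 2.x or later; the session setup and the names firstByKey and rows are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{min, struct}

// Assumption: Spark 2.x+; run as a script or pasted into spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("struct-ordering-sketch") // hypothetical app name
  .getOrCreate()
import spark.implicits._

val df = Seq(
  (1L, "foo", 3.0), (2L, "bar", 5.6),
  (3L, "foo", -1.0), (4L, "bar", 10.0)
).toDF("id", "key", "value")

// min over a struct compares id first, then value, so each group keeps
// the fields of the row with the smallest id.
val firstByKey = df
  .groupBy($"key")
  .agg(min(struct($"id", $"value")).alias("first"))
  .select($"key", $"first.id", $"first.value")

// Pull the rows back to the driver as plain tuples.
val rows = firstByKey.collect()
  .map(r => (r.getString(0), r.getLong(1), r.getDouble(2)))
  .toSet
```

This stays entirely within the DataFrame API, so no conversion to an RDD or typed Dataset is involved, but it only works when the per-group result can be expressed as a minimum or maximum over an ordered value.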

zero323
  • Yes, Datasets do look interesting but the support in Spark 1.6.0 is still quite buggy: they are an experimental feature. – Sim Feb 22 '16 at 01:12
  • It is :) Paradoxically Spark DataFrames play much nicer with PySpark than Scala. Unfortunately hops between JVM and Python make things quite expensive. – zero323 Feb 22 '16 at 18:04
  • @zero323 , have been testing your example, but getting error `error: value reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset` . Am I missing an import? (only reduce I've managed to find are related to RDDs) – Dan Nov 16 '18 at 20:09