groupByKey shuffles the data, so the order of the grouped values is no longer guaranteed. It is also rather expensive. If you really want to operate on RDDs, not DataFrames, and the ordering is based on the date, you can use aggregateByKey:
import scala.math.Ordering
type Record = (String, String, String, Double, java.util.Date)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)
// Neutral elements: the max-date record is the identity for the running minimum,
// the min-date record is the identity for the running maximum
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))
def minMax(x: (Record, Record), y: (Record, Record)) = {
  (RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}

// For each key keep a (earliest-by-date, latest-by-date) pair of records
rdd.aggregateByKey((maxRecord, minRecord))(
  (acc, x) => minMax(acc, (x, x)),
  minMax
)
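For example, on a hypothetical toy pair RDD keyed by the first field (the data and the SparkContext sc are made up here, just to illustrate the call):

val rdd = sc.parallelize(Seq(
  ("a", ("a", "x1", "y1", 1.0, new java.util.Date(1000L))),
  ("a", ("a", "x2", "y2", 2.0, new java.util.Date(5000L))),
  ("b", ("b", "x3", "y3", 3.0, new java.util.Date(2000L)))
))

// Each key ends up with a (record with the earliest date, record with the latest date) pair
rdd.aggregateByKey((maxRecord, minRecord))((acc, x) => minMax(acc, (x, x)), minMax).collect()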
With DataFrames you can try something like this:
import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window
val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4", "column5")
val w = Window
.partitionBy(partition.head, partition.tail: _*)
.orderBy(order.head, order.tail: _*)
// Lead / lag of row number to mark first / last row in the group
val rn_lag = lag(row_number().over(w), 1).over(w)
val rn_lead = lead(row_number().over(w), 1).over(w)
// Select value if first / last row in the group otherwise null
val firstColumns = columns.map(
c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))
// Add columns with first / last vals
val expanded = df.select(
partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)
// Aggregate to drop nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*)
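For example, with a hypothetical toy df matching the column names above (column1 as the grouping key, column5 holding a sortable ISO date string; spark is assumed to be the SparkSession):

import spark.implicits._

val df = Seq(
  ("a", "v1", "w1", 1.0, "2015-01-01"),
  ("a", "v2", "w2", 2.0, "2015-01-05"),
  ("b", "v3", "w3", 3.0, "2015-01-02")
).toDF("column1", "column2", "column3", "column4", "column5")

// The aggregation above collapses each column1 group to a single row, with
// column2_first ... column5_first taken from the row with the earliest column5
// and column2_last ... column5_last taken from the row with the latest column5.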
There are some other ways you can solve this problem with DataFrames, including ordering over structs and the Dataset API; a rough sketch of the struct idea is shown below. See my answer to SPARK DataFrame: select the first row of each group.
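For completeness, here is a rough, hedged sketch of the struct-ordering idea (it assumes a Spark version where struct columns are orderable and reuses the partition, order and columns sequences from above; it is not the exact code from the linked answer). min and max of a struct compare fields left to right, so putting the ordering column first lets a plain aggregation pick the earliest and latest rows per group:

import org.apache.spark.sql.functions.{col, max, min, struct}

// Put the ordering column (column5) first so struct comparison sorts by date
val orderedStruct = struct(
  (order ++ columns.filterNot(order.contains(_))).map(col(_)): _*
)

df.groupBy(partition.map(col(_)): _*)
  .agg(min(orderedStruct).alias("first"), max(orderedStruct).alias("last"))

// Individual values can then be pulled back out, e.g. col("first.column2")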