
I have this code, which gets an RDD from Cassandra, then extracts the first and last row for each key and subtracts them.

val rdd = sc.cassandraTable("keyspace", "table")
    .select("column1", "column2", "column3", "column4", "column5")
    .as((i: String, p: String, e: String, c: Double, a: java.util.Date) => (i, (c, a, p, e)))
    .groupByKey.mapValues(v => v.toList)
    .cache

val finalValues = rdd.mapValues(v => v.head)
val initialValues = rdd.mapValues(v => v.last)
val valuesCombined = finalValues.join(initialValues)

// subtract the Double (column4) of the last row from that of the first row
val results = valuesCombined.map(v => (v._2._1._1 - v._2._2._1))

Is this good performance-wise, or is there a better solution? I'm not sure about caching the whole dataset in memory.

2 Answers


groupByKey shuffles the data, and the order of the grouped values is no longer guaranteed. It is also rather expensive.

If you really want to operate on RDDs rather than DataFrames, and the ordering is based on the date, you can use aggregateByKey:

import scala.math.Ordering

type Record = (String, String, String, Double, java.util.Date)
// Order records by the date column (the fifth field)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)

// Sentinel records: maxRecord (latest possible date) seeds the minimum slot
// and minRecord (earliest possible date) seeds the maximum slot, so the
// first real record replaces both.
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))

// Combine two (min, max) accumulators into one
def minMax(x: (Record, Record), y: (Record, Record)) = {
  (RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}

// Here rdd is assumed to be the keyed records, RDD[(String, Record)],
// before any grouping (not the grouped-and-cached RDD from the question):
val minMaxByKey = rdd.aggregateByKey((maxRecord, minRecord))(
  (acc, x) => minMax(acc, (x, x)),
  minMax
)
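
From here, the subtraction from the question becomes a plain mapValues. A minimal sketch, assuming the Double you subtract is column4, i.e. the fourth field of Record:

// final (latest) minus initial (earliest) value per key
val results = minMaxByKey.mapValues { case (first, last) => last._4 - first._4 }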

With DataFrames you can try something like this:

import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window

// df is assumed to be a DataFrame loaded from the same Cassandra table
val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4", "column5")

val w = Window
  .partitionBy(partition.head, partition.tail: _*)
  .orderBy(order.head, order.tail: _*)

// Lead / lag of row number to mark first / last row in the group
val rn_lag = lag(row_number.over(w), 1).over(w)
val rn_lead = lead(row_number.over(w), 1).over(w)

// Select value if first / last row in the group otherwise null
val firstColumns = columns.map(
  c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
  c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))

// Add columns with first / last vals
val expanded = df.select(
  partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)

// Aggregate to drop nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
val aggregated = expanded
  .groupBy(partition.map(col(_)): _*)
  .agg(aggExprs.head, aggExprs.tail: _*)
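
Since the window is ordered by date ascending, the *_first columns hold the earliest row and the *_last columns the latest, so the question's final-minus-initial difference is a simple select. A minimal sketch, assuming column4 is the Double being subtracted:

// final (latest) minus initial (earliest) value of column4 per key
val diffs = aggregated.select(
  col("column1"),
  (col("column4_last") - col("column4_first")).alias("diff"))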

There are some other ways you can solve this problem with DataFrames, including ordering over structs and the Dataset API. See my answer to SPARK DataFrame: select the first row of each group.
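
For completeness, here is a minimal sketch of the ordering-over-structs idea: structs compare field by field, so putting the date first orders whole rows by date, and min / max pick the initial and final rows directly (column5 is assumed to be the timestamp, column4 the value):

import org.apache.spark.sql.functions.{struct, min}

// Struct ordered by date first, carrying the value along
val byDate = struct(col("column5"), col("column4"))

df.groupBy("column1")
  .agg(min(byDate).alias("first"), max(byDate).alias("last"))
  .select(
    col("column1"),
    (col("last.column4") - col("first.column4")).alias("diff"))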

zero323
  • Thanks for your input. I've done the Datastax tutorial on Spark, which was based on RDDs and only mentioned DataFrames at the end, so I assumed RDDs were the way to go. Now, after reading this [link](http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/), I know DataFrames are better performance-wise. I'll try to write the code using DataFrames; if you have time, I'd be grateful for help with it. – Paweł Szychiewicz Feb 13 '16 at 07:56
  • @PawełSzychiewicz I've linked another answer with some examples of how you can solve first (last) row selection in a group, which may be more intuitive than window functions. – zero323 Feb 21 '16 at 13:29

First, I'm assuming the `all` variable refers to the one named `rdd`. After creating it, you don't need to use `join` (which is costly performance-wise); you can simply map each element directly to the result you need:

// head/last of each group's list; ._1 is the Double (column4) being subtracted
val results = all.mapValues(v => v.head._1 - v.last._1).values

Now, since we've only performed a single action on the RDD, we can also get rid of the `cache()`.

Tzach Zohar
  • This won't work. `head` and `last` can be arbitrary elements, especially since `groupByKey` disables map-side aggregation. If you want to `mapValues`, you should enforce the order first. – zero323 Feb 21 '16 at 13:23
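
As the comment above points out, the order inside each group has to be enforced before taking head and last. A minimal sketch, assuming the question's value tuples (c, a, p, e) where a is the java.util.Date:

// Sort each group by date, then subtract the earliest Double from the latest
val results = all
  .mapValues(_.sortBy(_._2.getTime))
  .mapValues(v => v.last._1 - v.head._1)
  .values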