1

Given the following code:

case class Contact(name: String, phone: String)
case class Person(name: String, ts:Long, contacts: Seq[Contact])

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val people = sqlContext.read.format("orc").load("people")

What is the best way to dedupe users by timestamp, so that only the record with the max ts for each user stays in the collection? In Spark, using an RDD, I would run something like this:

rdd.reduceByKey(_ maxTS _) 

and would add the maxTS method to Person (or provide it through implicits):

def maxTS(that: Person): Person =
  if (that.ts > ts) that else this
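
The reduction above keeps, for each key, whichever record has the larger ts. A minimal sketch of the same logic on plain Scala collections (no Spark needed; the sample records are made up for illustration), where groupBy plus reduce stands in for keying the RDD by name and calling reduceByKey:

```scala
case class Contact(name: String, phone: String)
case class Person(name: String, ts: Long, contacts: Seq[Contact]) {
  // Keep whichever of the two records has the larger timestamp.
  def maxTS(that: Person): Person = if (that.ts > ts) that else this
}

// Hypothetical sample data: two records for "ann", one for "bob".
val sample = Seq(
  Person("ann", 1L, Seq(Contact("bob", "555-0100"))),
  Person("ann", 5L, Seq.empty),
  Person("bob", 3L, Seq.empty)
)

// Local analogue of rdd.keyBy(_.name).reduceByKey(_ maxTS _).values
val deduped: Seq[Person] = sample
  .groupBy(_.name)
  .map { case (_, records) => records.reduce(_ maxTS _) }
  .toSeq
```

On an RDD the pairwise reduction can run map-side before the shuffle, which is part of why this approach is attractive there.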

Is it possible to do the same with DataFrames, and would the performance be similar? We are using Spark 1.6.

Julias

2 Answers

2

You can use window functions. I'm assuming that the key is name:

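import org.apache.spark.sql.functions.row_number  // rowNumber is deprecated as of Spark 1.6
import org.apache.spark.sql.expressions.Window
// `people` from the question is already a DataFrame (loaded from ORC)
val df = people
val win = Window.partitionBy('name).orderBy('ts.desc)
// Number the rows within each name, newest ts first, and keep only the first one
df.withColumn("personRank", row_number().over(win))
  .where('personRank === 1).drop("personRank")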

For each row this creates a personRank column: rows sharing the same name get unique consecutive numbers, and the row with the latest ts gets the lowest rank, equal to 1. After filtering on that rank, you drop the temporary column.
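
To see the ranking on concrete data, here is a minimal sketch. It assumes a spark-shell session on Spark 1.6 where `sc` is already defined; the sample rows and the names `sample` and `latest` are made up for illustration and are not from the question's dataset.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

case class Contact(name: String, phone: String)
case class Person(name: String, ts: Long, contacts: Seq[Contact])

// Window functions on Spark 1.6 require a HiveContext, as used in the question.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._

// Hypothetical sample data: two records for "ann", one for "bob".
val sample = Seq(
  Person("ann", 1L, Seq(Contact("bob", "555-0100"))),
  Person("ann", 5L, Seq.empty),
  Person("bob", 3L, Seq.empty)
).toDF()

// Rank rows within each name, newest ts first.
val win = Window.partitionBy($"name").orderBy($"ts".desc)

val latest = sample
  .withColumn("personRank", row_number().over(win))
  .where($"personRank" === 1)
  .drop("personRank")

latest.show()     // one row per name, the one with the largest ts
latest.explain()  // prints the physical plan, useful for counting exchanges
```

If two rows for the same name share the same ts, row_number still keeps exactly one of them, but which one is not specified by this ordering.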

T. Gawęda
  • Answer inspired by this answer by Maciej: http://stackoverflow.com/questions/33878370/spark-dataframe-select-the-first-row-of-each-group?rq=1 Please upvote that answer if you are upvoting mine ;) – T. Gawęda Mar 22 '17 at 10:18
  • Thanks a lot. But what is the performance of that? Does it first shuffle all the data and then order the duplicates for each key, or are there some optimizations under the hood? – Julias Mar 22 '17 at 10:29
  • @Julias It will do only one exchange, according to `explain()`. – T. Gawęda Mar 22 '17 at 10:33
0

You can do a groupBy and use your preferred aggregation function, such as sum or max. For the latest timestamp per name, use max on ts:

import org.apache.spark.sql.functions.max
df.groupBy($"name").agg(max($"ts").alias("maxTS"))
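
A groupBy aggregation returns only the grouping key and the aggregated value, so the other columns (such as contacts) are not carried along. One way to recover whole rows, sketched here under assumptions, is to compute the max ts per name and join the result back to the original DataFrame. The sketch assumes a spark-shell session on Spark 1.6 with `sc` defined; the sample rows and the names `sample`, `latestTs` and `deduped` are hypothetical.

```scala
import org.apache.spark.sql.functions.max

case class Contact(name: String, phone: String)
case class Person(name: String, ts: Long, contacts: Seq[Contact])

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._

// Hypothetical sample data: two records for "ann", one for "bob".
val sample = Seq(
  Person("ann", 1L, Seq(Contact("bob", "555-0100"))),
  Person("ann", 5L, Seq.empty),
  Person("bob", 3L, Seq.empty)
).toDF()

// Step 1: latest timestamp per name (columns: name, ts).
val latestTs = sample.groupBy($"name").agg(max($"ts").alias("ts"))

// Step 2: inner join on (name, ts) to pull back the full rows.
val deduped = sample.join(latestTs, Seq("name", "ts"))

deduped.show()
deduped.explain()  // compare the plan with the window-function version
```

Unlike the window approach, this keeps every row that ties for the max ts within a name, and it needs an aggregation plus a join, so it is worth comparing the two plans with explain() before choosing.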
thebluephantom