
Env: Spark 1.6, Scala

Hi,
I have records in a DataFrame like below:

reportTime               serverNAme     channel     viewer  
2017-01-12 19:16:58.76  MTRL28DSERV722  Channel1    1192
2017-01-12 19:16:59.213 MTRL28DSERV722  Channel1    668
2017-01-12 19:17:05.193 BBBN23DSERV000  Channel1    795
2017-01-12 19:17:01.15  BBBN23DSERV000  Channel1    700

2017-01-12 19:16:58.76  MTRL28DSERV722  Channel3    100
2017-01-12 19:16:59.213 MTRL28DSERV722  Channel3    110
2017-01-12 19:17:05.193 BBBN23DSERV000  Channel3    200
2017-01-12 19:17:01.15  BBBN23DSERV000  Channel3    50

I need as output the latest viewer count from each server, per channel, like this:

2017-01-12 19:16:59.213 MTRL28DSERV722  Channel1    668
2017-01-12 19:17:05.193 BBBN23DSERV000  Channel1    795


2017-01-12 19:16:59.213 MTRL28DSERV722  Channel3    110
2017-01-12 19:17:05.193 BBBN23DSERV000  Channel3    200

How can I get this result? I don't want to register the DataFrame as a temp table and run SQL on it.
Thanks
Hossain

Jhon

1 Answer


You can assign a row_number within each combination of serverNAme and channel, ordered by reportTime descending, and then use where to keep only the first row in each group, which is the one with the most recent reportTime:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import sqlContext.implicits._   // enables the $"..." column syntax in Spark 1.6

// Number the rows per (serverNAme, channel), newest reportTime first
val w = Window.partitionBy($"serverNAme", $"channel").orderBy($"reportTime".desc)

// Keep only the first (most recent) row in each group, then drop the helper column.
// Note: in Spark 1.6, drop takes a column name (String), not a Column.
df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn").show

+--------------------+--------------+--------+------+
|          reportTime|    serverNAme| channel|viewer|
+--------------------+--------------+--------+------+
|2017-01-12 19:17:...|BBBN23DSERV000|Channel3|   200|
|2017-01-12 19:16:...|MTRL28DSERV722|Channel3|   110|
|2017-01-12 19:17:...|BBBN23DSERV000|Channel1|   795|
|2017-01-12 19:16:...|MTRL28DSERV722|Channel1|   668|
+--------------------+--------------+--------+------+
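The keep-the-latest-row-per-group logic that the window function expresses can also be illustrated with plain Scala collections, independent of Spark. This is only a sketch for intuition; the `Report` case class and the sample rows are made up from the question's data, and the timestamps are zero-padded so that lexicographic string comparison matches chronological order:

```scala
// Minimal sketch: latest row per (server, channel) using ordinary collections.
case class Report(reportTime: String, serverName: String, channel: String, viewer: Int)

val reports = Seq(
  Report("2017-01-12 19:16:58.760", "MTRL28DSERV722", "Channel1", 1192),
  Report("2017-01-12 19:16:59.213", "MTRL28DSERV722", "Channel1", 668),
  Report("2017-01-12 19:17:05.193", "BBBN23DSERV000", "Channel1", 795),
  Report("2017-01-12 19:17:01.150", "BBBN23DSERV000", "Channel1", 700)
)

// Group by the (server, channel) key, then keep the row with the maximum
// timestamp in each group -- the collections analogue of row_number + rn === 1.
val latest: Map[(String, String), Report] =
  reports
    .groupBy(r => (r.serverName, r.channel))
    .mapValues(_.maxBy(_.reportTime))
```

In Spark the window-function version is preferable because it runs distributed and avoids collecting data to the driver, but the grouping idea is the same.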
Psidom