
I am doing a group-by in Spark SQL. Some rows contain the same value with different IDs. In that case I want to select only the first row.

This is my code.

    import org.apache.spark.sql.functions.max

    val highvalueresult = highvalue
      .select($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID", $"RSSI_Weight_avg")
      .groupBy("tagShortID", "Timestamp")
      .agg(max($"RSSI_Weight_avg").alias("RSSI_Weight_avg"))

    // Join back to recover the other columns; rows that tie on the maximum all survive the join
    val t2 = averageDF.join(highvalueresult, Seq("tagShortID", "Timestamp", "RSSI_Weight_avg"))

And this is my result.

tag,timestamp,rssi,listner,rootorg,suborg
2,1496745906,0.7,3878,4,3
4,1496745907,0.6,362,4,3
4,1496745907,0.6,718,4,3
4,1496745907,0.6,1901,4,3

In the result above, three listeners have the same rssi value for timestamp 1496745907. In this case I want to keep only the first row.

sathya
Jessi joseph

1 Answer


You can use the window functions that Spark SQL supports. Assuming your DataFrame is:

+---+----------+----+-------+-------+------+
|tag| timestamp|rssi|listner|rootorg|suborg|
+---+----------+----+-------+-------+------+
|  2|1496745906| 0.7|   3878|      4|     3|
|  4|1496745907| 0.6|    362|      4|     3|
|  4|1496745907| 0.6|    718|      4|     3|
|  4|1496745907| 0.6|   1901|      4|     3|
+---+----------+----+-------+-------+------+
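To experiment with the sample data, a DataFrame like the one above could be built as follows. This is a minimal sketch meant for spark-shell or a notebook; the local SparkSession settings, the app name, the column types, and the name df are assumptions, not part of the original answer (whose snippets refer to the DataFrame by its REPL name res1).

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation only (assumption); spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("first-row-per-group")
  .getOrCreate()
import spark.implicits._

// Sample rows copied from the table above; column names follow the table, including "listner".
val df = Seq(
  (2, 1496745906L, 0.7, 3878, 4, 3),
  (4, 1496745907L, 0.6, 362, 4, 3),
  (4, 1496745907L, 0.6, 718, 4, 3),
  (4, 1496745907L, 0.6, 1901, 4, 3)
).toDF("tag", "timestamp", "rssi", "listner", "rootorg", "suborg")

df.show()
```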

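Define a window specification (partition by and order by whichever columns suit your data). Note that ordering by a column that is also a partition key, as here, leaves the row order inside each partition undefined; order by another column such as listner if the choice of first row has to be deterministic:

    import org.apache.spark.sql.expressions.Window

    val window = Window.partitionBy("timestamp", "rssi").orderBy("timestamp")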

Apply the window function (res1 is the DataFrame shown above):

    import org.apache.spark.sql.functions.row_number

    val ranked = res1.withColumn("rank", row_number().over(window))

+---+----------+----+-------+-------+------+----+
|tag| timestamp|rssi|listner|rootorg|suborg|rank|
+---+----------+----+-------+-------+------+----+
|  4|1496745907| 0.6|    362|      4|     3|   1|
|  4|1496745907| 0.6|    718|      4|     3|   2|
|  4|1496745907| 0.6|   1901|      4|     3|   3|
|  2|1496745906| 0.7|   3878|      4|     3|   1|
+---+----------+----+-------+-------+------+----+

Select the first row from each window:

    ranked.where($"rank" === 1)
+---+----------+----+-------+-------+------+----+
|tag| timestamp|rssi|listner|rootorg|suborg|rank|
+---+----------+----+-------+-------+------+----+
|  4|1496745907| 0.6|    362|      4|     3|   1|
|  2|1496745906| 0.7|   3878|      4|     3|   1|
+---+----------+----+-------+-------+------+----+
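Putting the steps together, one possible end-to-end sketch follows. It departs from the snippets above in two ways, both of them assumptions: it partitions by tag and timestamp (mirroring the groupBy keys in the question), and it orders by rssi descending and then listner ascending, so that the "first" row is the highest-rssi row with the smallest listner ID. The names df, byTagAndTime, and firstPerGroup are illustrative, and the local SparkSession is only there so the snippet can run on its own in spark-shell.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Local session for experimentation only (assumption); spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("first-row-per-group")
  .getOrCreate()
import spark.implicits._

// Sample rows copied from the answer's table.
val df = Seq(
  (2, 1496745906L, 0.7, 3878, 4, 3),
  (4, 1496745907L, 0.6, 362, 4, 3),
  (4, 1496745907L, 0.6, 718, 4, 3),
  (4, 1496745907L, 0.6, 1901, 4, 3)
).toDF("tag", "timestamp", "rssi", "listner", "rootorg", "suborg")

// One partition per (tag, timestamp): highest rssi first,
// ties broken by the smallest listner ID (an assumed tie-break rule).
val byTagAndTime = Window
  .partitionBy("tag", "timestamp")
  .orderBy(col("rssi").desc, col("listner").asc)

// Number the rows inside each partition, keep row 1, then drop the helper column.
val firstPerGroup = df
  .withColumn("rank", row_number().over(byTagAndTime))
  .where(col("rank") === 1)
  .drop("rank")

firstPerGroup.show()
```

Because the ordering already puts the highest rssi first, this variant would pick the maximum and break ties in a single pass, so a separate groupBy, max, and join may not be needed. If a different row should count as "first", only the orderBy clause needs to change.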
dumitru