1

I have a Dataset on groupby("_1","_2","_3","_4").agg(max("_5").as("time"),collect_list("_6").as("value")) returns a dataset which has the grouped data for four columns and max of time column and collect_list which has all the values for that grouped data like [5,1] but all I want for _6 is the value matching all the grouped columns and also max("_5").as("time") not only for the grouped columns

Code below:

val data = Seq(("thing1",1,1,"Temperature",1551501300000L,"5"),("thing1",1,1,"Temperature",1551502200000L,"1"))

 import org.apache.spark.sql.functions._
 val dataSet = spark.sparkContext.parallelize(data)
 import spark.implicits._
 val testDS = dataSet.toDS()
 testDS.groupby("_1","_2","_3","_4").agg(max("_5").as("time"),collect_list("_6").as("value")).show()

Output:

 |  _1     |  _2  |  _3  |  _4        |  time          |  value  |
 |thingId1 |  1   |  1   |Temperature |  1551502200000 | [5,1]   |

Required Output

 |  _1     |  _2  |  _3  |  _4        |  time          |  value  |
 |thingId1 |  1   |  1   |Temperature |  1551502200000 | 1       |

I don't want value 5 to be in the value column as its not comes under the criteria max("time") all I need is 1 in value column as the it only matches the conditions of all the grouped columns and max("time").

How to achieve this.

Thank you.

Pyd
  • 6,017
  • 18
  • 52
  • 109

2 Answers2

2

You can do this neatly and without having to use a Window function by using the argmax logic like so:

val data = Seq(("thing1",1,1,"Temperature",1551501300000L,"5"), 
               ("thing1",1,1,"Temperature",1551502200000L,"1")).toDF

data.groupBy("_1","_2","_3","_4").agg(
     max(struct("_5", "_6")).as("argmax")).select("_1","_2","_3","_4", "argmax.*").show

+------+---+---+-----------+-------------+---+
|    _1| _2| _3|         _4|           _5| _6|
+------+---+---+-----------+-------------+---+
|thing1|  1|  1|Temperature|1551502200000|  1|
+------+---+---+-----------+-------------+---+

When you use a max on a struct in spark it returns the struct with the highest first value and if there are structs with equal first values then it goes to the second value and so on and so forth. Once we have the max struct you can then extract the values from the struct by using the * wildcard.

randal25
  • 1,290
  • 13
  • 10
1

Use Window functions in this scenario:

import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("_1","_2","_3","_4").orderBy(desc("_5"))

testDS.withColumn("rowSelector", row_number() over windowSpec)
    .where($"rowSelector" === 1)
    .drop($"rowSelector")
    .show(false) 

Output:

+------+---+---+-----------+-------------+---+
|_1    |_2 |_3 |_4         |_5           |_6 |
+------+---+---+-----------+-------------+---+
|thing1|1  |1  |Temperature|1551502200000|1  |  
1pluszara
  • 1,518
  • 3
  • 14
  • 26