
I have a dataframe:

DF:

1,2016-10-12 18:24:25
1,2016-11-18 14:47:05
2,2016-10-12 21:24:25
2,2016-10-12 20:24:25
2,2016-10-12 22:24:25
3,2016-10-12 17:24:25

How do I keep only the latest record for each group? (There are 3 groups above: 1, 2, 3.)

Result should be:

1,2016-11-18 14:47:05
2,2016-10-12 22:24:25
3,2016-10-12 17:24:25

I'm also trying to make this efficient (e.g. it should finish within a few minutes on a moderate cluster with 100 million records), so any sorting/ordering, if required, should be done in the most efficient and correct way.

Joe
Does this answer your question? [How to select the first row of each group?](https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group) – user10938362 Jan 23 '20 at 20:54

2 Answers


You have to use a window function.

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

Partition the window by the group column and order by time descending; the PySpark script below does the work:

from pyspark.sql.functions import desc, dense_rank
from pyspark.sql.window import Window

schema = "Group int, time timestamp"

df = spark.read.format('csv').schema(schema).options(header=False).load('/FileStore/tables/Group_window.txt')

# rank rows within each group, newest timestamp first
w = Window.partitionBy('Group').orderBy(desc('time'))
df = df.withColumn('Rank', dense_rank().over(w))

# keep only the top-ranked (latest) row per group and drop the helper column
df.filter(df.Rank == 1).drop(df.Rank).show()


+-----+-------------------+
|Group|               time|
+-----+-------------------+
|    1|2016-11-18 14:47:05|
|    3|2016-10-12 17:24:25|
|    2|2016-10-12 22:24:25|
+-----+-------------------+
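Since the sample data has only the group and time columns, and if no other columns need to be carried along, a plain aggregation avoids the per-partition sort of a window entirely, which may matter for the ~100-million-record case in the question. A minimal sketch, assuming the same df as above:

from pyspark.sql.functions import max as max_

# one shuffle + aggregate instead of a windowed sort
latest = df.groupBy('Group').agg(max_('time').alias('time'))
latest.show()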





Ravi
you should use row_number() instead of dense_rank() because dense_rank() gives the same rank to 'tied' rows – Roberto A. Jan 09 '22 at 10:29
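For reference, a minimal sketch of the row_number variant suggested in the comment above, assuming the df and w defined in the answer:

from pyspark.sql.functions import row_number

# row_number assigns a unique 1..n within each window partition,
# so exactly one row per group survives even when timestamps tie
df.withColumn('rn', row_number().over(w)).filter('rn = 1').drop('rn').show()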

You can use window functions as described here for cases like this:

scala> val in = Seq((1,"2016-10-12 18:24:25"),
     | (1,"2016-11-18 14:47:05"),
     | (2,"2016-10-12 21:24:25"),
     | (2,"2016-10-12 20:24:25"),
     | (2,"2016-10-12 22:24:25"),
     | (3,"2016-10-12 17:24:25")).toDF("id", "ts")
in: org.apache.spark.sql.DataFrame = [id: int, ts: string]
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val win = Window.partitionBy("id").orderBy(desc("ts"))
win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@59fa04f7

scala> in.withColumn("rank", row_number().over(win)).where("rank == 1").show(false)
+---+-------------------+----+
| id|                 ts|rank|
+---+-------------------+----+
|  1|2016-11-18 14:47:05|   1|
|  3|2016-10-12 17:24:25|   1|
|  2|2016-10-12 22:24:25|   1|
+---+-------------------+----+
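To match the desired output exactly, the helper column can be dropped afterwards, e.g. by appending .drop("rank") to the same chain.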
nirmal
Charlie Flowers