
I have a dataframe like:

## +---+---+
## | id|num|
## +---+---+
## |  2|3.0|
## |  3|6.0|
## |  3|2.0|
## |  3|1.0|
## |  2|9.0|
## |  4|7.0|
## +---+---+

and I want to remove the consecutive repetitions of `id`, obtaining:

## +---+---+
## | id|num|
## +---+---+
## |  2|3.0|
## |  3|6.0|
## |  2|9.0|
## |  4|7.0|
## +---+---+

I found ways of doing this in Pandas but nothing in PySpark.
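
For context, a minimal sketch of the kind of shift-based approach that works in Pandas (column names taken from the example above):

import pandas as pd

df = pd.DataFrame({"id": [2, 3, 3, 3, 2, 4],
                   "num": [3.0, 6.0, 2.0, 1.0, 9.0, 7.0]})

# keep a row only when its id differs from the previous row's id
deduped = df[df["id"] != df["id"].shift()]
print(deduped)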

Qubix
    By 'consecutive' I am guessing according to a certain order, such as by `num`. Is this correct, or do you also want this for a distribution of ids like `[1,2,1,1,2]` resulting in `[1,2,1,2]`? – martinarroyo Sep 03 '18 at 10:07
    The order should be given by ids, so your example is correct. [1, 2, 1, 1, 2] should result in [1, 2, 1, 2]. – Qubix Sep 03 '18 at 10:53
  • Spark shuffles the records when it fetches them from the data source, so how will you ensure the order? Which column should be referred to if I have to assign a row number? – Ankit Kumar Namdeo Sep 03 '18 at 13:40

1 Answer


This should work as you desire; however, there might be room for some optimization:

from pyspark.sql.window import Window as W
from pyspark.sql.functions import col, lag, monotonically_increasing_id, when

test_df = spark.createDataFrame([
    (2, 3.0), (3, 6.0), (3, 2.0), (3, 1.0), (2, 9.0), (4, 7.0)
    ], ("id", "num"))

# create a temporary ordering column, because the window needs an ordered structure
test_df = test_df.withColumn("idx", monotonically_increasing_id())
w = W.orderBy("idx")

# a row marks a change when the previous row's id differs from the current id
get_last = when(lag("id", 1).over(w) == col("id"), False).otherwise(True)

# only keep the rows where the id changed
test_df.withColumn("changed", get_last).filter(col("changed")).select("id", "num").show()

Output:

+---+---+
| id|num|
+---+---+
|  2|3.0|
|  3|6.0|
|  2|9.0|
|  4|7.0|
+---+---+
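
As the comments above point out, Spark does not otherwise guarantee row order, so if the source data already has a column that defines the order, the temporary `idx` column can be skipped and the window ordered on that column directly. A minimal sketch, assuming a hypothetical ordering column `ts`:

from pyspark.sql.window import Window as W
from pyspark.sql.functions import col, lag, when

# hypothetical example: "ts" is an assumed column that defines the row order
ordered_df = spark.createDataFrame(
    [(1, 2, 3.0), (2, 3, 6.0), (3, 3, 2.0), (4, 3, 1.0), (5, 2, 9.0), (6, 4, 7.0)],
    ("ts", "id", "num"))

w = W.orderBy("ts")
changed = when(lag("id", 1).over(w) == col("id"), False).otherwise(True)
ordered_df.withColumn("changed", changed).filter(col("changed")).select("id", "num").show()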
gaw