
This is my DataFrame in PySpark:

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   5       A
2015-10-13 13:00:00+00:00   6       A
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   11      B

The values of data are cumulative.

I want to get this result (differences between consecutive rows, grouped by feed):

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   4       A
2015-10-13 13:00:00+00:00   1       A  
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   1       B

In pandas I would do it this way:

df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))

How can I do the same thing in PySpark?

2 Answers


You can do this using the lag function over a window:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

# Compare each row with the previous row of the same feed, ordered by timestamp.
window = Window.partitionBy("feed").orderBy("utc_timestamp")

# lag(..., 1, 0) returns the previous row's value, or 0 for the first row of a partition.
df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(window))
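For completeness, a minimal sketch reproducing the example from the question (assuming an existing SparkSession named spark, and keeping utc_timestamp as a string for simplicity, which still sorts correctly in this ISO format):

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# Rebuild the sample data from the question.
df = spark.createDataFrame(
    [
        ("2015-10-13 11:00:00+00:00", 1, "A"),
        ("2015-10-13 12:00:00+00:00", 5, "A"),
        ("2015-10-13 13:00:00+00:00", 6, "A"),
        ("2015-10-13 14:00:00+00:00", 10, "B"),
        ("2015-10-13 15:00:00+00:00", 11, "B"),
    ],
    ["utc_timestamp", "data", "feed"],
)

# Difference with the previous row within each feed; 0 is used for the first row.
window = Window.partitionBy("feed").orderBy("utc_timestamp")
df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(window))

df.orderBy("feed", "utc_timestamp").show(truncate=False)
# data becomes 1, 4, 1 for feed A and 10, 1 for feed B, matching the expected result.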

You can use lag as a substitute for shift, and coalesce(..., F.lit(0)) as a substitute for fill_value=0:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("feed").orderBy("utc_timestamp")

# Subtract the previous value within each feed; coalesce turns the null
# that lag produces on the first row of a partition into 0.
data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
df.withColumn("data", data)
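As a side note, a quick sketch (assuming the same df and window as above) showing that this is equivalent to passing 0 as lag's default argument, as in the other answer; coalesce just makes the pandas fill_value=0 analogy explicit:

# lag returns null for the first row of each partition; coalesce replaces
# that null with 0, which is exactly what lag's third (default) argument does.
with_coalesce = F.col("data") - F.coalesce(F.lag("data").over(window), F.lit(0))
with_default = F.col("data") - F.lag("data", 1, 0).over(window)

df.withColumn("data", with_coalesce).show()
df.withColumn("data", with_default).show()  # same output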