
I have a data frame which has a column with epoch seconds.
In addition to this, I would like to add a column containing the difference between the current and the previous time value - in other words, the time diff since the last row in the data frame, based on the timestamp column.

How would I add such a column based on earlier values?

I am using the Scala API.

LK__
  • Do you group / partition the data? – zero323 Mar 17 '16 at 21:34
  • Not so far - just listed all rows from a log and want to chart some measurements based on time – LK__ Mar 17 '16 at 21:38
  • If you don't group data frames won't work for you (I mean you can but you really don't want to). Use sliding instead: http://stackoverflow.com/a/32679114/1560062. If you decide to group you can use window functions as shown for example here: http://stackoverflow.com/q/34535833/1560062 – zero323 Mar 17 '16 at 21:40

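For reference, here is a rough sketch of the sliding approach suggested in the comments, using the implicit conversion from org.apache.spark.mllib.rdd.RDDFunctions (the sample values and the sc handle are assumptions for illustration, not code from the linked answer):

import org.apache.spark.mllib.rdd.RDDFunctions._

// hypothetical input: sorted epoch seconds, one per row
val epochs = sc.parallelize(Seq(1540000002L, 1540000003L, 1540000004L, 1540000005L))

// sliding(2) yields windows of two consecutive elements,
// so each window is (previous, current) and the diff is current - previous
val diffs = epochs.sliding(2).map { case Array(prev, curr) => (curr, curr - prev) }

diffs.collect.foreach(println)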
3 Answers


You can use Spark's lag window function to achieve this:

// needed for .toDF outside of spark-shell (spark-shell imports it automatically)
import spark.implicits._

val df = sc.parallelize(Seq(
  (1540000005),
  (1540000004),
  (1540000003),
  (1540000002))).toDF("epoch")

// a lag function needs to have a window
val w = org.apache.spark.sql.expressions.Window.orderBy("epoch")  

import org.apache.spark.sql.functions.lag
// create a column epoch_lag_1 which is the epoch column with an offset of 1 and default value 0
val dfWithLag = df.withColumn("epoch_lag_1", lag("epoch", 1, 0).over(w))

// calculate the diff between epoch and epoch_lag_1
val dfWithDiff = dfWithLag.withColumn("diff", dfWithLag("epoch") - dfWithLag("epoch_lag_1"))

This should result in:

dfWithDiff.show 


+----------+-----------+----------+
|     epoch|epoch_lag_1|      diff|
+----------+-----------+----------+
|1540000002|          0|1540000002|
|1540000003| 1540000002|         1|
|1540000004| 1540000003|         1|
|1540000005| 1540000004|         1|
+----------+-----------+----------+
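Note that with a default of 0 the first row's diff is just the raw epoch value (1540000002 above). If you would rather have null there, a small variation (a sketch reusing the df and w from the snippet above, not part of the original answer) is to omit the default, in which case the subtraction also yields null for that row:

import org.apache.spark.sql.functions.{col, lag}

// without a default, lag returns null for the first row,
// and null minus anything is null, so the first diff becomes null
val dfWithDiffNull = df
  .withColumn("epoch_lag_1", lag("epoch", 1).over(w))
  .withColumn("diff", col("epoch") - col("epoch_lag_1"))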
Vincent Claes

This will do what you want, though as pointed out it could be a little slow.

df.printSchema
root
 |-- ts: long (nullable = false)

// max comes from the SQL functions; the $"..." syntax needs the SQL implicits in scope
import org.apache.spark.sql.functions.max

df.join(
  df.toDF("ts2"),
  $"ts2" < $"ts",
  "left_outer"
).groupBy($"ts")
 .agg(max($"ts2") as "prev")
 .select($"ts", $"ts" - $"prev" as "diff")
 .show

We can even use my pimped out DataFrame-ified zipWithIndex to make it better. Assuming we used that to add an id column, you could do:

df.join(
  df.toDF("prev_id", "prev_ts"), 
  $"id" === $"prev_id" + 1, 
  "left_outer"
).select($"ts", $"ts" - $"prev_ts" as "diff").show
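The DataFrame-ified zipWithIndex itself is behind the link in the comments below; as a rough sketch of the idea (the helper name withRowId and the spark session handle are my own assumptions, not the linked implementation), it boils down to zipping the underlying RDD with its index and rebuilding the DataFrame with an extra id column:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// hypothetical helper: prepend a sequential "id" column, so that
// df.toDF("prev_id", "prev_ts") above lines the columns up as (id, ts)
def withRowId(df: DataFrame): DataFrame = {
  val rowsWithId = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(idx +: row.toSeq) }
  val schemaWithId = StructType(StructField("id", LongType, nullable = false) +: df.schema.fields)
  spark.createDataFrame(rowsWithId, schemaWithId)  // assumes a SparkSession named spark is in scope
}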
David Griffin
  • A little is a serious understatement :) This requires a full Cartesian product. – zero323 Mar 17 '16 at 22:36
  • I didn't see anything in the question about performance. :) I mean, I can get all sorts of stupid and do a zipWithIndex -- http://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex -- and do the join on the index column with `$"id" === $"id" - 1` -- is that any better? – David Griffin Mar 17 '16 at 22:43
  • Actually, it is __much__ better :) _O(N)_ vs _O(N^2)_ better, to be precise. But if you drop to RDDs there is no good reason to move back to DF :( – zero323 Mar 17 '16 at 22:48
  • Except I try to stay away from RDDs. I have a hard enough time making DataFrames do what I want , and I am happy to let the optimizer do its thing. Not all of us want to learn every inch of Spark – David Griffin Mar 17 '16 at 23:01
  • You could try to use (and bound) a unique identifier but it is tricky. – zero323 Mar 17 '16 at 23:04
  • I'm kind of surprised there is no DataFrame native zipWithIndex actually. – David Griffin Mar 17 '16 at 23:10
  • It is a tricky design problem. You could make it deterministic but it introduces a breakpoint in the execution plan. Non-deterministic doesn't add anything over a unique id. And it is not that useful in practice considering Spark ordering semantics. – zero323 Mar 17 '16 at 23:15
  • A unique identifier wouldn't cut it here though. You need the actual ROWID. – David Griffin Mar 17 '16 at 23:25
  • It should. These are strictly increasing and there are clear rules for how these change with partitioning. But it is well below the high-level API. – zero323 Mar 17 '16 at 23:33

I do not know Scala. But how about generating a lagged column with lag and then subtracting one column from the other?
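In Scala, that is essentially what the first answer above does; compressed into a single step (a sketch reusing the epoch column name and a window ordered by it), it might look like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// lag the epoch column by one row and subtract it in the same expression
val w = Window.orderBy("epoch")
val withDiff = df.withColumn("diff", col("epoch") - lag("epoch", 1).over(w))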

akoeltringer