
I need to implement the lag function in Spark, which I was able to do as shown below (with some data from a Hive/temporary Spark table).

Say the DF has these rows:

lagno, value
0, 100
0, 200
2, null
3, null

where the first column is the lag offset you want to use, and the second column is the actual value.

When I run this query it works:

DataFrame df;
DataFrame dfnew = df.select(
        org.apache.spark.sql.functions.lag(df.col("value"), 1)
            .over(org.apache.spark.sql.expressions.Window.orderBy(df.col("value"))));

That is, if I hard-code the lag offset, it works fine.

However, if I pass the lag offset as a column, it does not work:

DataFrame dfnew = df.select(
        org.apache.spark.sql.functions.lag(df.col("value"), df.col("lagno"))
            .over(org.apache.spark.sql.expressions.Window.orderBy(df.col("value"))));

Do I need to cast the Column parameter to an integer?


1 Answer


It is not possible. Window functions use fixed-size frames, and the offset cannot be modified per row. What you can do is compute the lag for every offset in 1..3 and then select the one required for the current row:

CASE 
  WHEN lagno = 1 THEN LAG(value,  1) OVER w 
  WHEN lagno = 2 THEN LAG(value,  2) OVER w 
  ...
  ELSE value
END
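
If lagno can take many values (the comment below mentions up to 50), the CASE expression can be generated in a loop instead of being written out by hand. Here is a minimal Java sketch, assuming the Spark 1.x DataFrame API used in the question, a window ordered by value as above, and an upper bound of 50 for lagno (the bound and the output column name "lagged" are assumptions):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.lag;
import static org.apache.spark.sql.functions.when;

// Same window as in the question.
WindowSpec w = Window.orderBy(df.col("value"));

// Build the CASE expression from the inside out: start with the ELSE
// branch, then wrap one WHEN branch per possible offset.
Column expr = df.col("value");                      // ELSE value
for (int i = 1; i <= 50; i++) {                     // 50 = assumed max lagno
    expr = when(df.col("lagno").equalTo(i),
                lag(df.col("value"), i).over(w))
           .otherwise(expr);
}

DataFrame dfnew = df.withColumn("lagged", expr);

Each lag(value, i) still uses a fixed offset, so this stays within what window functions allow; the loop only assembles the per-row selection. The resulting plan contains one window expression per possible offset, so a very large bound will be expensive. A UDF cannot replace this, because the offset must be fixed when the window expression is defined, before any row values are available.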
  • @LostlnOverflow yeah, I did the same with CASE. Since my lagno can go above 50, I might end up writing 50 conditions. Is there any way I can have a UDF which takes the lagno column value and returns an integer? I am not sure if this is possible. – kre Sep 16 '16 at 07:45