4

I want to set the value of column based on the value of that column in the previous row for a group. Then this updated value will be used in the next row.

I have the following dataframe

id | start_date|sort_date | A | B |
-----------------------------------
1 | 1/1/2017 | 31-01-2015 | 1 | 0 | 
1 | 1/1/2017 | 28-02-2015 | 0 | 0 | 
1 | 1/1/2017 | 31-03-2015 | 1 | 0 | 
1 | 1/1/2017 | 30-04-2015 | 1 | 0 | 
1 | 1/1/2017 | 31-05-2015 | 1 | 0 | 
1 | 1/1/2017 | 30-06-2015 | 1 | 0 | 
1 | 1/1/2017 | 31-07-2015 | 1 | 0 | 
1 | 1/1/2017 | 31-08-2015 | 1 | 0 | 
1 | 1/1/2017 | 30-09-2015 | 0 | 0 | 
2 | 1/1/2017 | 31-10-2015 | 1 | 0 | 
2 | 1/1/2017 | 30-11-2015 | 0 | 0 | 
2 | 1/1/2017 | 31-12-2015 | 1 | 0 | 
2 | 1/1/2017 | 31-01-2016 | 1 | 0 | 
2 | 1/1/2017 | 28-02-2016 | 1 | 0 | 
2 | 1/1/2017 | 31-03-2016 | 1 | 0 | 
2 | 1/1/2017 | 30-04-2016 | 1 | 0 | 
2 | 1/1/2017 | 31-05-2016 | 1 | 0 | 
2 | 1/1/2017 | 30-06-2016 | 0 | 0 | 

Output :

id | start_date|sort_date | A | B | C
---------------------------------------
1 | 1/1/2017 | 31-01-2015 | 1 | 0 | 1
1 | 1/1/2017 | 28-02-2015 | 0 | 0 | 0
1 | 1/1/2017 | 31-03-2015 | 1 | 0 | 1
1 | 1/1/2017 | 30-04-2015 | 1 | 0 | 2
1 | 1/1/2017 | 31-05-2015 | 1 | 0 | 3
1 | 1/1/2017 | 30-06-2015 | 1 | 0 | 4
1 | 1/1/2017 | 31-07-2015 | 1 | 0 | 5
1 | 1/1/2017 | 31-08-2015 | 1 | 0 | 6
1 | 1/1/2017 | 30-09-2015 | 0 | 0 | 0
2 | 1/1/2017 | 31-10-2015 | 1 | 0 | 1
2 | 1/1/2017 | 30-11-2015 | 0 | 0 | 0
2 | 1/1/2017 | 31-12-2015 | 1 | 0 | 1
2 | 1/1/2017 | 31-01-2016 | 1 | 0 | 2
2 | 1/1/2017 | 28-02-2016 | 1 | 0 | 3
2 | 1/1/2017 | 31-03-2016 | 1 | 0 | 4
2 | 1/1/2017 | 30-04-2016 | 1 | 0 | 5
2 | 1/1/2017 | 31-05-2016 | 1 | 0 | 6
2 | 1/1/2017 | 30-06-2016 | 0 | 0 | 0

Group is of id and date

Column C is to derived based on column A and B.

If A == 1 and B == 0 then C is derived C from previous row + 1.
There are some other conditions as well but I am struggling with this part.

Assuming we have a column sort_date in dataframe.

I tried the following query :

SELECT
id,
date,
sort_date,
lag(A) OVER (PARTITION BY  id, date ORDER BY sort_date) as prev,
CASE
   WHEN A=1 AND B= 0  THEN 1
   WHEN  A=1 AND B> 0 THEN prev +1
   ELSE 0
 END AS A
FROM
Table

This Is what I did for UDAF

val myFunc = new MyUDAF
val w = Window.partitionBy(col("ID"), col("START_DATE")).orderBy(col("SORT_DATE"))
val df = df.withColumn("C", myFunc(col("START_DATE"), col("X"),
  col("Y"), col("A"),
  col("B")).over(w))

P.S : I am using Spark 1.6

zero323
  • 322,348
  • 103
  • 959
  • 935
Shashi Mishra
  • 105
  • 2
  • 9
  • you can use **Window functions** with Spark SQL. – mrsrinivas Jan 30 '17 at 06:12
  • can you add the code which you have tried ? – mrsrinivas Jan 30 '17 at 06:18
  • Please improve the question: Can you explain a little more what you are trying to achieve, what you did so far, what is your input, what is your expected output, do you want to do this in RDD as the title says or in a dataframe as the wording of column suggest? what do you mean a group? do you mean a groupby? how do you want it sorted? – Assaf Mendelson Jan 30 '17 at 06:23
  • Sorry for the formatting I was doing this from my phone. I have edited the question. I am working with dataframes. There is one more date column which is used to order the rows in groups. – Shashi Mishra Jan 30 '17 at 06:42
  • It's also good to add what you have tried so far. And also to answer the other comments as your question is quite broad and also subject to be closed. – eliasah Jan 30 '17 at 07:18
  • how is the sorting defined? – Assaf Mendelson Jan 30 '17 at 08:25
  • @AssafMendelson : Sorting is in ascending order. – Shashi Mishra Jan 30 '17 at 10:39
  • sort_date is the column, so we group by Id and date and sort by sort_date inside that group – Shashi Mishra Jan 30 '17 at 13:12

1 Answers1

4

First define a window:

import org.apache.spark.sql.expressions.Window
val winspec = Window.partitionBy("id","start_date").orderBy("sort_date")

Next create a UDAF which recieves A and B and basically calculates C by starting with 0, changing to 0 whenever the condition appears (A=1,B=0) and increasing by 1 any other time. To see how to write a UDAF see examples in here, here and here

EDIT Here is a sample implementation of the UDAF (not really tested so there may be typos):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._

 class myFunc() extends UserDefinedAggregateFunction {

  // Input Data Type Schema
  def inputSchema: StructType = StructType(Array(StructField("A", IntegerType), StructField("A", IntegerType)))

   // Intermediate Schema
  def bufferSchema = StructType(Array(StructField("C", IntegerType)))

  // Returned Data Type .
  def dataType: DataType = IntegerType

  // Self-explaining
  def deterministic = true

  // This function is called whenever key changes
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = 0 // set number of items to 0
  }

  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    buffer(0) = if (input.getInt(0) == 1 && input.getInt(1) == 0) buffer.getInt(0) + 1 else 0
  }

  // Merge two partial aggregates - doesn't really matter because the window will make sure the buffer remains in a
  // single partition
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
  }

  // Called after all the entries are exhausted.
  def evaluate(buffer: Row) = {
    buffer.getInt(0)
  }

}

Last apply it to your dataframe. Let's assume you named your UDAF myFunc:

val f = new myFunc()
val newDF = df.withColumn("newC", f($"A",$"B").over(winspec))
Community
  • 1
  • 1
Assaf Mendelson
  • 12,701
  • 5
  • 47
  • 56
  • C is the derived column. Initially every row has C=0. I need to calculate the C of the current row based on the calculated value of C from previous row. In this case the previous C will always be 0. – Shashi Mishra Jan 31 '17 at 03:59
  • I have changed my question to reflect that. it was my mistake. – Shashi Mishra Jan 31 '17 at 04:08
  • I just called it here newC. The part which creates the calculation is the UDAF. The UDAF would start newC with 0 and increase it by 1 except when A=1 and B=0. What the window does is make sure the input to the UDAF is partitioned and ordered correctly – Assaf Mendelson Jan 31 '17 at 05:38
  • How can I increment the newC. will the previous value of newC be available for the next row? – Shashi Mishra Jan 31 '17 at 05:45
  • def myFunc(A,B): if A == 1 and B == 0: return 1 else: increment previous value by 1 () # just an example – Shashi Mishra Jan 31 '17 at 05:45
  • You should be implementing UDAF as in the examples I showed – Assaf Mendelson Jan 31 '17 at 07:26
  • Added sample UDAF implementation – Assaf Mendelson Jan 31 '17 at 07:59
  • Hi @AssafMendelson , I am getting the following error. Exception in thread "main" java.lang.UnsupportedOperationException: ForbEndUDAF('START_DATE,'ENTRY_CYCLE,'FIRST_RN,'PAYMENT_DONE,'PREV_ARREAR) is not supported in a window operation. – Shashi Mishra Feb 01 '17 at 05:35
  • Can you expand on what you did differently? – Assaf Mendelson Feb 01 '17 at 08:46
  • I have edited the question with my calling of the UDAF. I am using Spark 1.6 and I read somewhere that custom UDAF is not suported for window functions in Spark 1.6. Is that the case? – Shashi Mishra Feb 01 '17 at 11:05
  • You are correct. According to https://forums.databricks.com/questions/6466/spark-16-using-udaf-with-windowed-functions.html this is not supported for spark 1.6. I tested the code in the answer on spark 2.1 to make sure it works. – Assaf Mendelson Feb 01 '17 at 11:11
  • They are two other other columns in my real dataset which effect the value of C. So I changed the update function and input schema to reflect that in my UDAF. – Shashi Mishra Feb 01 '17 at 11:17
  • If you are forced to using spark 1.6, then I see two options: 1. go to RDD and use custom aggregation. 2. create a struct type for each id + date, do a collect_list on it, do a UDF which does the calculation and explode it back – Assaf Mendelson Feb 01 '17 at 11:20