I have a DataFrame that looks like this:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.functions import to_date

# "want" holds the expected result of the calculation described below
TEST_schema = StructType([StructField("date", StringType(), True),
                          StructField("Trigger", StringType(), True),
                          StructField("value", FloatType(), True),
                          StructField("col1", IntegerType(), True),
                          StructField("col2", IntegerType(), True),
                          StructField("want", FloatType(), True)])
TEST_data = [('2020-08-01','T',0.0,3,5,0.5),('2020-08-02','T',0.0,-1,4,0.0),('2020-08-03','T',0.0,-1,3,0.0),
             ('2020-08-04','F',0.2,3,3,0.7),('2020-08-05','T',0.3,1,4,0.9),('2020-08-06','F',0.2,-1,3,0.0),
             ('2020-08-07','T',0.2,-1,4,0.0),('2020-08-08','T',0.5,-1,5,0.0),('2020-08-09','T',0.0,-1,5,0.0)]
# sqlContext is the one provided by the PySpark shell
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
# Convert the date strings to DateType
TEST_df = TEST_df.withColumn("date", to_date("date", "yyyy-MM-dd"))
TEST_df.show()
+----------+-------+-----+----+----+----+
|      date|Trigger|value|col1|col2|want|
+----------+-------+-----+----+----+----+
|2020-08-01|      T|  0.0|   3|   5| 0.5|
|2020-08-02|      T|  0.0|  -1|   4| 0.0|
|2020-08-03|      T|  0.0|  -1|   3| 0.0|
|2020-08-04|      F|  0.2|   3|   3| 0.7|
|2020-08-05|      T|  0.3|   1|   4| 0.9|
|2020-08-06|      F|  0.2|  -1|   3| 0.0|
|2020-08-07|      T|  0.2|  -1|   4| 0.0|
|2020-08-08|      T|  0.5|  -1|   5| 0.0|
|2020-08-09|      T|  0.0|  -1|   5| 0.0|
+----------+-------+-----+----+----+----+
date: already sorted in ascending order
Trigger: only T or F
value: any random decimal (float) value
col1: a number of days; cannot be lower than -1 (-1 <= col1 < infinity)
col2: a number of days; cannot be negative (col2 >= 0)
**Calculation logic**
If col1 == -1, then return 0. Otherwise, if Trigger == T, the following diagram will help to understand the logic.
In the "red" example, the +3 comes from col1 (col1 == 3 at 2020-08-01). It means we jump 3 rows ahead, and at the same time take the difference (col2 - col1) - 1 = (5 - 3) - 1 = 1. That 1 means we also add the one value after the row we land on, so the result at 2020-08-01 is 0.2 + 0.3 = 0.5. The same logic applies to the "blue" example.
The "green" example is for Trigger == "F": just take (col2 - 1) = 3 - 1 = 2. At 2020-08-04, the 2 means adding the next two values to the current one, which is 0.2 + 0.3 + 0.2 = 0.7.
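To pin the rule down, here is a minimal plain-Python restatement of it (not a Spark solution), as read from the description above. The helper name `expected_want` is hypothetical, and cutting a window off at the last row is an assumption, since none of the sample rows runs past the end of the data.

```python
# Rows copied from TEST_data: (date, Trigger, value, col1, col2, want)
rows = [
    ("2020-08-01", "T", 0.0, 3, 5, 0.5),
    ("2020-08-02", "T", 0.0, -1, 4, 0.0),
    ("2020-08-03", "T", 0.0, -1, 3, 0.0),
    ("2020-08-04", "F", 0.2, 3, 3, 0.7),
    ("2020-08-05", "T", 0.3, 1, 4, 0.9),
    ("2020-08-06", "F", 0.2, -1, 3, 0.0),
    ("2020-08-07", "T", 0.2, -1, 4, 0.0),
    ("2020-08-08", "T", 0.5, -1, 5, 0.0),
    ("2020-08-09", "T", 0.0, -1, 5, 0.0),
]


def expected_want(rows):
    """Hypothetical helper: compute `want` for every row, in date order."""
    values = [r[2] for r in rows]
    result = []
    for i, (_, trigger, _, col1, col2, _) in enumerate(rows):
        if col1 == -1:
            result.append(0.0)
            continue
        # "T": jump col1 rows ahead; "F": start at the current row.
        start = i + col1 if trigger == "T" else i
        # Both cases end at row i + col2 - 1 inclusive.
        # Assumption: slicing silently stops at the last row.
        end = i + col2
        # Round to hide float noise such as 0.7000000000000001.
        result.append(round(sum(values[start:end], 0.0), 6))
    return result


computed = expected_want(rows)
for row, got in zip(rows, computed):
    print(row[0], "computed:", got, "want:", row[5])
```

For Trigger == "T" the window covers (col2 - col1 - 1) + 1 = col2 - col1 values starting col1 rows ahead; for Trigger == "F" it covers col2 values starting at the current row. Both therefore end at the same row, which is why a single `end` index is enough.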
Edit:
What if I want no conditions at all? Let's say we have this DataFrame:
TEST_schema = StructType([StructField("date", StringType(), True),
                          StructField("value", FloatType(), True),
                          StructField("col2", IntegerType(), True)])
TEST_data = [('2020-08-01',0.0,5),('2020-08-02',0.0,4),('2020-08-03',0.0,3),('2020-08-04',0.2,3),('2020-08-05',0.3,4),
             ('2020-08-06',0.2,3),('2020-08-07',0.2,4),('2020-08-08',0.5,5),('2020-08-09',0.0,5)]
# sqlContext is the one provided by the PySpark shell
TEST_df = sqlContext.createDataFrame(TEST_data, TEST_schema)
# Convert the date strings to DateType
TEST_df = TEST_df.withColumn("date", to_date("date", "yyyy-MM-dd"))
TEST_df.show()
+----------+-----+----+
|      date|value|col2|
+----------+-----+----+
|2020-08-01|  0.0|   5|
|2020-08-02|  0.0|   4|
|2020-08-03|  0.0|   3|
|2020-08-04|  0.2|   3|
|2020-08-05|  0.3|   4|
|2020-08-06|  0.2|   3|
|2020-08-07|  0.2|   4|
|2020-08-08|  0.5|   5|
|2020-08-09|  0.0|   5|
+----------+-----+----+
The same logic applies as in the Trigger == "F" case, i.e. take col2 - 1 following values in addition to the current one, but applied to every row with no condition.
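As a plain-Python illustration of this unconditional version (again not a Spark solution): each row sums its own value plus the next col2 - 1 values. The helper name `forward_sum` is hypothetical, and the behaviour for the last rows, where the window runs past the end of the data, is an assumption: here the window is simply cut off at the last row.

```python
# value and col2 columns copied from the second TEST_data, in date order
values = [0.0, 0.0, 0.0, 0.2, 0.3, 0.2, 0.2, 0.5, 0.0]
col2 = [5, 4, 3, 3, 4, 3, 4, 5, 5]


def forward_sum(values, widths):
    """Hypothetical helper: for each row i, sum values[i : i + widths[i]].

    Assumption: a window that runs past the last row is cut off there.
    """
    return [
        # Round to hide float noise such as 0.7000000000000001.
        round(sum(values[i:i + w], 0.0), 6)
        for i, w in enumerate(widths)
    ]


result = forward_sum(values, col2)
for v, w, r in zip(values, col2, result):
    print(f"value={v} col2={w} sum={r}")
```

Under that assumption, 2020-08-04 gives 0.2 + 0.3 + 0.2 = 0.7, matching the earlier "green" example, and 2020-08-07 only has three rows left to add up even though col2 is 4.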