
I have a dataframe like this...

+----------+-----+
|      date|price|
+----------+-----+
|2019-01-01|   25|
|2019-01-02|   22|
|2019-01-03|   20|
|2019-01-04|   -5|
|2019-01-05|   -1|
|2019-01-06|   -2|
|2019-01-07|    5|
|2019-01-08|  -11|
+----------+-----+

I want to create a new column based on logic that needs to look back at other rows, not just the column values of the same row.

I was trying a UDF, but it only receives the column value of the corresponding row. I do not know how to look at other rows...

For example, I would like to create a new column "new_price", which would look something like this...

+----------+-----+---------+
|      date|price|new_price|
+----------+-----+---------+
|2019-01-01|   25|       25|
|2019-01-02|   22|       22|
|2019-01-03|   20|       20|
|2019-01-04|   -5|       20|
|2019-01-05|   -1|       20|
|2019-01-06|   -2|       20|
|2019-01-07|    5|        5|
|2019-01-08|  -11|        5|
+----------+-----+---------+

Essentially, each row's value in the new column is based not just on that row's values but on other rows' values...

Logic: if the price is negative, then look back at previous days; if that day has a positive value, take it, otherwise go back one more day until a positive value is found...

dateprice = [('2019-01-01', 25), ('2019-01-02', 22), ('2019-01-03', 20), ('2019-01-04', -5),
             ('2019-01-05', -1), ('2019-01-06', -2), ('2019-01-07', 5), ('2019-01-08', -11)]

dataDF = sqlContext.createDataFrame(dateprice, ('date', 'price'))



Any help will be highly appreciated.

NDS

2 Answers


First populate the new_price column from the price column, but replace the negative values with nulls. Then you can use the technique shown in Fill in null with previously known good value with pyspark to get the last non-null value, which in this case will be the last positive value.

For example:

from pyspark.sql.functions import col, last, when
from pyspark.sql import Window

# Window from the first row up to and including the current row, ordered by date
w = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Step 1: keep non-negative prices; negative prices become null (no otherwise specified)
# Step 2: replace each null with the last non-null (i.e. last positive) value seen so far
dataDF.withColumn("new_price", when(col("price") >= 0, col("price")))\
    .withColumn(
        "new_price",
        last("new_price", True).over(w)
    )\
    .show()
#+----------+-----+---------+
#|      date|price|new_price|
#+----------+-----+---------+
#|2019-01-01|   25|       25|
#|2019-01-02|   22|       22|
#|2019-01-03|   20|       20|
#|2019-01-04|   -5|       20|
#|2019-01-05|   -1|       20|
#|2019-01-06|   -2|       20|
#|2019-01-07|    5|        5|
#|2019-01-08|  -11|        5|
#+----------+-----+---------+

Here I have taken advantage of the fact that when returns null by default if the condition doesn't match and no otherwise is specified.
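For illustration, that first withColumn is equivalent to spelling out the otherwise branch explicitly. A minimal sketch, using the same dataDF from the question:

from pyspark.sql.functions import col, when

# Equivalent to when(col("price") >= 0, col("price")) with no otherwise:
# negative prices are mapped to an explicit null instead of an implicit one.
dataDF.withColumn("new_price", when(col("price") >= 0, col("price")).otherwise(None))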

pault
  • BTW: I am reading about window stuff... but looking at this I am also trying to see if I can do something like replacing a row's value with value from 2 days ago - irrespective of negative or positive or null... just copying value from 2 days ago or rather n days ago. – NDS Aug 19 '19 at 21:35
  • Hi @pault, If now I want to do it within a name A or B ? How can I do this? +----------+---+-----+---------+ | date| name|price|new_price| +----------+---+-----+---------+ |2019-01-01| A| 25| 25| |2019-01-01| B| 2| 2| |2019-01-02| A| 22| 22| |2019-01-02| B| -1| 22| |2019-01-03| A| 20| 20| |2019-01-03| B| 10| 10| |2019-01-04| A| -5| 10| |2019-01-04| B| 7| 7| |2019-01-05| A| -1| 7| |2019-01-05| B| 0| 0| +----------+---+-----+---------+ – NDS Aug 23 '19 at 21:05
  • Hi @pault the new table is up in the main question end – NDS Aug 23 '19 at 21:13
  • w = Window.partitionBy("name").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow) – NDS Aug 23 '19 at 21:19
  • @SanDNath if you have a new question, either edit your question or post a new one. Don't post code in the comments. – pault Aug 26 '19 at 19:29
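Following up on the comments above about doing this per name: a minimal sketch, assuming the DataFrame also has a name column as in the commenter's example, is to add partitionBy to the window so the fill-forward restarts for each name:

from pyspark.sql.functions import col, last, when
from pyspark.sql import Window

# Same fill-forward as above, but restarted per "name"
# (assumes a "name" column in addition to "date" and "price")
w = Window.partitionBy("name").orderBy("date")\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

dataDF.withColumn("new_price", when(col("price") >= 0, col("price")))\
    .withColumn("new_price", last("new_price", True).over(w))\
    .show()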

I tried this using Spark SQL. Let me explain my solution in two parts.

First, when the price is negative, we can fetch the most recent date on which the price was positive; otherwise we populate the price itself, as shown below.
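Note: the queries below assume the DataFrame from the question has been registered as a temporary view named dataset. A minimal way to do that (assuming Spark 2.x or later) is:

# Expose dataDF to Spark SQL under the name "dataset"
dataDF.createOrReplaceTempView("dataset")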

spark.sql("""
select *,
case when price < 0 then
max(lag(case when price < 0 then null else date end) over(order by date))
over(order by date rows between unbounded preceding and current row)
else price end as price_or_date 
from dataset
""").show()

Output:

+----------+-----+-------------+
|      date|price|price_or_date|
+----------+-----+-------------+
|2019-01-01|   25|           25|
|2019-01-02|   22|           22|
|2019-01-03|   20|           20|
|2019-01-04|   -5|   2019-01-03|
|2019-01-05|   -1|   2019-01-03|
|2019-01-06|   -2|   2019-01-03|
|2019-01-07|    5|            5|
|2019-01-08|  -11|   2019-01-07|
+----------+-----+-------------+

Second, you can left join the same dataset on the date against this derived column. The rows that have a price (rather than a date) in the price_or_date column won't find a match, so b.price comes out as null for them. Finally, we perform a simple coalesce to fall back to the original price.

Combining the two, we arrive at the final query shown below, which generates the desired output.

spark.sql("""
select 
   a.date
 , a.price
 , coalesce(b.price, a.price) as new_price
from
(
select *,
case when price < 0 then
max(lag(case when price < 0 then null else date end) over(order by date))
over(order by date rows between unbounded preceding and current row)
else price end as price_or_date 
from dataset
) a
left join dataset b
on a.price_or_date = b.date 
order by a.date""").show()

Output:

+----------+-----+---------+
|      date|price|new_price|
+----------+-----+---------+
|2019-01-01|   25|       25|
|2019-01-02|   22|       22|
|2019-01-03|   20|       20|
|2019-01-04|   -5|       20|
|2019-01-05|   -1|       20|
|2019-01-06|   -2|       20|
|2019-01-07|    5|        5|
|2019-01-08|  -11|        5|
+----------+-----+---------+

Hope this helps.

noufel13