
I am using pySpark, and have set up my dataframe with two columns representing a daily asset price as follows:

ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])

Upon applying df.show(), I get:

+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+

That is all fine. I would like to have another column that contains the day-to-day return of the price column, i.e., something like

(price(day2)-price(day1))/(price(day1))
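For the sample data above, that definition would give, for example, a day-2 return of (31.1 - 33.3) / 33.3 ≈ -0.066 and a day-3 return of (51.2 - 31.1) / 31.1 ≈ 0.646.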

After much research, I am told that this is most efficiently accomplished by applying the pyspark.sql.window functions, but I am unable to see how.

Thomas Moore
  • I assume sqlCtx is the equivalent to 'spark' object which is obtained using sc = SparkContext('local') spark = SparkSession(sc) – Nir Sep 26 '18 at 10:04
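
As that comment notes, sqlCtx corresponds to the older SQLContext entry point; with a SparkSession (Spark 2.x and later) the same DataFrame can be built directly, roughly like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# the same day/price DataFrame, built from a list of (day, price) tuples
df = spark.createDataFrame(
    [(1, 33.3), (2, 31.1), (3, 51.2), (4, 21.3)],
    ["day", "price"])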

2 Answers


You can bring in the previous day's price with the lag function, and then add another column that computes the actual day-to-day return from the two columns. To use lag, though, you have to tell Spark how to partition and/or order your data so that it knows which row is the previous one. Something like this:

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

# add a constant column so there is something to partition the window by
dfu = df.withColumn('user', lit('tmoore'))

# lag() pulls the previous row's price within each partition;
# ordering by day makes "previous row" mean "previous day"
df_lag = dfu.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                            .over(Window.partitionBy("user").orderBy("day")))

# day-to-day return computed from the two price columns
result = df_lag.withColumn('daily_return',
          (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'])

>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+---+-----+-------+--------------+--------------------+

Here is a longer introduction to Window functions in Spark.
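
Note that the snippet above divides by the current day's price. To get the return exactly as defined in the question (dividing by the previous day's price), a minimal variant of the last step, reusing df_lag from above, would be:

result = df_lag.withColumn('daily_return',
          (df_lag['price'] - df_lag['prev_day_price']) / df_lag['prev_day_price'])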

Oleksiy
  • Hi. Thanks! That is very useful. By the way, what does the "lit" function do? – Thomas Moore Apr 20 '16 at 23:27
  • `lit` - Creates a Column of literal value - https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit – Oleksiy Apr 21 '16 at 14:16
  • Minor note: it's also good practice to sort the column that lag applies to, e.g. Window.partitionBy("user").orderBy("day", ascending=True) – Quetzalcoatl Aug 28 '18 at 06:25
  • Evaluating df_lag, I get an error: Window function lag(price#66, 1, null) requires window to be ordered; dfu.withColumn('prev_day_price', func.lag(dfu['price']).over(Window.orderBy("user"))) solves this – Nir Sep 26 '18 at 10:44
  • How can this be achieved using Spark Structured Streaming? – Nagesh May 05 '19 at 07:33
  • Can anyone suggest how this can be implemented using Spark Streaming? – Nagesh May 06 '19 at 05:53

The lag function can help you solve your use case.

from pyspark.sql.window import Window
import pyspark.sql.functions as func

### Defining the window (ordered by day, so lag looks at the previous day)
Windowspec = Window.orderBy("day")

### Calculating lag of price at each day level
prev_day_price = df.withColumn('prev_day_price',
                               func.lag(df['price'])
                                   .over(Windowspec))

### Calculating the daily return
result = prev_day_price.withColumn('daily_return',
          (prev_day_price['price'] - prev_day_price['prev_day_price']) /
           prev_day_price['price'])
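
Calling result.show() should then give essentially the same returns as in the answer above, just without the helper user column, something like:

>>> result.show()
+---+-----+--------------+--------------------+
|day|price|prev_day_price|        daily_return|
+---+-----+--------------+--------------------+
|  1| 33.3|          null|                null|
|  2| 31.1|          33.3|-0.07073954983922816|
|  3| 51.2|          31.1|         0.392578125|
|  4| 21.3|          51.2|  -1.403755868544601|
+---+-----+--------------+--------------------+

One thing to keep in mind: a window with no partitioning, such as Window.orderBy("day"), moves all the data into a single partition, so for larger datasets it is better to partition the window as in the answer above.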
Sushmita Konar