0

I have this DataFrame:

DataFrame[date: string, t: string, week: string, a: bigint, b: bigint]

With the following data:

+---------+--+--------+---+---+
|date     |t |week    |a  |b  |
+---------+--+--------+---+---+
|20180328 |1 |2018-W10|31 |35 |
|20180328 |1 |2018-W11|18 |37 |
|20180328 |1 |2018-W12|19 |37 |
|20180328 |1 |2018-W13|19 |38 |
|20180328 |1 |2018-W14|20 |38 |
|20180328 |1 |2018-W15|22 |39 |
|20180328 |1 |2018-W16|23 |39 |
|20180328 |1 |2018-W17|24 |40 |
|20180328 |1 |2018-W18|25 |40 |
|20180328 |1 |2018-W19|25 |41 |
|20180328 |1 |2018-W20|26 |41 |
|20180328 |1 |2018-W21|26 |41 |
|20180328 |1 |2018-W22|26 |41 |
|20180328 |2 |2018-W10|14 |26 |
|20180328 |2 |2018-W11|82 |33 |
|20180328 |2 |2018-W12|87 |36 |
|20180328 |2 |2018-W13|89 |39 |
|20180328 |2 |2018-W14|10 |45 |
|20180328 |2 |2018-W15|10 |45 |
|20180328 |2 |2018-W16|11 |48 |
|20180328 |2 |2018-W17|11 |55 |
|20180328 |2 |2018-W18|11 |60 |
|20180328 |2 |2018-W19|11 |70 |
|20180328 |2 |2018-W20|11 |79 |
|20180328 |2 |2018-W21|11 |86 |
|20180328 |2 |2018-W22|12 |93 |
+---------+--+--------+---+---+

And I want to add a new column that has, for each date and type (column t), the difference between that row and the first week for that date for column b.

Something like this:

+---------+--+--------+---+---+---+
|date     |t |week    |a  |b  |h  |
+---------+--+--------+---+---+---+
|20180328 |1 |2018-W10|31 |35 |0  | 
|20180328 |1 |2018-W11|18 |37 |2  |
|20180328 |1 |2018-W12|19 |37 |2  |
|20180328 |1 |2018-W13|19 |38 |3  |
|20180328 |1 |2018-W14|20 |38 |3  |
|20180328 |1 |2018-W15|22 |39 |4  |
|20180328 |1 |2018-W16|23 |39 |4  |
|20180328 |1 |2018-W17|24 |40 |5  |
|20180328 |1 |2018-W18|25 |40 |5  |
|20180328 |1 |2018-W19|25 |41 |6  |
|20180328 |1 |2018-W20|26 |41 |6  |
|20180328 |1 |2018-W21|26 |41 |6  | 
|20180328 |1 |2018-W22|26 |41 |6  | 
|20180328 |2 |2018-W10|14 |26 |0  | 
|20180328 |2 |2018-W11|82 |33 |7  | 
|20180328 |2 |2018-W12|87 |36 |10 | 
|20180328 |2 |2018-W13|89 |39 |13 | 
|20180328 |2 |2018-W14|10 |45 |19 | 
|20180328 |2 |2018-W15|10 |45 |19 | 
|20180328 |2 |2018-W16|11 |48 |22 | 
|20180328 |2 |2018-W17|11 |55 |29 | 
|20180328 |2 |2018-W18|11 |60 |34 | 
|20180328 |2 |2018-W19|11 |70 |44 | 
|20180328 |2 |2018-W20|11 |79 |53 | 
|20180328 |2 |2018-W21|11 |86 |60 | 
|20180328 |2 |2018-W22|12 |93 |67 | 
+---------+--+--------+---+---+---+

Each number in column h is the value in col('b') - value in col('b') at W10 for that type.

pault
  • 41,343
  • 15
  • 107
  • 149
frm
  • 657
  • 4
  • 9
  • 22

1 Answers1

2

You can accomplish this using a pyspark.sql.Window.

Partition by the column 't' and order by the column 'week'. This works because sorting your week column will do a lexicographical sort, and 'W10' will be the first value for your group. If this were not the case, you would need to find another way to sort the column so that the order is what you want.

Here is a trimmed down example.

data = [
    ('20180328',1,'2018-W10',31,35),
    ('20180328',1,'2018-W11',18,37),
    ('20180328',1,'2018-W12',19,37),
    ('20180328',1,'2018-W13',19,38),
    ('20180328',1,'2018-W14',20,38),
    ('20180328',2,'2018-W10',14,26),
    ('20180328',2,'2018-W11',82,33),
    ('20180328',2,'2018-W12',87,36),
    ('20180328',2,'2018-W13',89,39)
]

df = sqlCtx.createDataFrame(data, ['date', 't', 'week', 'a', 'b'])
df.show()
#+--------+---+--------+---+---+
#|    date|  t|    week|  a|  b|
#+--------+---+--------+---+---+
#|20180328|  1|2018-W10| 31| 35|
#|20180328|  1|2018-W11| 18| 37|
#|20180328|  1|2018-W12| 19| 37|
#|20180328|  1|2018-W13| 19| 38|
#|20180328|  1|2018-W14| 20| 38|
#|20180328|  2|2018-W10| 14| 26|
#|20180328|  2|2018-W11| 82| 33|
#|20180328|  2|2018-W12| 87| 36|
#|20180328|  2|2018-W13| 89| 39|
#+--------+---+--------+---+---+

Using pyspark DataFrame functions

Define the Window:

from pyspark.sql import Window   
w = Window.partitionBy('t').orderBy('week')

Create the new column using the Window:

import pyspark.sql.functions as f

df = df.select('*', (f.col('b') - f.first('b').over(w)).alias('h'))
df.show()
#+--------+---+--------+---+---+---+
#|    date|  t|    week|  a|  b|  h|
#+--------+---+--------+---+---+---+
#|20180328|  1|2018-W10| 31| 35|  0|
#|20180328|  1|2018-W11| 18| 37|  2|
#|20180328|  1|2018-W12| 19| 37|  2|
#|20180328|  1|2018-W13| 19| 38|  3|
#|20180328|  1|2018-W14| 20| 38|  3|
#|20180328|  2|2018-W10| 14| 26|  0|
#|20180328|  2|2018-W11| 82| 33|  7|
#|20180328|  2|2018-W12| 87| 36| 10|
#|20180328|  2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+

Using pyspark-sql

Here is the equivalent operation using pyspark-sql:

df.registerTempTable('myTable')
df = sqlCtx.sql(
    "SELECT *, (b - FIRST(b) OVER (PARTITION BY t ORDER BY week)) AS h FROM myTable"
)
df.show()
#+--------+---+--------+---+---+---+
#|    date|  t|    week|  a|  b|  h|
#+--------+---+--------+---+---+---+
#|20180328|  1|2018-W10| 31| 35|  0|
#|20180328|  1|2018-W11| 18| 37|  2|
#|20180328|  1|2018-W12| 19| 37|  2|
#|20180328|  1|2018-W13| 19| 38|  3|
#|20180328|  1|2018-W14| 20| 38|  3|
#|20180328|  2|2018-W10| 14| 26|  0|
#|20180328|  2|2018-W11| 82| 33|  7|
#|20180328|  2|2018-W12| 87| 36| 10|
#|20180328|  2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+

Related

pault
  • 41,343
  • 15
  • 107
  • 149