In PySpark, I have a dataframe like the one below, in which the rows are sorted by id and by the value of k1. In addition, each row has a unique, ascending number assigned to it (rowid).
-----------------------
rowid | id | k1 | k2 |
-----------------------
    1 |  1 | v1 | l1 |
    2 |  1 | v1 | v1 |
    3 |  1 | v1 | l2 |
    4 |  2 | v2 | v2 |
    5 |  2 | v2 | l3 |
    6 |  3 | v3 | l3 |
-----------------------
For every unique value of id, I want to compute (the rowid of the first row in which k1 == k2) minus (the rowid of the first row in which that id is observed) plus 1, and store the result in a new column (rank). The output should look like this:
-----------------
id | k1 | rank |
-----------------
 1 | v1 |    2 |
 2 | v2 |    1 |
 3 | v3 |    0 |
-----------------
For example, for id = 1, k1 == k2 first holds at rowid = 2, and id = 1 is first observed at rowid = 1, so the rank column gets 2 - 1 + 1 = 2. For id = 3, there is no row in which the values of k1 and k2 match, so the rank column gets 0 (or null).
I assume this involves a groupBy on id, but I am not sure how to get the rowid of the first row in which columns k1 and k2 match, or the first rowid corresponding to each unique id.