
In PySpark, I have a dataframe like the one below, in which the rows are sorted by id and the value of k1. In addition, each row has a unique ascending number assigned to it (rowid).

----------------------
rowid | id | k1 | k2 |
----------------------
1     | 1  | v1 | l1 |
2     | 1  | v1 | v1 |
3     | 1  | v1 | l2 |
4     | 2  | v2 | v2 |
5     | 2  | v2 | l3 |
6     | 3  | v3 | l3 |
----------------------

For every unique value of id, I want to compute the rowid of the first row in which k1==k2, minus the rowid of the first row in which that id is observed, plus 1, and store the result in a new column (i.e. rank). The output should look like this:

-----------------
 id | k1 | rank |
-----------------
 1  | v1 | 2    |
 2  | v2 | 1    |
 3  | v3 | 0    |
-----------------

For example, for id=1, k1==k2 when rowid=2, and the first time id=1 is observed is at rowid=1, so the rank is 2-1+1=2. For id=3, there is no record in which the values of columns k1 and k2 match, so the rank column is filled with 0 (or null).

I assume this involves a groupBy on id, but I am not sure how to get the rowid of the row in which columns k1 and k2 match, or the first rowid corresponding to each unique id.

user3192082

2 Answers


First, create a sample dataframe:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
            (1, 1, 'v1', 'l1'),
            (2, 1, 'v1', 'v1'),
            (3, 1, 'v1', 'l2'),
            (4, 2, 'v2', 'v2'),
            (5, 2, 'v2', 'l3'),
            (6, 3, 'v3', 'l3'),
            ],
            ['rowid', 'id', 'k1', 'k2'])

Then create a UDF and apply it to the collected rows of each group:

def get_rank_udf(rows):
    # sort the collected structs by rowid so they are scanned in the original order
    rows = sorted(rows, key=lambda x: x['rowid'])
    first_row_id = rows[0]['rowid']

    # find the first row in the group where k1 == k2, if any
    equal_row_id = None
    for _r in rows:
        if _r['k1'] == _r['k2']:
            equal_row_id = _r['rowid']
            break

    if equal_row_id is None:
        return 0
    return equal_row_id - first_row_id + 1

get_rank = F.udf(get_rank_udf, IntegerType())

df = df.groupby('id', 'k1').agg(F.collect_list(F.struct('rowid', 'k1', 'k2')).alias('elements'))\
       .withColumn('rank', get_rank(F.col('elements')))\
       .select('id', 'k1', 'rank')

This gives the output:

+---+---+----+                                                                  
| id| k1|rank|
+---+---+----+
|  1| v1|   2|
|  2| v2|   1|
|  3| v3|   0|
+---+---+----+
mayank agrawal
  • Thanks, I was also able to solve this as follows, but your solution is more elegant :) Do you know which one is more efficient? I mean join vs udf? `df2 = df.groupBy("id").agg(fn.min("rowid").alias("minRowId"))` `rank = df.join(df2, df.id == df2.id, how='full').drop(df2.id)` `rank = rank.withColumn("diff", fn.when(fn.col("k1")==fn.col("k2"), rank.rowid - rank.minRowId + 1))` – user3192082 May 22 '19 at 16:51
  • `udf` can be slow and `join` will require more memory. It depends on what you prefer. – mayank agrawal May 23 '19 at 07:41
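For reference, the join-based approach from the comment above can be written out as a runnable sketch. The final groupBy/fillna step is an assumption added here to collapse the per-row diff into one rank per id so the result matches the expected output; `fn` is assumed to be `pyspark.sql.functions`:

import pyspark.sql.functions as fn

# first rowid observed for each id
df2 = df.groupBy("id").agg(fn.min("rowid").alias("minRowId"))

# join it back and compute the difference only on rows where k1 == k2
rank = df.join(df2, df.id == df2.id, how='full').drop(df2.id)
rank = rank.withColumn("diff", fn.when(fn.col("k1") == fn.col("k2"),
                                       rank.rowid - rank.minRowId + 1))

# collapse to one row per (id, k1); ids with no k1 == k2 match become 0
rank.groupBy("id", "k1").agg(fn.min("diff").alias("rank")).fillna(0).show()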

You can do this using the API functions with a groupBy on id and k1, which should be faster than using a udf:

import pyspark.sql.functions as f

df.groupBy("id", "k1")\
    .agg(
        f.min(f.when(f.col("k1")==f.col("k2"), f.col("rowid"))).alias("first_equal"),
        f.min("rowid").alias("first_row")
    )\
    .select("id", "k1", (f.col("first_equal")-f.col("first_row")+1).alias("rank"))\
    .fillna(0)\
    .show()
#+---+---+----+
#| id| k1|rank|
#+---+---+----+
#|  1| v1|   2|
#|  2| v2|   1|
#|  3| v3|   0|
#+---+---+----+

The computation of rank can be broken down into two aggregate expressions inside the groupBy:

  • The first takes the min rowid for which k1==k2, for each (id, k1) pair.
  • The second takes the min rowid over each (id, k1) pair.

You take the difference of these (+1 as per your requirements) and finally fill any null values with 0; the intermediate result on your sample data is shown below.
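To make this concrete, here is the intermediate aggregation before the difference and the fillna are applied, using the same df and f as above (a sketch; exact row order and the rendering of null may differ depending on your Spark version):

df.groupBy("id", "k1")\
    .agg(
        f.min(f.when(f.col("k1")==f.col("k2"), f.col("rowid"))).alias("first_equal"),
        f.min("rowid").alias("first_row")
    )\
    .show()
#+---+---+-----------+---------+
#| id| k1|first_equal|first_row|
#+---+---+-----------+---------+
#|  1| v1|          2|        1|
#|  2| v2|          4|        4|
#|  3| v3|       null|        6|
#+---+---+-----------+---------+

For id=3, first_equal is null because no row has k1==k2, which is why the final fillna(0) is needed.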


Update: An alternative way using row_number:

from pyspark.sql import Window

# you can define your own ordering column here
w = Window.partitionBy("id", "k1").orderBy("rowid")

df.withColumn("rank", f.when(f.expr("k1 = k2"), f.row_number().over(w)))\
    .groupBy("id", "k1")\
    .agg(f.min("rank").alias("rank"))\
    .fillna(0)\
    .show()
# Same as above
pault
  • Beautiful!! Can you think of a way if the `rowid` column is unavailable? I added this intentionally as I thought it would make the problem easier. – user3192082 May 22 '19 at 16:53
  • @user3192082 is [this](https://stackoverflow.com/questions/52318016/pyspark-add-sequential-and-deterministic-index-to-dataframe) what you're looking for? I'm not sure I understand. – pault May 22 '19 at 16:56
  • I have already used [this](https://stackoverflow.com/questions/39057766/spark-equivelant-of-zipwithindex-in-dataframe) to assign a sequential index to each row. What I meant was if I want to avoid this (i.e. the dataframe does not have the rowid column), can we still solve this and compute the rank? – user3192082 May 22 '19 at 17:04
  • @user3192082 yes, if you had a way to order the rows within each group. I don't see an obvious way based on the data you've provided. For example, for `id=1`, why does `l1` sort before `v1`, which sorts before `l2`? If there is a way to sort these, you can probably use `pyspark.sql.functions.rank` or `pyspark.sql.functions.row_number`. – pault May 22 '19 at 19:13
  • @user3192082 I have added an edit to show you an example of how you could do it. You'd have to define the window function to order the rows in the appropriate way. Here I am using `rowid`, but you can use whatever method you like. (Remember that Spark DataFrames are inherently unordered, so there's no easy way to rely on the order in which data appears in your source system.) – pault May 22 '19 at 19:31