
I need to add a "row number" to a dataframe, but this "row number" must restart for each new value in a column.

Let me show you an example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date

spark = SparkSession.builder.appName('test').getOrCreate()

df = spark.createDataFrame([
    ('2018-01-01', 'John', 0),
    ('2018-01-01', 'Paul', 1),
    ('2018-01-08', 'Paul', 3),
    ('2018-01-08', 'Pete', 4),
    ('2018-01-08', 'John', 3),
    ('2018-01-15', 'Mary', 6),
    ('2018-01-15', 'Pete', 6),
    ('2018-01-15', 'John', 6),
    ('2018-01-15', 'Paul', 6),
], ['str_date', 'name', 'value'])

# Convert str_date to date:
df = df.withColumn('date', to_date(df['str_date'])) \
    .select(['date', 'name', 'value'])

# Sort by name and date
df.orderBy(['name', 'date']).show()

## +----------+----+-----+
## |      date|name|value|
## +----------+----+-----+
## |2018-01-01|John|    0|
## |2018-01-08|John|    3|
## |2018-01-15|John|    6|
## |2018-01-15|Mary|    6|
## |2018-01-01|Paul|    1|
## |2018-01-08|Paul|    3|
## |2018-01-15|Paul|    6|
## |2018-01-08|Pete|    4|
## |2018-01-15|Pete|    6|
## +----------+----+-----+

So, what I need is to add a new column with the row number for each name:

# Expected result
## +----------+----+-----+------+
## |      date|name|value|rowNum|
## +----------+----+-----+------+
## |2018-01-01|John|    0|     1| <- First row for 'John'
## |2018-01-08|John|    3|     2|
## |2018-01-15|John|    6|     3|
## |2018-01-15|Mary|    6|     1| <- First row for 'Mary'
## |2018-01-01|Paul|    1|     1| <- First row for 'Paul'
## |2018-01-08|Paul|    3|     2|
## |2018-01-15|Paul|    6|     3|
## |2018-01-08|Pete|    4|     1| <- First row for 'Pete'
## |2018-01-15|Pete|    6|     2|
## +----------+----+-----+------+

I've been trying with window functions, but I'm stuck. Can you please help me?

Notes:

  • It is guaranteed that the rows will be sorted (and, if they are not sorted, they will be sorted as part of the work pipeline)
  • I'm using Spark 2.4.0

2 Answers


Use a ranking function like `row_number` to do this. If there can be ties for a name on a given date, use `dense_rank` instead (see the sketch after the code below).

from pyspark.sql import Window
from pyspark.sql import functions as f

# Window definition: one partition per name, rows ordered by date
w = Window.partitionBy(df.name).orderBy(df.date)
res = df.withColumn('rnum', f.row_number().over(w))
res.show()
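
If ties are possible (two rows with the same name and date), here is a minimal sketch of the `dense_rank` variant mentioned above, reusing the same window `w` and `f` from the code; the name `res_dense` is just illustrative:

# Same window; dense_rank() gives tied rows (same name and date) the same
# number, whereas row_number() would break the tie arbitrarily.
res_dense = df.withColumn('rnum', f.dense_rank().over(w))
res_dense.show()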
  • Thanks a lot! I've been banging my head against this all morning. I ended up putting it all together: `df = df.withColumn('row_num', row_number().over(Window.partitionBy('name').orderBy('date')))` – Barranka Jan 29 '19 at 22:33

Vamsi's answer is correct; an earlier version just missed a `()` after `row_number`, so:

from pyspark.sql import Window
from pyspark.sql import functions as f

w = Window.partitionBy(df.name).orderBy(df.date)
res = df.withColumn('rnum', f.row_number().over(w))  # note the () after row_number
res.show()
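
For completeness, to reproduce the expected table from the question exactly, you can name the column rowNum and sort the output; a small sketch assuming the df and window w defined above:

# 'rowNum' matches the column name in the question's expected result
res = df.withColumn('rowNum', f.row_number().over(w))
res.orderBy('name', 'date').show()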