I am creating a dataframe with pyspark, like this:

+----+------+
|   k|     v|
+----+------+
|key1|value1|
|key1|value1|
|key1|value1|
|key2|value1|
|key2|value1|
|key2|value1|
+----+------+

I want to add a 'rowNum' column using the 'withColumn' method, so that the resulting dataframe looks like this:

+----+------+------+
|   k|     v|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key2|value1|     4|
|key2|value1|     5|
|key2|value1|     6|
+----+------+------+

The range of rowNum is from 1 to n, where n is the number of rows. I modified my code like this:

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy("v").orderBy('k')
my_df= my_df.withColumn("rowNum", F.rowNumber().over(w))

But I got this error message:

'module' object has no attribute 'rowNumber' 

I replaced the rowNumber() method with row_number(), and the code above then ran. But when I run:

my_df.show()

I got another error message:

Py4JJavaError: An error occurred while calling o898.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number()
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
    at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:342)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
    at scala.Option.getOrElse(Option.scala:121)
Ivan Lee

4 Answers

Solution in Spark 2.2:

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("rowNum", row_number().over(w))
cph_sto

If you require a sequential rowNum value from 1 to n, rather than a monotonically_increasing_id, you can use zipWithIndex().

Recreating your example data as follows:

rdd = sc.parallelize([('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1')])

You can then use zipWithIndex() to add an index to each row. The map is used to reformat the data and to add 1 to the index so it starts at 1.

rdd_indexed = rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1))
df = rdd_indexed.toDF(['id','score','rowNum'])
df.show()


+----+------+------+
|  id| score|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key1|value1|     4|
|key1|value1|     5|
|key1|value1|     6|
+----+------+------+
Alex
  • I need to add this new column to my existing dataframe, so I would like to use the dataframe's withColumn method. – Ivan Lee Mar 10 '17 at 00:17
  • The `rdd` can be accessed by using `df.rdd`, allowing you to use the same concept. Please note that I would recommend using `monotonically_increasing_id` in combination with `withColumn`, although this approach doesn't guarantee sequential ids. – Alex Mar 10 '17 at 15:16

You can do this with windows

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
your_df= your_df.withColumn("rowNum", rowNumber().over(w))

Here, your_df is the data frame to which you want to add this column.

Rakesh Kumar
  • I tried your code in my program and ran into a problem: 'module' object has no attribute 'rowNumber'. I then found another method, row_number, which runs. But when I call your_df.show(), I get this error: Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number() – Ivan Lee Mar 09 '17 at 14:23
  • Did you import rowNumber? – Rakesh Kumar Mar 10 '17 at 03:11
  • These statements are 100% accurate, as I use them in my production code. – Rakesh Kumar Mar 10 '17 at 03:12
  • What version of Spark are you both using, @IvanLee and @RakeshKumar? – Alex Mar 10 '17 at 20:29
  • I am using Spark 2.1 and am trying to fix this problem. I think it is a matter of different versions. – Ivan Lee Mar 11 '17 at 01:34
  • I am using Spark 1.6. – Rakesh Kumar Mar 11 '17 at 07:02
  • I am sure this problem is caused by the version difference. I am using Spark 2.1, and my code runs correctly with the row_number method. – Ivan Lee Mar 12 '17 at 01:40

I used Spark 2.2 and found that row_number() works:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

win_row_number = Window.orderBy("col_name")
df_row_number = df.select("col_name", F.row_number().over(win_row_number))
girijesh96
Athar