I want to add a unique row number to my dataframe in PySpark, and I don't want to use the monotonicallyIncreasingId or partitionBy methods. This question may be a duplicate of similar questions asked earlier, but I am still looking for advice on whether I am doing it the right way. The following is a snippet of my code. I have a CSV file with the input records below:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

I have loaded this CSV file into a dataframe:

PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

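# zipWithIndex pairs each Row with a 0-based index: (Row, index)
empRDD = emp_df.rdd.zipWithIndex()
# Flatten each pair into one list: the row's values followed by the index
newRDD = empRDD.map(lambda x: list(x[0]) + [x[1]])
newRDD.take(2)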
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

When I appended the integer index to each row's list, I lost the dataframe schema, so I had to supply the column names again:

newdf = newRDD.toDF(['emp_id', 'emp_name', 'emp_city', 'emp_salary', 'row_id'])
newdf.show()

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+

Am I doing this the right way, or is there a better way to add the index while preserving the dataframe's schema in PySpark?
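
One possible way to keep the original column names and types is sketched below, under stated assumptions: it reuses emp_df.schema, appends a single long field to a copy of it, and passes that schema to createDataFrame instead of retyping the column names. The helper name with_row_id and its parameters are hypothetical, and `spark` is assumed to be the active SparkSession.

```python
import copy


def with_row_id(spark, df, col_name="row_id"):
    """Return df plus a consecutive 0-based index column, keeping df's schema.

    Hypothetical helper: `spark` is assumed to be a SparkSession and `df` a
    PySpark DataFrame.
    """
    # Copy first: StructType.add() mutates the schema object in place.
    schema = copy.deepcopy(df.schema).add(col_name, "long", False)
    # zipWithIndex yields (Row, index); flatten each pair into one tuple.
    indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return spark.createDataFrame(indexed, schema)
```

With the dataframe from the question, usage would look like newdf = with_row_id(spark, emp_df).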

Is it feasible to use the zipWithIndex method to add unique, consecutive row numbers to a large dataframe as well? Can we use this row_id to repartition the dataframe so that the data is distributed uniformly across the partitions?
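
On the feasibility question, it may help to see roughly how zipWithIndex assigns its numbers. The sketch below is a plain-Python imitation, not Spark code: the function name zip_with_index and the sample partitions are made up for illustration. It mirrors the documented behaviour that an RDD with more than one partition needs an extra job to count rows, after which each partition numbers its own rows from a starting offset without moving them elsewhere.

```python
from itertools import accumulate


def zip_with_index(partitions):
    """Imitate RDD.zipWithIndex on a list of partitions (each a list of rows)."""
    # Pass 1: count the rows in every partition.
    counts = [len(part) for part in partitions]
    # A partition's starting index is the number of rows in all earlier partitions.
    offsets = [0] + list(accumulate(counts))[:-1]
    # Pass 2: every partition numbers its own rows independently.
    return [
        [(row, offset + i) for i, row in enumerate(part)]
        for part, offset in zip(partitions, offsets)
    ]


parts = [["a", "b", "c"], ["d"], [], ["e", "f"]]
indexed = zip_with_index(parts)
```

Here the indices run 0 to 5 across the four partitions, including the empty one, while each row stays in the partition it started in.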

vikrant rana
  • What exactly do you mean by lost schema? Did the columns start out as integers and then change to strings? Also, why do you not want to use monotonically increasing IDs? – Willy Oct 31 '18 at 13:33
  • Monotonically increasing ID does not add consecutive IDs; it just adds unique, seemingly random numbers to my dataframe. And partitionBy with a window function brings the data from n partitions into one partition. – vikrant rana Oct 31 '18 at 14:01
  • By lost schema I meant that when converting the RDD to a dataframe, I have to specify the column names again. Is there any way to preserve the schema when converting the dataframe to an RDD and then from the RDD back to a dataframe? – vikrant rana Oct 31 '18 at 14:04
  • Do a `row_number` over a partition set and then orderBy your choice. Why do you want to use an RDD? DataFrames/Datasets are ultimately broken down into RDDs, but that does not mean you need to use them directly. Avoid RDDs at any cost. Also, are you losing the order or the schema? I am sure it is the order, not the schema. – pvy4917 Oct 31 '18 at 14:26
  • I think I can use a partitionBy clause with the window function instead of using only orderBy; that way the data will not move to a single partition. I will give this a try. – vikrant rana Oct 31 '18 at 15:47
  • Hi, you can check all of the ways to do this, along with the dangers, [here](https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6), from this answer [here](https://stackoverflow.com/questions/48209667/using-monotonically-increasing-id-for-assigning-row-number-to-pyspark-datafram/48209785#48209785) – mkaran May 13 '21 at 08:02
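
As background to the comments above on why monotonically increasing IDs are unique but not consecutive: Spark's documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below only imitates that layout; the function name monotonic_id and the partition sizes are illustrative assumptions.

```python
def monotonic_id(partition_index, row_in_partition):
    """Imitate the documented bit layout of monotonically_increasing_id."""
    # Upper 31 bits: partition index; lower 33 bits: record number in the partition.
    return (partition_index << 33) + row_in_partition


# Two partitions holding 2 and 3 rows respectively (made-up sizes).
ids = [
    monotonic_id(p, r)
    for p, n_rows in enumerate([2, 3])
    for r in range(n_rows)
]
# ids == [0, 1, 8589934592, 8589934593, 8589934594]
```

The jump from 1 to 8589934592 (2 to the power 33) at the partition boundary is what makes the values look random when a dataframe has several partitions.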

2 Answers

I have found a solution, and it's very simple. No column in my dataframe has the same value across all rows, so row_number does not generate unique row numbers when used with a partitionBy clause on an existing column.
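
To illustrate the point, here is a small plain-Python imitation of row_number() over a partitioned window. It is not Spark code; the function name row_number_over and the shortened sample rows are assumptions made for the example. Numbering restarts at 1 for every distinct partition key, so only a key that is identical for all rows gives one unbroken sequence.

```python
from itertools import groupby


def row_number_over(rows, partition_key):
    """Number rows 1..n within each group, like row_number() OVER (PARTITION BY key)."""
    # groupby needs rows with equal keys to be adjacent; sorted() is stable.
    ordered = sorted(rows, key=partition_key)
    numbered = []
    for _, group in groupby(ordered, key=partition_key):
        numbered.extend((row, n) for n, row in enumerate(group, start=1))
    return numbered


rows = [("VIKRANT", "NOIDA"), ("GOVIND", "DWARKA"), ("AJAY", "NOIDA"), ("ROBERT", "GURGAON")]
by_city = row_number_over(rows, partition_key=lambda r: r[1])       # numbers repeat per city
by_constant = row_number_over(rows, partition_key=lambda r: "ABC")  # one group: 1, 2, 3, 4
```

The flip side, as raised in the comments, is that a single window partition means all rows are handled together in one partition, which may matter for a large dataframe.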

Let's add a new column with a constant default value to the existing dataframe.

from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

emp_df = emp_df.withColumn("new_column", lit("ABC"))

Then create a window specification with partitionBy on that column, "new_column":

w = Window.partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

You will get the desired results:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+
vikrant rana

Using Spark SQL:

df = spark.sql("""
SELECT 
    row_number() OVER (
        PARTITION BY '' 
        ORDER BY '' 
    ) as id,
    *
FROM 
    VALUES 
    ('Bob  ', 20),
    ('Alice', 21),
    ('Gary ', 21),
    ('Kent ', 25),
    ('Gary ', 35)
""")

Output:

>>> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- col1: string (nullable = false)
 |-- col2: integer (nullable = false)

>>> df.show()
+---+-----+----+
| id| col1|col2|
+---+-----+----+
|  1|Bob  |  20|
|  2|Alice|  21|
|  3|Gary |  21|
|  4|Kent |  25|
|  5|Gary |  35|
+---+-----+----+
Kent Pawar