I have a dataframe df in Spark which looks something like this:

val df = (1 to 10).toList.toDF()

When I check the number of partitions, I see that I have 10 partitions:

df.rdd.getNumPartitions
res0: Int = 10

Now I generate an ID column:

val dfWithID = df.withColumn("id", monotonically_increasing_id())
dfWithID.show()

+-----+---+
|value| id|
+-----+---+
|    1|  0|
|    2|  1|
|    3|  2|
|    4|  3|
|    5|  4|
|    6|  5|
|    7|  6|
|    8|  7|
|    9|  8|
|   10|  9|
+-----+---+

So all the generated ids are consecutive even though I have 10 partitions. Then I repartition the dataframe:

val dfp = df.repartition(10)
val dfpWithID = dfp.withColumn("id", monotonically_increasing_id())
dfpWithID.show()

+-----+-----------+
|value|         id|
+-----+-----------+
|   10|          0|
|    1| 8589934592|
|    7|17179869184|
|    5|25769803776|
|    4|42949672960|
|    9|42949672961|
|    2|51539607552|
|    8|60129542144|
|    6|68719476736|
|    3|77309411328|
+-----+-----------+

Now the ids are no longer consecutive. According to the Spark documentation, monotonically_increasing_id() should put the partition ID in the upper 31 bits, and in both cases I have 10 partitions. Why does it only add the partition ID after calling repartition()?
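As a side note, the ids in the repartitioned output are consistent with the documented layout: each id is the partition ID shifted into the upper 31 bits, with the per-partition record number in the lower 33 bits. A minimal sketch of that encoding in plain Scala (no Spark required), decoding the values shown above:

```scala
// Documented 64-bit layout of monotonically_increasing_id():
// partition ID in the upper 31 bits, per-partition record number
// in the lower 33 bits.
def makeId(partitionId: Long, recordNumber: Long): Long =
  (partitionId << 33) | recordNumber

def partitionOf(id: Long): Long = id >> 33
def recordOf(id: Long): Long    = id & ((1L << 33) - 1)

// The ids from the repartitioned dataframe decode accordingly:
println(partitionOf(8589934592L))  // 1 -> this row landed in partition 1
println(partitionOf(42949672961L)) // 5
println(recordOf(42949672961L))    // 1 -> second record within partition 5
```

Note that two of the ids above (42949672960 and 42949672961) decode to the same partition, so repartition(10) did not place exactly one row per partition here.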

rezab

1 Answer


I assume this is because all the data in your initial dataframe resides in a single partition, with the other 9 being empty.

To verify this, use the answers given here: Apache Spark: Get number of records per partition
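For instance, a sketch of that check (assuming a spark-shell session where df is defined as in the question; spark_partition_id() tags each row with the partition it resides in):

```scala
import org.apache.spark.sql.functions.spark_partition_id

// Count the records held by each physical partition of df.
// If the assumption above holds, one partition shows count 10
// and the other 9 partitions do not appear at all.
df.groupBy(spark_partition_id().as("partition"))
  .count()
  .show()
```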

Raphael Roth
  • Thanks for your answer, but I already tried this. It seems all the partitions have 1 record. – rezab Apr 21 '21 at 20:42