
I am trying to write a helper function that takes a dataset of any type, Dataset[_], and returns it with one new column, "partitionId", which is the ID of the partition each data unit belongs to.

For example, suppose I have the dataset below, which by default has two partitions.

+-----+------+
| colA|  colB|
+-----+------+
|   1 |     a|
|   2 |     b|
|   3 |     c|
+-----+------+

After applying the function, the result should be as below, where the first two data units belong to the same partition and the third belongs to another.

+-----+------+------------+
| colA|  colB| partitionId|
+-----+------+------------+
|   1 |     a|           1|
|   2 |     b|           1|
|   3 |     c|           2|
+-----+------+------------+

I tried withColumn() and mapPartitions(), but neither worked for me. With withColumn(), I couldn't get the information about which partition a data unit belongs to, i.e. withColumn("partitionId", {What should be here to add the partitionId?}). With mapPartitions(), I tried:

import java.util.UUID

dataset
  .mapPartitions(iter => {
    // One ID per partition; tag every record in that partition with it
    val partitionId = UUID.randomUUID().toString
    iter.map(dataUnit => dataUnit.addPartitionId(partitionId))
  })

But this only works for a specific type like Dataset[MyDataType], not for Dataset[_].

How can I add a partitionId column for any dataset?

HayreddinLuo
  • If you remove the groupBy in this [answer](https://stackoverflow.com/questions/46032320/apache-spark-get-number-of-records-per-partition), it prints every record with its partition ID rather than the per-partition counts. – Ram Ghadiyaram Jan 16 '21 at 01:57
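
For reference, a sketch of the two variants that comment describes, assuming a DataFrame named df (the name is illustrative):

import org.apache.spark.sql.functions.spark_partition_id

// Records-per-partition counts, as in the linked answer:
df.withColumn("partitionId", spark_partition_id())
  .groupBy("partitionId")
  .count()
  .show()

// With the groupBy removed, every record is shown with its partition ID:
df.withColumn("partitionId", spark_partition_id())
  .show()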

1 Answer


Is there a reason you need the partition ID of each record? Either way, you can achieve it by:

import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionID", spark_partition_id())
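
To turn this into the generic helper the question asks for, a minimal sketch (the helper name addPartitionId is illustrative):

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.spark_partition_id

// Works for a Dataset of any element type; the result is a DataFrame,
// since adding a column changes the row type anyway.
def addPartitionId(ds: Dataset[_]): DataFrame =
  ds.withColumn("partitionId", spark_partition_id())

Note that spark_partition_id() is zero-based, so the example in the question would show partition IDs 0 and 1 rather than 1 and 2.
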
Nir Hedvat