9

I need to add an index column to a dataframe with three very simple constraints:

  • start from 0

  • be sequential

  • be deterministic

I'm sure I'm missing something obvious, because the examples I'm finding look very convoluted for such a simple task, or they use non-sequential, non-deterministic, monotonically increasing IDs. I don't want to zip with index and then have to separate the previously separate columns that end up crammed into a single column, because my dataframes are in the terabytes and it just seems unnecessary. I don't need to partition by anything or order by anything, yet the examples I'm finding do this (using window functions and row_number). All I need is a simple sequence of integers from 0 to df.count. What am I missing here?


xv70
  • DataFrames are inherently unordered. This is one of the core reasons they work for parallel processing: any executor can pick up any part of the data and do its work. You *can* introduce an order (as you've shown), but how can it be deterministic if you don't order by anything? – pault Sep 13 '18 at 17:01
  • Btw, I believe `monotonically_increasing_id` will be deterministic as long as you don't change the number of partitions (see the sketch below). – pault Sep 13 '18 at 17:03
  • Fair enough, maybe I'm using the word index out of context here. What I mean is: how can I add a column with an ordered, monotonically increasing by 1 sequence 0:df.count? – xv70 Sep 13 '18 at 17:05
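
To illustrate the point in the comments above (a minimal sketch with made-up data; the exact IDs depend on how the rows are partitioned): monotonically_increasing_id() produces unique, increasing IDs, but they are not sequential because the partition ID is encoded in the upper bits.

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# Made-up sample data, split across 2 partitions for illustration
df = spark.createDataFrame([("a",), ("b",), ("c",), ("d",)], ["value"]).repartition(2)

df.withColumn("id", monotonically_increasing_id()).show()
# The IDs are unique and increasing but not sequential: e.g. 0 and 1 in one
# partition, 8589934592 and 8589934593 in the other, and they change if the
# number of partitions changes.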

2 Answers

19

What I mean is: how can I add a column with an ordered, monotonically increasing by 1 sequence 0:df.count? (from comments)

You can use row_number() here, but for that you'd need to specify an orderBy(). Since you don't have an ordering column, just use monotonically_increasing_id().

from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
)

Also, row_number() starts at 1, so you'd have to subtract 1 to have it start from 0. The last value will then be df.count() - 1.
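
For illustration, here is a minimal end-to-end sketch on a toy DataFrame (the column names and values are made up for the example):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# Toy data purely for illustration
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["value"])

df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
)

df.show()
# Expected output (roughly):
# +-----+-----+
# |value|index|
# +-----+-----+
# |    a|    0|
# |    b|    1|
# |    c|    2|
# +-----+-----+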


I don't want to zip with index and then have to separate the previously separated columns that are now in a single column

You can use zipWithIndex if you follow it with a call to map, to avoid having all of the separated columns turn into a single column:

cols = df.columns
df = df.rdd.zipWithIndex().map(
    lambda row: (row[1],) + tuple(row[0])
).toDF(["index"] + cols)
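
To make the intermediate shape concrete, here is a small runnable sketch (the sample column names and values are assumptions for illustration): zipWithIndex yields (row, index) pairs, and the map puts the index first and unpacks the original row back into separate columns.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up sample data for illustration
df = spark.createDataFrame([("id1", "a"), ("id2", "b")], ["ID", "Text"])

cols = df.columns
indexed = df.rdd.zipWithIndex().map(
    lambda row: (row[1],) + tuple(row[0])   # (index, ID, Text)
).toDF(["index"] + cols)

indexed.show()
# Expected output (roughly):
# +-----+---+----+
# |index| ID|Text|
# +-----+---+----+
# |    0|id1|   a|
# |    1|id2|   b|
# +-----+---+----+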
pault
0

Not sure about the performance, but here is a trick.

Note: toPandas() will collect all the data to the driver.

from pyspark.sql import SparkSession

# speed up toPandas using arrow
spark = SparkSession.builder.appName('seq-no') \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .getOrCreate()

df = spark.createDataFrame([
    ('id1', "a"),
    ('id2', "b"),
    ('id2', "c"),
], ["ID", "Text"])

df1 = spark.createDataFrame(df.toPandas().reset_index()).withColumnRenamed("index","seq_no")

df1.show()

+------+---+----+
|seq_no| ID|Text|
+------+---+----+
|     0|id1|   a|
|     1|id2|   b|
|     2|id2|   c|
+------+---+----+
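
Since everything is pulled to the driver, a rough guard along these lines can help avoid accidentally collecting a terabyte-scale DataFrame (a sketch; the row-count threshold is an arbitrary assumption, not a Spark default):

# Sketch: refuse to collect if the DataFrame is too large for the driver.
# MAX_ROWS_FOR_DRIVER is an arbitrary, made-up threshold; tune it to your driver memory.
MAX_ROWS_FOR_DRIVER = 1_000_000

if df.count() <= MAX_ROWS_FOR_DRIVER:
    df1 = (
        spark.createDataFrame(df.toPandas().reset_index())
             .withColumnRenamed("index", "seq_no")
    )
else:
    raise ValueError("DataFrame too large for toPandas(); use the row_number() approach instead")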
Sarde
  • `df.toPandas()`? Are you kidding? If toPandas were a possibility, Spark would never have been used in the first place! – Epsi95 Mar 09 '22 at 12:05
  • I have already noted that, and in some cases we don't have a choice; that's why methods like df.toPandas() are provided. – Sarde Dec 11 '22 at 02:26