
Assume I have the following dataframe:

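from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate().sparkContext  # toDF() needs an active SparkSession
dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)]
df = sc.parallelize(dummy_data).toDF(['letter','number'])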

I want to create the following dataframe, in which each distinct value of number is replaced by an integer id:

[('a',0),('b',2),('c',1),('d',3),('e',0)]

What I currently do is convert it to an RDD, apply zipWithIndex to the distinct numbers, and then join the result back:

convertDF = (df.select('number')
              .distinct()
              .rdd
              .zipWithIndex()
              .map(lambda x:(x[0].number,x[1]))
              .toDF(['old','new']))


finalDF = (df
            .join(convertDF,df.number == convertDF.old)
            .select(df.letter,convertDF.new))
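To make the mapping concrete, here is a plain-Python sketch (no Spark) of what the pipeline above computes: collect the distinct numbers, pair each with its position as zipWithIndex does, then join that lookup back onto the rows. It assumes the distinct values come out in first-seen order. Spark's distinct() makes no ordering guarantee, which is why the ids in the expected output above (b → 2, c → 1) need not match this sketch's. The helper names are hypothetical.

```python
def zip_with_index(items):
    """Pair each item with its 0-based position, like RDD.zipWithIndex()."""
    return [(item, idx) for idx, item in enumerate(items)]


def index_distinct(rows):
    """Replace each row's number with an integer id for its distinct value.

    Assumption: distinct values are taken in first-seen order; Spark's
    distinct() does not guarantee any particular order.
    """
    # Equivalent of df.select('number').distinct()
    distinct_numbers = list(dict.fromkeys(number for _, number in rows))
    # Equivalent of convertDF: old value -> new id
    lookup = dict(zip_with_index(distinct_numbers))
    # Equivalent of the join on number == old, keeping (letter, new)
    return [(letter, lookup[number]) for letter, number in rows]


dummy_data = [('a', 1), ('b', 25), ('c', 3), ('d', 8), ('e', 1)]
print(index_distinct(dummy_data))
# → [('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 0)]
```

Both rows with number 1 (a and e) get the same id, which is the property the join in the question preserves regardless of how the ids are ordered.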

Is there a function similar to zipWithIndex for DataFrames? Or is there a more efficient way to do this?

Tagar
Mpizos Dimitris

1 Answer


See https://issues.apache.org/jira/browse/SPARK-23074, which requests direct parity for this functionality in DataFrames. Upvote that JIRA if you would like to see it in Spark at some point.

Here is a workaround in PySpark in the meantime:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()    # (Row, 0-based index) pairs

    # Prepend the shifted index to each row's values so they line up with new_schema
    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))

    spark = SparkSession.builder.getOrCreate()    # reuse the active session
    return spark.createDataFrame(new_rdd, new_schema)
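To see what the map step and the extended schema produce together, here is a plain-Python sketch without Spark: enumerate() stands in for zipWithIndex(), plain tuples stand in for Row objects (Row is a tuple subclass, so list(row) behaves the same), and a dict per row stands in for a row of the resulting DataFrame. The helper name zip_with_index_records is hypothetical.

```python
def zip_with_index_records(columns, rows, offset=1, col_name="rowId"):
    """Number rows like dfZipWithIndex, returning dicts keyed by column name.

    The index column is placed first, mirroring new_schema, and each index
    is the row's 0-based position plus offset.
    """
    names = [col_name] + list(columns)
    return [dict(zip(names, [idx + offset] + list(row)))
            for idx, row in enumerate(rows)]


rows = [('a', 1), ('b', 25), ('c', 3)]
for record in zip_with_index_records(['letter', 'number'], rows):
    print(record)
# → {'rowId': 1, 'letter': 'a', 'number': 1}
#   {'rowId': 2, 'letter': 'b', 'number': 25}
#   {'rowId': 3, 'letter': 'c', 'number': 3}
```

With the default offset=1 the ids start at 1; passing offset=0 gives zipWithIndex's own 0-based numbering.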

The same function is also available in the abalon package.

Tagar
  • For Python 3+, this needs a minor change to work, because in Python 3 a lambda can no longer unpack a tuple in its parameter list. The following works because the tuple is passed as a single argument and its elements are read with [] indexing: new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0]))) – Paladin Jan 28 '20 at 06:06
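Besides indexing into args, another Python 3 friendly option is a small named function that unpacks the (row, index) pair in its body rather than in its parameter list. A minimal plain-Python sketch follows; make_prepend_row_id is a hypothetical helper, and plain tuples stand in for Spark Row objects.

```python
def make_prepend_row_id(offset=1):
    """Return a one-argument function for mapping over (row, idx) pairs."""
    def prepend_row_id(pair):
        # Python 2 allowed `lambda (row, idx): ...`; Python 3 removed tuple
        # parameter unpacking (PEP 3113), so unpack inside the body instead.
        row, idx = pair
        return [idx + offset] + list(row)
    return prepend_row_id


f = make_prepend_row_id(offset=1)
print(list(map(f, [(('a', 1), 0), (('b', 25), 1)])))
# → [[1, 'a', 1], [2, 'b', 25]]
```

Inside dfZipWithIndex it could be passed as zipped_rdd.map(make_prepend_row_id(offset)) in place of the lambda; PySpark serializes the closure, including offset, along with the function.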