1

I have a dataframe that looks like the following:

ID    NumRecords
123   2
456   1
789   3

I want to create a new DataFrame that concatenates the two columns and duplicates each row according to the value in NumRecords.

So the output should be:

ID_New  123-1
ID_New  123-2
ID_New  456-1
ID_New  789-1
ID_New  789-2
ID_New  789-3

I was looking into the "explode" function but it seemed to take only a constant based on the example I saw.

Dan
  • Does this answer your question? [Pyspark: how to duplicate a row n time in dataframe?](https://stackoverflow.com/questions/50624745/pyspark-how-to-duplicate-a-row-n-time-in-dataframe) – dsalaj Feb 14 '22 at 09:50

2 Answers

0

I had a similar issue. This code duplicates each row based on the value in the NumRecords column:

from pyspark.sql import Row


def duplicate_function(row):
    data = []  # list of rows to return
    to_duplicate = int(row["NumRecords"])  # number of copies to emit

    for i in range(to_duplicate):
        row_dict = row.asDict()  # convert a Spark Row object to a Python dictionary
        row_dict["SERIAL_NO"] = str(i)  # zero-based index of this copy
        new_row = Row(**row_dict)  # create a Spark Row object based on a Python dictionary
        data.append(new_row)  # add this Row to the list

    return data  # return the final list


# create the final dataset based on the value in the NumRecords column;
# the schema is inferred because SERIAL_NO is not part of df_input.schema
df_flatmap = df_input.rdd.flatMap(duplicate_function).toDF()
Jess
-1

You can use a UDF:

from pyspark.sql.functions import udf, explode, concat_ws
from pyspark.sql.types import ArrayType, StringType

# UDF that returns ["1", ..., "n"] for a row whose NumRecords is n
range_ = udf(lambda x: [str(y) for y in range(1, x + 1)], ArrayType(StringType()))

result = df.withColumn("records", range_("NumRecords")) \
  .withColumn("record", explode("records")) \
  .withColumn("ID_New", concat_ws("-", "ID", "record"))
  • I got "tuple object is not callable" on df.withColumn("records", range_("NumRecords") – Dan Jan 05 '17 at 17:17