
I want to create a new dataset based on an original dataset.

For example, given my input (see image), my output should be (see image). I referred to other code and got this:

from pyspark.sql import Row

def duplicate_function(row):
    data = []  # list of rows to return
    to_duplicate = float(row["No_of_Occ"])

    i = 0
    while i < to_duplicate:
        row_dict = row.asDict()  # convert a Spark Row object to a Python dictionary
        row_dict["No_of_Occ"] = str(i)
        new_row = Row(**row_dict)  # create a Spark Row object based on a Python dictionary
        data.append(new_row)  # add this Row to the list
        i += 1

    return data  # return the final list

But how can I get the No_of_Occ here?
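Stripped of Spark, the per-row duplication logic the function is after can be sketched in plain Python (a hedged sketch; `duplicate_row` and the sample dict are illustrative, not part of the original code):

```python
def duplicate_row(row_dict, n_key="No_of_Occ"):
    """Return n copies of row_dict, tagging each copy with its index (0..n-1)."""
    n = int(float(row_dict[n_key]))
    copies = []
    for i in range(n):
        copy = dict(row_dict)  # fresh dict per copy, so edits don't alias
        copy[n_key] = str(i)
        copies.append(copy)
    return copies

rows = duplicate_row({"No_of_Occ": "2", "value1": 2, "value2": 3})
# rows holds two dicts, with No_of_Occ set to "0" and "1"
```

In Spark, one way to apply a list-returning function like this is `df.rdd.flatMap(duplicate_function)`, which flattens the returned lists back into a single RDD of rows.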

Ha Hoang
  • `no_of_occ == 0` gives you one line ? are you sure of this result ? Please provide text based code, not images. – Steven Dec 09 '19 at 14:26
  • Does this answer your question? [Pyspark: how to duplicate a row n time in dataframe?](https://stackoverflow.com/questions/50624745/pyspark-how-to-duplicate-a-row-n-time-in-dataframe) – dsalaj Feb 14 '22 at 10:46

1 Answer

The general idea is to duplicate each row's values in an array as many times as No_of_Occ, then use a posexplode to generate the extra lines.

Assuming df is your dataframe:

from pyspark.sql import functions as F, types as T

# the UDF returns an array of structs shaped like the row minus no_of_occ
output_schema = T.ArrayType(df.drop("no_of_occ").schema)

@F.udf(output_schema)
def duplicate(no_of_occ, *args):
    # repeat the remaining column values no_of_occ times
    return list((args,) * no_of_occ)

df.select(
    "no_of_occ",
    F.posexplode(duplicate(*df.columns))  # pos = 0-based copy index, col = struct of values
).select(
    "no_of_occ",
    (F.col("pos") + 1).alias("occ_no"),  # 1-based occurrence number
    F.col("col.*")  # unpack the struct back into columns
).show()


+---------+------+------+------+
|no_of_occ|occ_no|value1|value2|
+---------+------+------+------+
|        2|     1|     2|     3|
|        2|     2|     2|     3|
|        3|     1|     3|     4|
|        3|     2|     3|     4|
|        3|     3|     3|     4|
|        4|     1|     5|     6|
|        4|     2|     5|     6|
|        4|     3|     5|     6|
|        4|     4|     5|     6|
|        2|     1|     7|     8|
|        2|     2|     7|     8|
|        1|     1|     8|     9|
+---------+------+------+------+

Note that this does not yet handle the case where no_of_occ = 0: the UDF returns an empty array, so posexplode drops the row entirely.


EDIT: if you also need to keep the lines where no_of_occ is 0 (treating them as 1), replace the UDF with:

@F.udf(output_schema)
def duplicate(no_of_occ, *args):
    dup_value = no_of_occ or 1  # 0 (or null) falls back to 1 copy
    return list((args,) * dup_value)
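The `or 1` fallback relies on 0 (and None) being falsy in Python; the duplication expression behaves like this Spark-free sketch (`dup` is an illustrative name, not part of the answer):

```python
def dup(args, n):
    # mirrors the UDF body: n copies of the tuple, with 0/None treated as 1
    return list((args,) * (n or 1))
```

So a row with no_of_occ = 0 still yields exactly one copy instead of vanishing after the posexplode.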
Steven
  • It is better to not use the udf, and instead make use of [array_repeat](https://spark.apache.org/docs/2.4.0/api/sql/index.html#array_repeat). See the details in the [answer here](https://stackoverflow.com/questions/50624745/pyspark-how-to-duplicate-a-row-n-time-in-dataframe) – dsalaj Feb 14 '22 at 10:48
  • @dsalaj for spark 2.4 or newer, yes. otherwise, you're stuck with UDF. – Steven Feb 14 '22 at 11:01
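For Spark 2.4+, `array_repeat` builds an array holding a value n times, and `posexplode` unnests an array with a 0-based index, which together replace the UDF entirely. Their semantics can be sketched in plain Python (illustrative helpers mimicking the Spark functions, not the Spark API itself):

```python
def array_repeat(value, n):
    # like Spark's array_repeat(value, n): an array holding value n times
    return [value] * n

def posexplode(arr):
    # like Spark's posexplode: one (pos, element) pair per array entry
    return list(enumerate(arr))

exploded = posexplode(array_repeat(("2", "3"), 3))
# exploded pairs each copy with its 0-based position, matching pos/col above
```

In the real query, the value being repeated would be a `struct` of the row's other columns, exactly as in the UDF version's output schema.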