
I'm trying to create a new column in a data frame that is simply a shuffled version of an existing column. I'm able to randomly order the rows in a data frame using the method described in "How to shuffle the rows in a Spark dataframe?", but when I try to add the shuffled version of the column to a data frame, the shuffling appears not to be applied.

import pyspark
import pyspark.sql.functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.range(5).toDF("x")
df.show()
#> +---+
#> |  x|
#> +---+
#> |  0|
#> |  1|
#> |  2|
#> |  3|
#> |  4|
#> +---+

# the rows appear to be shuffled
ordered_df = df.orderBy(F.rand())
ordered_df.show()
#> +---+
#> |  x|
#> +---+
#> |  0|
#> |  2|
#> |  3|
#> |  4|
#> |  1|
#> +---+

# ...but when I try to add this column to the df, the values are no longer shuffled
df.withColumn('y', ordered_df.x).show()
#> +---+---+
#> |  x|  y|
#> +---+---+
#> |  0|  0|
#> |  1|  1|
#> |  2|  2|
#> |  3|  3|
#> |  4|  4|
#> +---+---+

Created on 2019-06-28 by the reprexpy package

A few notes:

  • I would like to find a solution where the data remains in Spark. For example, I don't want to have to use a user-defined function that requires that the data be moved out of the JVM.
  • The solution in "PySpark: Randomize rows in dataframe" didn't work for me (see below).

df = spark.sparkContext.parallelize(range(5)).map(lambda x: (x, )).toDF(["x"])

df.withColumn('y', df.orderBy(F.rand()).x).show()
#> +---+---+
#> |  x|  y|
#> +---+---+
#> |  0|  0|
#> |  1|  1|
#> |  2|  2|
#> |  3|  3|
#> |  4|  4|
#> +---+---+
  • I have to shuffle the rows in many columns, and each column has to be shuffled independently of the others. As such, I would prefer not to use the zipWithIndex() solution in https://stackoverflow.com/a/45889539, as that solution would require that I run many joins on the data (which I'm assuming will be time-intensive). A rough sketch of that approach is included below for reference.
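
For reference, this is roughly what that zipWithIndex()-based approach would look like; the helper name with_index and the index column name _idx are purely illustrative, not taken from the linked answer:

from pyspark.sql import Row

def with_index(sdf, colname="_idx"):
    # pair every row with a stable 0-based index, then fold the index back
    # into the row so the result can be joined on it
    return sdf.rdd.zipWithIndex().map(
        lambda pair: Row(**pair[0].asDict(), **{colname: pair[1]})
    ).toDF()

left = with_index(df)
right = with_index(df.orderBy(F.rand()).withColumnRenamed("x", "y"))
left.join(right, "_idx").drop("_idx").show()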
    Look at the execution plan: `df.withColumn('y', ordered_df.x).explain()`. Spark is lazy, so `ordered_df` is not cached or saved anywhere. When you call `withColumn`, it's calculated again. Shuffling the columns independently is just not something spark is designed to do well. Each row is processed as a atomic unit - this is what allows spark to parallelize row operations across executors. I don't think there's any way to avoid doing a join. – pault Jun 29 '19 at 03:58
  • Why would the fact that the ordering step (`df.orderBy(F.rand())`) has to be re-run when I call `df.withColumn('y', ordered_df.x).show()` result in the reordering not being done at all (or at least that's how it appears)? In other words, I don't understand why the fact that Spark is lazy would impact this case. – Chris Jun 29 '19 at 04:30
  • For example, if I add a call to `ordered_df.cache()`, I still run into the problem that I described. – Chris Jun 29 '19 at 05:33
  • I think you have some misunderstandings about lazy execution. One example is to run `ordered_df.show()` multiple times - you'll see that you get a different result each time. Spark doesn't "save" any of the values - only the instructions on how to create those values. – pault Jun 29 '19 at 13:38
  • I know that Spark doesn't actually do the work until an action is called - My point is that lazy execution shouldn't be to blame for me not getting what I expect. For example, why doesn't `df.withColumn('y', df.orderBy(F.rand()).x).show()` show the column (`y`) as being reordered? An action is called (`show()`), so `y` should be randomly ordered, but it isn't. – Chris Jul 01 '19 at 13:04
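
To see what the comments above describe, running the action repeatedly makes the re-evaluation visible; this snippet only reuses the objects already defined in the question:

ordered_df = df.orderBy(F.rand())
ordered_df.show()     # one random ordering
ordered_df.show()     # typically a different ordering, because the sort is re-run
ordered_df.explain()  # the plan only stores the instructions (a sort over rand()); nothing is materialized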

1 Answer


You can accomplish this by using a window function to assign each row a random index, doing the same in a separate copy of the DataFrame, and then joining the two on that index:

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> df = spark.range(5).toDF("x")
>>> left = df.withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> right = df.withColumnRenamed("x", "y").withColumn("rnd", F.row_number().over(Window.orderBy(F.rand()))) 
>>> dff = left.join(right, left.rnd == right.rnd).drop("rnd")
>>> dff.show()
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+---+                                                                       
|  x|  y|
+---+---+
|  3|  3|
|  2|  0|
|  0|  2|
|  1|  1|
|  4|  4|
+---+---+

As the warning suggests, this might not be a great idea in practice.
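
Since the question needs many columns shuffled independently of one another, the same window-and-join pattern can be wrapped in a small helper and applied once per column. A minimal sketch; the helper name shuffled_copy and the _shuffled suffix are illustrative, not part of the code above:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

def shuffled_copy(df, colname, new_name):
    # rank the existing rows and an independently re-ordered copy of one column
    # by two separate random orderings, then pair them up on the rank
    left = df.withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
    right = (df.select(F.col(colname).alias(new_name))
               .withColumn("rnd", F.row_number().over(Window.orderBy(F.rand()))))
    return left.join(right, "rnd").drop("rnd")

out = df
for c in ["x"]:  # list every column that needs its own independent shuffle
    out = shuffled_copy(out, c, c + "_shuffled")
out.show()

Each call still triggers the single-partition window warning shown above, so for large data the join cost the question hoped to avoid doesn't really go away.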

– Charlie Flowers