9

Is there any way I can shuffle a column of an RDD or dataframe such that the entries in that column appear in random order? I'm not sure which APIs I could use to accomplish such a task.

Ben McCann

5 Answers

7

What about selecting the column to shuffle, ordering it with orderBy(rand), and zipping it by index back to the existing dataframe?

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.functions.{col, rand}

def addIndex(df: DataFrame) = spark.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

case class Entry(name: String, salary: Double)

val r1 = Entry("Max", 2001.21)
val r2 = Entry("Zhang", 3111.32)
val r3 = Entry("Bob", 1919.21)
val r4 = Entry("Paul", 3001.5)

val df = addIndex(spark.createDataFrame(Seq(r1, r2, r3, r4)))
val df_shuffled = addIndex(df
  .select(col("salary").as("salary_shuffled"))
  .orderBy(rand))

df.join(df_shuffled, Seq("_index"))
  .drop("_index")
  .show(false) 

+-----+-------+---------------+
|name |salary |salary_shuffled|
+-----+-------+---------------+
|Max  |2001.21|3001.5         |
|Zhang|3111.32|3111.32        |
|Paul |3001.5 |2001.21        |
|Bob  |1919.21|1919.21        |
+-----+-------+---------------+
Sascha Vetter
    Just want to note for those that might make my mistake. You can't use `monotonically_increasing_id` here instead of the custom `addIndex` because it will be per partition and thus reduce your dataset. :) – Alex Moore-Niemi Mar 10 '20 at 21:54
4

If you don't need a global shuffle across your data, you can shuffle within partitions using the mapPartitions method.

import scala.util.Random

rdd.mapPartitions(iterator => Random.shuffle(iterator.toSeq).iterator)

For a PairRDD (an RDD of type RDD[(K, V)]), if you are interested in shuffling the key-value mappings (mapping an arbitrary key to an arbitrary value):

pairRDD.mapPartitions(iterator => {
  val (keySequence, valueSequence) = iterator.toSeq.unzip
  val shuffledValueSequence = Random.shuffle(valueSequence)
  keySequence.zip(shuffledValueSequence).toIterator
}, preservesPartitioning = true)

The boolean flag at the end (preservesPartitioning) tells Spark that the partitioning is preserved by this operation, since the keys are not changed, so that downstream operations such as reduceByKey can be optimized to avoid extra shuffles.
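
To see what the flag buys you, here is a rough sketch (my addition, assuming a pairRDD of type RDD[(String, Int)]): an existing partitioner survives the mapPartitions call, so the reduceByKey that follows can reuse it instead of shuffling again.

import org.apache.spark.HashPartitioner
import scala.util.Random

// Pre-partition the pairs so the RDD carries a partitioner
val partitionedPairs = pairRDD.partitionBy(new HashPartitioner(4))

// Shuffle values within each partition, keeping keys (and partitioning) intact
val shuffledWithinPartitions = partitionedPairs.mapPartitions(iterator => {
  val (keySequence, valueSequence) = iterator.toSeq.unzip
  keySequence.zip(Random.shuffle(valueSequence)).toIterator
}, preservesPartitioning = true)

shuffledWithinPartitions.partitioner        // still Some(HashPartitioner) thanks to the flag
shuffledWithinPartitions.reduceByKey(_ + _) // reuses the partitioning, no extra shuffle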

Gio Borje
2

While one cannot just shuffle a single column directly, it is possible to permute the records in an RDD via RandomRDDs: https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/random/RandomRDDs.html

A potential approach to having only a single column permuted might be (a sketch follows the list):

  • use mapPartitions to do some setup/teardown on each worker task
  • pull all of the records into memory, i.e. iterator.toList; make sure you have many (small) partitions of data to avoid an OOM error
  • using the Row object, rewrite each record as the original except for the given column
  • within the mapPartitions, create an in-memory sorted list
  • for the desired column, drop its values into a separate collection and randomly sample that collection to replace each record's entry
  • return the result as list.toIterator from the mapPartitions
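
A hedged sketch of those steps (my own illustration, not code from the original answer), assuming a DataFrame df and a column name columnName; it shuffles that column's values within each partition and leaves the other columns untouched:

import org.apache.spark.sql.{DataFrame, Row}
import scala.util.Random

def shuffleColumnWithinPartitions(df: DataFrame, columnName: String) = {
  val columnIndex = df.schema.fieldIndex(columnName)
  df.rdd.mapPartitions { iterator =>
    // pull the partition into memory (keep partitions small to avoid OOM)
    val rows = iterator.toList
    // collect the column's values and shuffle them
    val shuffledValues = Random.shuffle(rows.map(_.get(columnIndex)))
    // rewrite each row unchanged except for the target column
    rows.zip(shuffledValues).map { case (row, value) =>
      Row.fromSeq(row.toSeq.updated(columnIndex, value))
    }.toIterator
  }
}
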
WestCoastProjects
1

You can add one additional randomly generated column and then sort the records based on that random column. This way, you randomly shuffle your destined column.

With this approach, you do not need to hold all the data in memory, which could easily cause an OOM. Spark will take care of the sorting and memory limitations by spilling to disk if necessary.

If you don't want the extra column, you can remove it after sorting, as in the sketch below.
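
A minimal sketch of this idea (my addition, assuming a DataFrame named df; the column name _rand is arbitrary). Note that it reorders whole rows rather than a single column, as the comment below points out.

import org.apache.spark.sql.functions.rand

val reordered = df
  .withColumn("_rand", rand())
  .orderBy("_rand")
  .drop("_rand")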

zhang zhan
    This is going to shuffle the entire dataframe. The intent is to shuffle one column, leaving the rest in the order they're in. – justin cress Sep 03 '20 at 16:59
1

In case someone is looking for a PySpark equivalent of Sascha Vetter's post, you can find it below:

import numpy as np

from pyspark.sql.functions import rand
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

def add_index_to_row(row, index):
  row_dict = row.asDict()
  row_dict["index"] = index
  return Row(**row_dict)

def add_index_to_df(df):
  df_with_index = df.rdd.zipWithIndex().map(lambda x: add_index_to_row(x[0], x[1]))
  new_schema = StructType(df.schema.fields + [StructField("index", LongType(), True)])
  return spark.createDataFrame(df_with_index, new_schema)

def shuffle_single_column(df, column_name):
  df_cols = df.columns
  # select the desired column and shuffle it (i.e. order it by column with random numbers)
  shuffled_col = df.select(column_name).orderBy(rand())
  # add explicit index to the shuffled column
  shuffled_col_index = add_index_to_df(shuffled_col)
  # add explicit index to the original dataframe
  df_index = add_index_to_df(df)
  # drop the desired column from df, join it with the shuffled column on created index and finally drop the index column
  df_shuffled = df_index.drop(column_name).join(shuffled_col_index, "index").drop("index")
  # reorder columns so that the shuffled column comes back to its initial position instead of the last position
  df_shuffled = df_shuffled.select(df_cols)
  return df_shuffled

# initialize random array
z = np.random.randint(20, size=(10, 3)).tolist()
# create the pyspark dataframe
example_df = sc.parallelize(z).toDF(("a","b","c"))
# shuffle one column of the dataframe
example_df_shuffled = shuffle_single_column(df = example_df, column_name = "a")
jared3412341