
I have an RDD containing tuples, and collecting it will give me a list.

[x1, x2, x3, x4, x5]

But I want multiple chunks of that list, like [[x1, x2, x3], [x4, x5]]. I could do this by first performing a collect on the RDD and then dividing the resulting list into chunks.

But I want to do this without performing a collect, because collecting brings all the data to the driver, which is inefficient and may raise a heap space error.
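For context, here is a minimal sketch of that collect-then-chunk approach (the sample tuples and the chunk size of 3 are just for illustration):

import org.apache.spark.sql.SparkSession

// Sketch of the approach to avoid: collect() pulls every tuple to the driver,
// and the chunking then happens locally on the driver.
object CollectThenChunk {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("CollectThenChunk").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq(("x", 1), ("x", 2), ("x", 3), ("x", 4), ("x", 5)))

    val all = rdd.collect()            // all data lands on the driver
    val chunks = all.grouped(3).toList // Array((x,1),(x,2),(x,3)), Array((x,4),(x,5))
    chunks.foreach(c => println(c.mkString("[", ", ", "]")))

    spark.stop()
  }
}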

user3190018
Kiran Gali
  • An `RDD` is already in "chunks" and is distributed among the workers. What are you attempting to do with the chunks? Maybe you are looking for `.foreachPartition` or `.mapPartitions` to work with the `RDD` one "chunk" at a time? – Travis Hegner May 14 '19 at 13:34
  • Thanks for the reply. I have a function which takes a list of tuples and performs some action. Currently, when I map over the RDD like rdd.map(x => func(x)), a list containing only one element is passed. I need to pass a list with more elements; the more elements, the more efficient the function. @TravisHegner – Kiran Gali May 14 '19 at 13:40
  • Hi, do you want to process parts of the RDD one at a time? collect is not a good idea in any case unless you know it will be small data. – Ram Ghadiyaram May 14 '19 at 14:10
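
Here is a minimal sketch of the mapPartitions approach suggested in the comments, so that the per-batch logic receives several tuples at once without collecting anything to the driver (the sample data, batchSize, and the sum are assumptions, not the asker's actual function):

import org.apache.spark.sql.SparkSession

// Each partition's iterator is grouped into batches of `batchSize`, so the
// per-batch logic sees several tuples at a time and nothing is collected to
// the driver. Summing the values is just a stand-in for the real function.
object MapPartitionsBatches {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("MapPartitionsBatches").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq(("x", 1), ("x", 2), ("x", 3), ("x", 4), ("x", 5)), 2)

    val batchSize = 3
    val results = rdd.mapPartitions { it =>
      it.grouped(batchSize).map(batch => batch.map(_._2).sum) // process one batch of tuples per call
    }
    results.collect().foreach(println) // per-batch results are small, so collecting them is safe

    spark.stop()
  }
}

Note that grouped(batchSize) only materializes one batch per partition at a time, so executor memory stays bounded.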

1 Answer


Question: is there an efficient way to chunk an RDD containing a big list into several lists without performing a collect?

Instead of collecting the big list and splitting it into several lists, you can break the big RDD into multiple small RDDs for further processing...

Collecting a big RDD is not a good idea, but if you want to divide a big RDD into small ones, i.e. an Array[RDD], you can go with the approach below. It was written in Scala; you can translate it into Python by looking at the example here.

Python docs here.

You can go for randomSplit; see the docs here.

You can see how it is implemented in the code, which is available in Git:

/**
   * Randomly splits this RDD with the provided weights.
   *
   * @param weights weights for splits, will be normalized if they don't sum to 1
   * @param seed random seed
   *
   * @return split RDDs in an array
   */
  def randomSplit(
      weights: Array[Double],
      seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
    require(weights.forall(_ >= 0),
      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
    require(weights.sum > 0,
      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")

    withScope {
      val sum = weights.sum
      val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
      normalizedCumWeights.sliding(2).map { x =>
        randomSampleWithRange(x(0), x(1), seed)
      }.toArray
    }
  }

Scala example (I'm not comfortable with Python :-)); for Python, see the docs here:


import org.apache.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Created by Ram Ghadiyaram
  */
object RDDRandomSplitExample {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local")
      .appName("RDDRandomSplitExample")
      .getOrCreate()

    val y = spark.sparkContext.parallelize(1 to 100)
    // break/split big rdd in to small rdd

    // 11 equal weights -> 11 RDDs of roughly equal size
    val splits: Array[RDD[Int]] = y.randomSplit(Array(0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1))

    splits.foreach(x => println("number of records in each rdd " + x.count))
  }
}

Result:

number of records in each rdd 9
number of records in each rdd 9
number of records in each rdd 8
number of records in each rdd 7
number of records in each rdd 9
number of records in each rdd 17
number of records in each rdd 11
number of records in each rdd 9
number of records in each rdd 7
number of records in each rdd 6
number of records in each rdd 8

Conclusion: you can see an almost equal number of elements in each RDD, and you can process each RDD without collecting the original big RDD.
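
Continuing from the splits array in the example above, each small RDD can then be processed where the data lives, again without a collect; in this sketch the batch size of 3 and the println are placeholders for the real per-batch work:

// Process each split on the executors; nothing is pulled back to the driver.
splits.foreach { rdd =>
  rdd.foreachPartition { it =>
    it.grouped(3).foreach(batch => println(batch.toList))
  }
}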

Ram Ghadiyaram