
I'm looking for a way to split an RDD into two or more RDDs. The closest I've seen is Scala Spark: Split collection into several RDD?, which still produces a single RDD.

If you're familiar with SAS, something like this:

data work.split1 work.split2;
    set work.preSplit;

    if (condition1) then
        output work.split1;
    else if (condition2) then
        output work.split2;
run;

which results in two distinct data sets. The result would have to be persisted immediately to get the results I intend...

Carlos Bribiescas

4 Answers


It is not possible to yield multiple RDDs from a single transformation*. If you want to split an RDD you have to apply a filter for each split condition. For example:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

If you have only a binary condition and computation is expensive you may prefer something like this:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

It means only a single predicate computation but requires an additional pass over all the data.
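The same idea extends to more than two conditions: tag each element once, cache the tagged RDD, and then run one cheap filter per label. A minimal Scala sketch (the labels, conditions, and variable names are illustrative; `sc` is an existing SparkContext):

import org.apache.spark.rdd.RDD

val labels = Seq("even", "odd")
val tagged = sc.parallelize(0 until 20)
  .map(x => (if (x % 2 == 0) "even" else "odd", x))
  .cache()

// One filter per label; the tagging predicate itself runs only once per element.
val splits: Seq[RDD[Int]] =
  labels.map(l => tagged.filter { case (label, _) => label == l }.values)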

It is important to note that as long as the input RDD is properly cached and there are no additional assumptions regarding data distribution, there is no significant difference in time complexity between a repeated filter and a for-loop with nested if-else.

With N elements and M conditions the number of operations you have to perform is clearly proportional to N times M. In the case of the for-loop it should be closer to (N + MN) / 2, while repeated filter is exactly NM, but at the end of the day it is nothing other than O(NM). You can see my discussion** with Jason Lenderman to read about some pros and cons.

At the very high level you should consider two things:

  1. Spark transformations are lazy; until you execute an action your RDD is not materialized.

    Why does it matter? Going back to my example:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    If I later decide that I need only rdd_odd then there is no reason to materialize rdd_even (see the sketch after this list).

    If you take a look at your SAS example, to compute work.split2 you need to materialize both the input data and work.split1.

  2. RDDs provide a declarative API. When you use filter or map it is completely up to the Spark engine how the operation is performed. As long as the functions passed to transformations are side-effect free, it creates multiple possibilities to optimize the whole pipeline.
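To make the first point concrete, here is a minimal Scala sketch (taking rdd to be an RDD[Int], e.g. sc.parallelize(0 until 20); names are illustrative). Only the action on the odd split triggers any computation, and the even split is never evaluated:

val rddOdd  = rdd.filter(_ % 2 != 0)
val rddEven = rdd.filter(_ % 2 == 0)

// Nothing has run so far; this single action materializes rddOdd only.
// rddEven remains just a description of a computation and is never executed.
rddOdd.count()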

At the end of the day this case is not special enough to justify its own transformation.

This map-with-filter pattern is actually used in core Spark. See my answer to How does Spark's RDD.randomSplit actually split the RDD and the relevant part of the randomSplit method.

If the only goal is to split the input, it is possible to use the partitionBy clause of DataFrameWriter with the text output format:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy("key").format("text").save(...)
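Each key value then lands in its own subdirectory of the output path, so a single split can be read back on its own. A hypothetical read-back (the output path and the key value are illustrative, and spark is an existing SparkSession):

// Reads only the files written under the key=split1 subdirectory (hypothetical path)
val split1 = spark.read.text("/path/to/output/key=split1")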

* There are only 3 basic types of transformations in Spark:

  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

where T, U, W can be either atomic types or products / tuples (K, V). Any other operation has to be expressed using some combination of the above. You can check the original RDD paper for more details.
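For illustration, one concrete transformation of each shape (a sketch, assuming an existing SparkContext sc):

import org.apache.spark.rdd.RDD

val nums: RDD[Int] = sc.parallelize(1 to 10)
val more: RDD[Int] = sc.parallelize(11 to 20)

val sameType: RDD[Int]    = nums.filter(_ % 2 == 0)  // RDD[T] => RDD[T]
val newType:  RDD[String] = nums.map(_.toString)     // RDD[T] => RDD[U]
val combined: RDD[Int]    = nums.union(more)         // (RDD[T], RDD[U]) => RDD[W]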

** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** See also Scala Spark: Split collection into several RDD?

zero323
  • Very useful :). I was wondering why there wasn't a partition method equivalent in Spark. Any ideas? – Rakshith Mar 22 '16 at 05:19
  • @Rakshith Simplicity. And since we look at the lineages one branch would be discarded anyway. – zero323 Apr 27 '16 at 08:59
  • There are ways to split RDDs without 'filter', see: http://stackoverflow.com/a/37956034/3669757 – eje Jun 21 '16 at 23:50
  • @eje A similar method has been suggested by [Jason Lenderman](http://stackoverflow.com/a/32817565/1560062) some time ago and is already linked in this answer. The problem I see is the assumption that data fits in the executor memory, which cannot be made in general. – zero323 Jun 22 '16 at 00:04
  • @zero323, all partition data has to fit in executor memory, at least while it is being computed. Multiplexed RDDs are no different. Storage category can be specified to control whether it is cached, or spilled, etc, after computation. – eje Jun 22 '16 at 00:51
  • @eje As far as I know, not really. But if you can prove I am wrong I'll be happy to see some evidence. AFAIK data is exposed as an Iterator and can be processed as it comes. Unless you create a local non-lazy collection there is no reason to load all data into memory. You can easily observe the difference by comparing performance and resource usage of wholeTextFiles (which loads data at once) and textFile. This is basically the reason why groupByKey is not the best idea ever. Out of curiosity - have you tried to measure the impact on GC? – zero323 Jun 22 '16 at 01:19
  • @zero323, I think I see where you are going with that; transforms whose compute methods only modify a lineage iterator can evaluate lazily. In practice, any extra memory from the non-lazy collections has never been an issue for me, but if it ever became one it's always possible to fall back to the traditional approach and trade speed for memory. Regarding GC, I never saw it spiking in the spark web UI, but I never collected systematic data on GC in particular, only aggregate run times, which I plotted in the blog post. – eje Jun 22 '16 at 01:39
  • @eje It can be an issue although it is sometimes hard to predict why. Thanks for the answers. – zero323 Jun 22 '16 at 01:48
  • @zero323 What is GC? Garbage collection? And how does this fit into the context of this discussion? Thanks! – yuqli Nov 19 '17 at 15:45

As other posters mentioned above, there is no single, native RDD transform that splits RDDs, but here are some "multiplex" operations that can efficiently emulate a wide variety of "splitting" on RDDs, without reading the input multiple times:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

Some methods specific to random splitting:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

These methods are available from the open-source silex project:

https://github.com/willb/silex

A blog post explaining how they work:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
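For example, a single-pass even/odd split might look like the following sketch (assuming an RDD[Int] named rdd and that the silex implicits providing flatMuxPartitions are in scope):

import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer

// Each partition is traversed once; elements are routed into two buffers,
// which become the corresponding partition slices of the two resulting RDDs.
val Seq(evens, odds) = rdd.flatMuxPartitions(2, (id: Int, itr: Iterator[Int]) => {
  val even = ArrayBuffer.empty[Int]
  val odd  = ArrayBuffer.empty[Int]
  itr.foreach { x => if (x % 2 == 0) even += x else odd += x }
  Seq(even, odd)
}, StorageLevel.MEMORY_ONLY)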

As mentioned elsewhere, these methods do involve a trade-off of memory for speed, because they operate by computing entire partition results "eagerly" instead of "lazily." Therefore, it is possible for these methods to run into memory problems on large partitions, where more traditional lazy transforms will not.

eje
  • It's worth re-stating part of a conversation on the other answer: multiplexing allows increased efficiency via single-pass computations, but it does so by storing results in "non-lazy" containers, and so (depending on what is being computed) there can be an increase in resident memory, compared to traditional multi-pass variations, where computations can be lazy. In other words, multiplexing purchases increased computational efficiency with increased memory use. – eje Jun 22 '16 at 15:38
  • Wouldn't this comment be better as a part of the answer? – zero323 Jul 26 '16 at 09:54

One way is to use a custom partitioner to partition the data depending upon your filter condition. This can be achieved by extending Partitioner and implementing something similar to the RangePartitioner.

mapPartitions can then be used to construct multiple RDDs from the partitioned RDD without reading all of the data.
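A minimal sketch of such a partitioner (the ConditionPartitioner class, the Int keys, and the example conditions are illustrative, not part of the original answer; rdd is assumed to be an RDD[Int]):

import org.apache.spark.{Partitioner, TaskContext} // TaskContext is used by the filtering snippet below

// Routes each key to the partition of the first condition it satisfies,
// with a final catch-all partition for keys that match no condition.
class ConditionPartitioner(conditions: Seq[Int => Boolean]) extends Partitioner {
  override def numPartitions: Int = conditions.length + 1
  override def getPartition(key: Any): Int = {
    val i = conditions.indexWhere(_(key.asInstanceOf[Int]))
    if (i >= 0) i else conditions.length
  }
}

// partitionBy needs a key/value RDD, so key each element by itself first.
// .values is a narrow map: it drops the keys but keeps the partition layout.
val partitioned = rdd
  .map(x => (x, x))
  .partitionBy(new ConditionPartitioner(Seq((x: Int) => x % 2 == 0, (x: Int) => x % 2 != 0)))
  .values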

val filtered = partitioned.mapPartitions { iter =>
  new Iterator[Int] {
    override def hasNext: Boolean = {
      // Only emit elements when this partition is one we want to keep
      if (rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId())) {
        iter.hasNext
      } else {
        false
      }
    }

    override def next(): Int = iter.next()
  }
}

Just be aware that the number of partitions in the filtered RDDs will be the same as the number in the partitioned RDD, so a coalesce should be used to reduce this down and remove the empty partitions.
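For example (a sketch, with filtered and rangeOfPartitionsToKeep as above):

// Collapse the now mostly-empty partitions down to the number actually kept.
val compacted = filtered.coalesce(rangeOfPartitionsToKeep.size)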

Jem Tucker
  • Kinda, it will run tasks for each partition every time mapPartitions is called, however the actual data within the partitions is only ever read once – Jem Tucker Oct 19 '15 at 14:22
  • OK, but if I persist it immediately I'll only ever touch every observation once and I'll have two RDDs as distinct output, correct? – Carlos Bribiescas Oct 19 '15 at 14:39
  • Yes that will be the case. – Jem Tucker Oct 19 '15 at 15:15
  • @JemTucker You can use `mapPartitionsWithIndex` instead of accessing `TaskContext`. On a side note it is not true that every observation will be touched only once. Since it requires shuffling, which is bad by itself, at least a part of the data will be read, serialized, transferred, deserialized and optionally written. It means not only that data is accessed multiple times but also in a much more expensive way. – zero323 Oct 19 '15 at 19:27
  • That does make sense. I have, however, achieved good performance when using this method to filter into large numbers of RDDs. I agree shuffling is expensive, however shuffles are often forced in previous steps, so custom partitioners can be used in those steps to effectively order your partitions, allowing a group of filters to be avoided. – Jem Tucker Oct 19 '15 at 19:42
  • If data is already partitioned then it could make sense. Otherwise you replace linear memory scan with network communication which is roughly two orders of magnitude slower. Not to mention you have to correct for skewed distributions which requires either a priori knowledge or sampling (histogram in `RangePartitioner`) -> another scan over data. Anyway... You could simplify your code to this: `mapPartitionsWithIndex((i, iter) => if (rangeOfPartitionsToKeep.contains(i)) iter else Iterator())` – zero323 Oct 19 '15 at 20:27

If you split an RDD using the randomSplit API call, you get back an array of RDDs.

If you want 5 RDDs returned, pass in 5 weight values.

e.g.

val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)

splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
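The weights are relative and are normalized if they do not sum to 1, so, for example, a 70/30 split of the same sourceRDD could be written as (a sketch):

// Two RDDs holding roughly 70% and 30% of the source, sampled with the given seed.
val Array(trainRDD, testRDD) = sourceRDD.randomSplit(Array(0.7, 0.3), seedValue)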
Ewan Leith