This is a minor issue with one of my Spark jobs. It doesn't seem to cause any problems, yet it annoys me every time I see it and I fail to come up with a better solution.

Say I have a Scala collection like this:

import scala.util.Try

val myStuff = List(Try(2/2), Try(2/0))

I can partition this list into successes and failures with partition:

val (successes, failures) =  myStuff.partition(_.isSuccess)

Which is nice. The implementation of partition only traverses the source collection once to build the two new collections. However, using Spark, the best equivalent I have been able to devise is this:

val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }

Aside from the difference of unpacking the Try (which is fine), this also requires traversing the data twice, which is annoying.
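
One partial mitigation (not a single-traversal split) is to cache the mapped RDD so that someOperationThatMayFail at least isn't recomputed for the second pass. A rough sketch, borrowing the hypothetical MyDataType from the snippet further down:

// Sketch only: assumes a Try-returning someOperationThatMayFail and
// import scala.util.{Success, Failure}. Spark still evaluates two
// collect transformations, but the deserialization should only run
// once, after the first action materialises the cache.
val myStuff: RDD[Try[MyDataType]] = sourceRDD.map(someOperationThatMayFail).cache()
val successes: RDD[MyDataType] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }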

Is there a better Spark alternative that can split an RDD without multiple traversals? I.e. something with a signature like this, where partition has the behaviour of the Scala collections' partition rather than of RDD partitioning:

val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)

For reference, I previously used something like the below to solve this. The potentially failing operation is de-serializing some data from a binary format, and the failures have become interesting enough that they need to be processed and saved as an RDD rather than just logged.

def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
  try {
    Some(deserialize(data))
  } catch {
    case e: MyDeserializationError =>
      // Failures are only logged, not kept as data.
      logger.error(e)
      None
  }
}
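
With that approach the failures were only logged and the successes were kept by flat-mapping over the Option results. A rough sketch of the usage, reusing the hypothetical sourceRDD and MyDataType names from above:

// Sketch only: keep the records that deserialized successfully;
// the failures never make it past the log.
val parsed: RDD[MyDataType] = sourceRDD.flatMap(data => someOperationThatMayFail(data).toList)
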
jdeastwood

1 Answer


There might be other solutions, but here you go:

Setup:

import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)

Execution:

// Fold each partition into a (successes, failures) pair of lists,
// then concatenate the per-partition pairs.
val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](), List[Try[Int]]()))(
  (accum, curr) => if (curr.isSuccess) (curr :: accum._1, accum._2) else (accum._1, curr :: accum._2),
  (first, second) => (first._1 ++ second._1, first._2 ++ second._2))
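
Note that aggregate hands back a plain Scala pair of lists on the driver rather than two RDDs. For the tiny example above, the result is roughly:

// myStuffInSparkPartitioned: (List[Try[Int]], List[Try[Int]])
// = (List(Success(1)), List(Failure(java.lang.ArithmeticException: / by zero)))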

Let me know if you need an explanation

Justin Pihony
  • That is indeed roughly equivalent to List().partition; but that only works for small data sets, since it involves loading everything into a list rather than an RDD! – jdeastwood Mar 16 '15 at 13:10
  • @jdeastwood Before I go ahead and try something else, can you clarify your question then? This matches your need and examples. What is the final signature you are looking for? – Justin Pihony Mar 16 '15 at 14:28
  • Updated to try and make it clearer. What you presented here is close, but changes the type of collection. – jdeastwood Mar 16 '15 at 21:13
  • This is not possible: https://groups.google.com/forum/#!topic/spark-users/rkVPXAiCiBk Josh Rosen's links are good, especially this one https://groups.google.com/forum/#!msg/spark-users/KC1UJEmUeg8/N_qkTJ3nnxMJ as it is directly answered by Matei. Your best bet is to check the success in the main RDD. You can use mapPartitions to at least get all the data per node. – Justin Pihony Mar 17 '15 at 00:53
  • Nested RDDs aren't quite the right way to think about it (I'm looking to split RDDs, not create an RDD of RDDs). But nevertheless I think you are right that this isn't possible, without a custom extension to RDD at least. Thanks! – jdeastwood Mar 17 '15 at 15:32
  • @jdeastwood Yes, but to do that you would need to create an RDD inside the partitions and pass that back. The type signature does not allow it, but I thought maybe the combOp in aggregate could return a new type, but that wouldn't be associative, I don't think. Any solution I can think of would require you to create the RDDs within and merge them... which is nesting. Maybe you could figure it out with some sort of partitioner, but I'm not sure – Justin Pihony Mar 17 '15 at 15:44
  • @jdeastwood Another person asked, and I figured out a way to accomplish this: http://stackoverflow.com/questions/29547185/apache-spark-rdd-filter-into-two-rdds/29548038#29548038 – Justin Pihony Apr 09 '15 at 20:11