
This is pretty strange to me. I am familiar with the differences between map and foreach in Scala and the use cases for both, but perhaps I don't understand something else. I first ran into this when I was playing around with Spark, so it's possible this only manifests itself when I am using an RDD.

Below is the code in which the call to map is seemingly ignored. I am using Scala 2.11.1, and these are the dependencies for running it:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)

The code below can be pasted into a Scala console:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.RangePartitioner

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Test")
val sc: SparkContext = new SparkContext(conf)


val rdd: RDD[Tuple2[String, String]] = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))

val rp = new RangePartitioner(3, rdd)
val parts = rdd.partitionBy(rp).cache()

parts.mapPartitionsWithIndex( (x,y) => { y.map(println); y } ).collect()

When running this you can see that nothing is printed to stdout. However, if you change the last line to `parts.mapPartitionsWithIndex( (x,y) => { y.map(println) } ).collect()`, or even to `parts.mapPartitionsWithIndex( (x,y) => { y.foreach(println); y } ).collect()`, the output is printed.

I believe this is different from the question about stdout not being output, since I am in local mode and this is an issue with evaluation of the RDD, not stdout.

michael.schuett
  • In what case exactly does it not get printed? – Shivansh Nov 28 '17 at 03:05
  • In the full code example with the imports nothing is printed to stdout. Possibly it's a setup issue though if you are able to see something in stdout for every example. – michael.schuett Nov 28 '17 at 03:08
  • Are you running this locally? And please specify the case in which it does not run, with the exact code difference – Shivansh Nov 28 '17 at 03:49
  • Possible duplicate of [Spark losing println() on stdout](https://stackoverflow.com/questions/33225994/spark-losing-println-on-stdout) – ayplam Nov 28 '17 at 03:54
  • `RDD`s are lazy data structures, so no actual computation will happen until you "consume" from an `RDD`. Here your consumption by `collect()` forces the `RDD` to really evaluate itself. – sarveshseri Nov 28 '17 at 06:26

1 Answer


The type of the function passed into `mapPartitionsWithIndex` is `(Int, Iterator[T]) ⇒ Iterator[U]`, so in your example `x` is the partition index and `y` is an `Iterator` over that partition's elements.

In Scala, an `Iterator` is a lazy collection; in other words, it doesn't do any work until a result is needed (similar to an RDD).
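
For illustration, here is a plain-Scala sketch of that laziness; no Spark is involved, and the value names are made up:

val letters = Iterator("I", "U", "W")
val mapped = letters.map(println)   // nothing is printed yet; map only describes the work
mapped.toList                       // now each element is printed; the result is List((), (), ())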

In your code `(x,y) => { y.map(println); y }`, the map transformation returns a new `Iterator` in which each element, when evaluated, will be printed out and replaced with `Unit`. You then discard this iterator and return the original, making the output RDD identical to the input. Since the iterator produced by map is never used, the function is never evaluated and nothing is printed.
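
As a minimal sketch of that failing shape in plain Scala (the helper name f is just for illustration):

def f(y: Iterator[String]): Iterator[String] = {
  y.map(println)   // builds a lazy iterator that is immediately thrown away
  y                // the original, untouched iterator is returned
}

f(Iterator("I", "U", "W")).toList   // returns List(I, U, W) and prints nothing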

In your "working" code, `(x,y) => { y.map(println) }` applies the same mapping transformation but returns the iterator it produces. So when you evaluate the RDD created from the transformation (using collect), the lines are printed (also notice that the resulting RDD will contain all `Unit` values).
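
The same shape sketched in plain Scala (again with a made-up helper name):

def g(y: Iterator[String]): Iterator[Unit] =
  y.map(println)   // the mapped iterator is returned instead of discarded

g(Iterator("I", "U", "W")).toList   // prints I, U, W and returns List((), (), ())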

In the other example, `(x,y) => { y.foreach(println); y }`, you instead use foreach, which evaluates each element of the iterator strictly and returns `Unit`. You then return the input iterator, which is now empty because you've already consumed all of its elements. When the RDD is evaluated with collect, this block is run and the elements are printed; notice how the resulting RDD is empty.
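
And a sketch of the foreach variant (the helper name is again made up):

def h(y: Iterator[String]): Iterator[String] = {
  y.foreach(println)   // strict: prints every element and consumes the iterator
  y                    // y is now exhausted
}

h(Iterator("I", "U", "W")).toList   // prints I, U, W and returns List()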

puhlen
  • Sorry, but briefly, is that because the first case doesn't contain an `action` and the second does? – Thang Nguyen Nov 28 '17 at 04:37
  • @cue action is a spark specific term and here we are talking about `Iterator`, so plain old scala. It is basically the same idea though. `map` on an iterator does nothing immediately, but returns a new iterator where each element will have the function passed into map applied to it *when evaluated*. If you never do anything with the iterator that gets returned the code in the `map` never runs. – puhlen Nov 28 '17 at 04:49
  • Thanks for explaining this! I understood the lazy evaluation of Spark to some extent but didn't realize it also applied to child RDDs, which makes sense. Mostly I wasn't thinking that the partition function was also returning an RDD... which of course after reading your answer makes sense as well. – michael.schuett Nov 28 '17 at 14:56
  • @mschuett I need to reiterate: this specific problem has nothing to do with Spark or RDDs. `Iterator` is a standard Scala class; it just also has similarly lazy semantics. – puhlen Nov 28 '17 at 15:24