16

I wrote this program in spark shell

val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))

this prints some debug statements but not the actual numbers.

The code below works fine

for(num <- array.take(4)) {
  println(num)
}

I do understand that take is an action and therefore will cause spark to trigger the lazy computation.

But foreach should have worked the same way... why did foreach not bring anything back from spark and start doing the actual processing (get out of lazy mode)

How can I make the foreach on the rdd work?

Knows Not Much
  • 30,395
  • 60
  • 197
  • 373

2 Answers2

48

The RDD.foreach method in Spark runs on the cluster so each worker which contains these records is running the operations in foreach. I.e. your code is running, but they are printing out on the Spark workers stdout, not in the driver/your shell session. If you look at the output (stdout) for your Spark workers, you will see these printed to the console.

You can view the stdout on the workers by going to the web gui running for each running executor. An example URL is http://workerIp:workerPort/logPage/?appId=app-20150303023103-0043&executorId=1&logType=stdout

Spark Executor Stdout

In this example Spark chooses to put all the records of the RDD in the same partition.

This makes sense if you think about it - look at the function signature for foreach - it doesn't return anything.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit

This is really the purpose of foreach in scala - its used to side effect.

When you collect records, you bring them back into the driver so logically collect/take operations are just running on a Scala collection within the Spark driver - you can see the log output as the spark driver/spark shell is whats printing to stdout in your session.

A use case of foreach may not seem immediately apparent, an example - if for each record in the RDD you wanted to do some external behaviour, like call a REST api, you could do this in the foreach, then each Spark worker would submit a call to the API server with the value. If foreach did bring back records, you could easily blow out the memory in the driver/shell process. This way you avoid these issues and can do side-effects on all the items in an RDD over the cluster.

If you want to see whats in an RDD I use;

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large
NightWolf
  • 7,694
  • 9
  • 74
  • 121
  • 1
    Foreach is great when you need to update an accumulator inside a function and want the action guarantee that it will only update once. As far as I can tell it's the only action in Spark that allows me to cleanly run a function on the RDD. ([Per the Spark Docs](http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka) For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value) – JimLohse Mar 08 '16 at 22:56
  • So, will foreach actually execute in the worker nodes? Running in the worker nodes is the intention in spark right. I've seen places where people have mentioned, "Don't use RDD.foreach() in a clustered environment". Is that true? – Hari Ram May 13 '18 at 12:44
  • @HariRam I'm not sure of the context of "Don't use RDD.foreach() in a clustered environment". If the operation is expensive you might want to look at using `foreachpartition`, which runs on each partition and might be a lot more efficient (e.g. batching records if inserting into a database). A large RDD in a clustered environment you need to think of the cost of the function inside `foreach` as it runs the function for each element in the RDD, if doing API calls, logging, DB operations etc you could have many worker nodes overload that resource (a DDoS style flood, fill up log disk space etc) – NightWolf Aug 25 '20 at 03:59
5

You can use RDD.toLocalIterator() to bring the data to the driver (one RDD partition at a time):

val array = sc.parallelize(List(1, 2, 3, 4))
for(rec <- array.toLocalIterator) { println(rec) }

See also

Community
  • 1
  • 1
Mark Rajcok
  • 362,217
  • 114
  • 495
  • 492