
In Spark Streaming, every batch interval of data always generates one and only one RDD, so why do we use foreachRDD() to iterate over RDDs? There is only one RDD, so there is nothing to iterate over. In my testing, I have never seen more than one RDD.


1 Answer


A DStream or "discretized stream" is an abstraction that breaks a continuous stream of data into small chunks. This is called "microbatching". Each microbatch becomes an RDD that is given to Spark for further processing. There's one and only one RDD produced for each DStream at each batch interval.
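To make the batching concrete, here is a minimal sketch of a streaming job. The local socket source, the 5-second batch interval, and the app name are illustrative choices of mine, not anything from the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("MicrobatchExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// Every 5 seconds, the lines received in that window become one RDD[String]
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()          // an output operation is required before start()
ssc.start()
ssc.awaitTermination()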

An RDD is a distributed collection of data. Think of it as a set of pointers to where the actual data is in a cluster.

DStream.foreachRDD is an "output operator" in Spark Streaming. It allows you to access the underlying RDDs of the DStream to execute actions that do something practical with the data. For example, using foreachRDD you could write data to a database.
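As a sketch of that database pattern, given some dstream: DStream[String] (ConnectionPool and connection.send are hypothetical helpers standing in for your database client, along the lines of the example in the Spark Streaming programming guide):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One connection per partition avoids opening a connection per element
    // and avoids trying to serialize a connection from driver to executors
    val connection = ConnectionPool.getConnection()
    records.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)
  }
}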

The little mind twist here is to understand that a DStream is a time-bound collection. Let me contrast this with a classical collection: Take a list of users and apply a foreach to it:

val userList: List[User] = ???
userList.foreach { user => doSomeSideEffect(user) }

This will apply the side-effecting function doSomeSideEffect to each element of the userList collection.

Now, let's say that we don't know all the users now, so we cannot build a list of them. Instead, we have a stream of users, like people arriving into a coffee shop during morning rush:

val userDStream: DStream[User] = ???
userDStream.foreachRDD { usersRDD =>
  usersRDD.foreach { user => serveCoffee(user) }
}

Note that:

  • the DStream.foreachRDD gives you an RDD[User], not a single user. Going back to our coffee example, that is the collection of users that arrived during some interval of time.
  • to access single elements of the collection, we need to further operate on the RDD. In this case, I'm using rdd.foreach to serve coffee to each user.

To think about execution: We might have a cluster of baristas making coffee. Those are our executors. Spark Streaming takes care of making a small batch of users (or orders) and Spark will distribute the work across the baristas, so that we can parallelize the coffee making and speed up the coffee serving.
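In code, that parallelism might look like this: a sketch reusing the hypothetical serveCoffee from above, with foreachPartition making the per-executor grouping explicit:

userDStream.foreachRDD { usersRDD =>
  // Each partition of the RDD is handled by one executor: one "barista"
  // serves its own queue of users, all partitions in parallel
  usersRDD.foreachPartition { users =>
    users.foreach(user => serveCoffee(user))
  }
}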

maasg
  • @guo see also my presentation on Spark Streaming. It has an animated explanation of how it works: https://www.youtube.com/watch?v=mgvYg-0OXkU – maasg Apr 06 '16 at 04:36
  • Thank you for the correction regarding partition/RDD. Wasn't aware of that. – Yuval Itzchakov Apr 06 '16 at 04:42
  • What does seem weird is that the abstraction provides an `Iterator[RDD[T]]`. Is there a case where more than one DStream will be available during such iteration? – Yuval Itzchakov Apr 06 '16 at 05:46
  • @YuvalItzchakov Where do you see that `DStream` provides an `Iterator[RDD[T]]`? What do you mean with 'more than one DStream'? RDD maybe? – maasg Apr 06 '16 at 06:12
  • @maasg I still don't understand! As you said, "**There's one and only one RDD** produced for each DStream at each batch interval", so we don't need a `foreachRDD()` API to iterate over RDDs; since there is only one RDD, `foreachRDD()` could be named `getRDD()` and simply return the RDD object. Your example could be modified like this: `val userDStream: DStream[User] = ???` `userDstream.getRDD.foreach{user => serveCoffee(user)}` – Guo Apr 06 '16 at 06:31
  • @Guo, using the example of the coffee shop. Imagine we have DStream[User] at the following time intervals: 9:00 :(alice, bart, mike, gerard), 9:15: (bruno, guo, ana), 9:30: (eva, lisa). What should `userDstream.getRDD` return when you start the shop at 6:00 ? How can you process the users of 9:00 or 9:30? – maasg Apr 06 '16 at 06:54
  • @maasg you mean that with 15 min as the batch interval, 9:00 generates an RDD, 9:15 generates an RDD, and 9:30 generates an RDD, and you can use `foreachRDD()` to process all three RDDs (9:00, 9:15, 9:30)? But suppose the time is now 9:30: the RDDs of 9:00 and 9:15 were processed earlier, so with `foreachRDD()` you can only process the one RDD of 9:30. How would you process the RDDs from before 9:30 at 9:30? The data of those RDDs is gone by 9:30, isn't it? In Spark Streaming, each interval processes the current batch of data, so why would you process the RDD of 6:00 at 9:30? The RDD of 6:00 should be processed at 6:00. – Guo Apr 06 '16 at 07:24
  • @Guo, no, that's not what I meant. I wanted to let you see that `foreachRDD` is time-bound. You should understand `foreachRDD` as `scheduleOperationOnRddForEachTimeInterval(rdd => operation(rdd))`. – maasg Apr 06 '16 at 08:47
  • @Guo did you see the video above? – maasg Apr 06 '16 at 08:47
  • @maasg thank you for your answer! I'll go watch the video first. – Guo Apr 06 '16 at 09:01
  • @maasg what if you didn't do .foreachRDD and just used userDStream.map(), wouldn't that still perform the operation on each user? – cool breeze Dec 21 '16 at 17:32
  • @coolbreeze like in Spark, if there are no actions the transformations are not materialized. Furthermore, if we don't register an output operation (like `foreachRDD`) on the Streaming Context, it will throw an exception when we try to start it: `No output operations registered, so nothing to execute` – maasg Dec 21 '16 at 19:37
  • Has this changed in Spark 2.4, so that foreachRDD is called per batch interval? I have a print statement in `foreachRDD` but it only prints once despite many batch intervals having passed. – Fermat's Little Student Jan 27 '20 at 14:30