4

I have this function in the driver program which collects the results from the RDDs into an array and sends it back. However, even though the RDDs (in the DStream) have data, the function returns an empty array. What am I doing wrong?

def runTopFunction() : Array[(String, Int)] = {
    val topSearches = some function....
    val summary = new ArrayBuffer[(String,Int)]()
    topSearches.foreachRDD(rdd => {
        summary = summary.++(rdd.collect())
    })

    return summary.toArray
}
user2888475

2 Answers

1

While foreachRDD will do what you are looking to do, it is also non-blocking, which means it won't wait until all of the stream is processed. Since you call toArray on your buffer right after the call to foreachRDD, no elements will have been processed yet.

Ram Ghadiyaram
Holden
  • 3
    Rather than 'non blocking', the computation is lazy and scheduled to a later moment. As such, this answer is incorrect in terminology. – maasg Feb 27 '15 at 10:37
  • 1
    foreachRDD isn't lazy in the same way that transformations on DStreams or RDDs are; it's an action rather than a transformation. – Holden Feb 28 '15 at 00:21
  • @user2888475 Yet nothing will happen until `streamingContext.start()` gets called, and after that the work is scheduled once per `streaming interval`. Actions in Spark Streaming lead to scheduling in the same way that in Spark they lead to execution, i.e. a DStream without actions will not do anything. – maasg Feb 28 '15 at 11:59
1

DStream.foreachRDD is an action on the given DStream and will be scheduled for execution on each streaming batch interval. It is a declarative construction of a job to be executed later on.

Accumulating over the values in this way is not supported: DStream.foreachRDD only says "do this on each iteration", while the surrounding accumulation code is executed immediately, before any batch has been processed, which results in an empty array.
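To make the timing concrete, here is a minimal sketch in plain Scala, with no Spark involved. `ToyStream`, `foreachBatch` and `DeferredDemo` are hypothetical names invented for this illustration; they only mimic the "register now, run later" behaviour described above and are not Spark's API. Real Spark Streaming also runs the batches on a separate thread, which this single-threaded toy does not model.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for a DStream (hypothetical, not Spark's API):
// foreachBatch only *registers* the callback.
final class ToyStream[A](batches: Seq[Seq[A]]) {
  private val callbacks = ArrayBuffer.empty[Seq[A] => Unit]

  // Plays the role of foreachRDD: records what to do, runs nothing yet.
  def foreachBatch(f: Seq[A] => Unit): Unit = callbacks += f

  // Plays the role of StreamingContext.start(): batches are processed here.
  def start(): Unit = for (b <- batches; cb <- callbacks) cb(b)
}

object DeferredDemo {
  // Returns the buffer size before and after the stream is started.
  def run(): (Int, Int) = {
    val stream  = new ToyStream(Seq(Seq("spark" -> 3), Seq("scala" -> 2, "rdd" -> 1)))
    val summary = ArrayBuffer.empty[(String, Int)]

    stream.foreachBatch(batch => summary ++= batch)
    val sizeBeforeStart = summary.size // callback has not run yet

    stream.start()
    val sizeAfterStart = summary.size  // callback ran once per batch

    (sizeBeforeStart, sizeAfterStart)
  }
}
```

Reading `summary` right after registering the callback is the situation in the question: the buffer is still empty because nothing has been executed at that point.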

Depending on what happens to the summary data after it's calculated, there are a few options for implementing this:

  • If the data needs to be retrieved by another process, use a shared thread-safe structure. A priority queue is great for top-k uses.
  • If the data will be stored (fs, db), you can just write to the storage after applying the topSearches function to the dstream.
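As a rough sketch of the first option, the holder below keeps the most recent top-k snapshot in a thread-safe way. `TopKHolder` is a hypothetical helper, not part of Spark, and it uses an `AtomicReference` to an immutable `Vector` rather than a priority queue, simply to keep the example short. It also assumes each batch replaces the previous snapshot; merging counts across batches would need extra logic.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: holds the latest top-k snapshot so that another
// thread can read it while the streaming job keeps updating it.
final class TopKHolder(k: Int) {
  private val latest =
    new AtomicReference[Vector[(String, Int)]](Vector.empty)

  // Intended to be called on the driver from inside foreachRDD,
  // for example: holder.update(rdd.collect())
  def update(batch: Seq[(String, Int)]): Unit = {
    // Sort by descending count and keep only the first k entries.
    val topK = batch.sortBy { case (_, count) => -count }.take(k).toVector
    latest.set(topK)
  }

  // Safe to call from any thread; returns a copy of the current snapshot.
  def snapshot: Array[(String, Int)] = latest.get().toArray
}
```

The reader then calls `snapshot` whenever it needs the current result, after the streaming context has been started and at least one batch has run, instead of expecting a return value from the function that sets up the stream.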
maasg