7

I would like to convert a DStream into an array, list, etc. so I can then translate it to JSON and serve it on an endpoint. I'm using Apache Spark, ingesting Twitter data. How do I perform this operation on the DStream statuses? I can't seem to get anything to work other than print().

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
  def main(args: Array[String]) {

    // Location of the Spark directory 
    val sparkHome = "/opt/spark"

    // URL of the Spark cluster
    val sparkUrl = "local[8]"

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"

    // HDFS directory for checkpointing
    val checkpointDir = "/tmp" 

    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))

    val filters = Array("#americasgottalent", "iamawesome")
    val tweets = TwitterUtils.createStream(ssc, None, filters)

    val statuses = tweets.map(status => status.getText())

    val arry = Array("firstval")
    statuses.foreachRDD {
         arr :+ _.collect()
    }

    ssc.checkpoint(checkpointDir)

    ssc.start()
    ssc.awaitTermination()
  }
}
CodingIsAwesome

2 Answers

10

If your DStream is statuses, you can do:

import scala.collection.mutable.ArrayBuffer

val arr = new ArrayBuffer[String]()
statuses.foreachRDD { rdd =>
    arr ++= rdd.collect() // the batch is now in the driver; put it in an array or do whatever you want with it
    ...
}

Keep in mind this could end up being way more data than you want in your driver since a DStream can be huge.
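One way to act on that warning is to cap how much the driver holds on to. The following is only a sketch under assumptions: `RecentBuffer` is a hypothetical helper (not part of Spark), and the capacity of 1000 and the `take(100)` per batch in the wiring comment are arbitrary illustrative numbers.

```scala
import scala.collection.mutable

// Hypothetical helper (not a Spark class): keeps only the newest `capacity`
// items so the driver's memory use stays bounded.
final class RecentBuffer[A](capacity: Int) {
  require(capacity > 0, "capacity must be positive")
  private val queue = mutable.Queue.empty[A]

  // Meant to be called from foreachRDD on the driver. Synchronized because
  // another thread (for example an HTTP handler) may read concurrently.
  def addAll(items: Iterable[A]): Unit = synchronized {
    items.foreach { item =>
      queue.enqueue(item)
      if (queue.size > capacity) queue.dequeue() // drop the oldest item
    }
  }

  // Immutable copy, oldest first, safe to hand to another thread.
  def snapshot: Vector[A] = synchronized(queue.toVector)
}

// Assumed wiring inside the question's main method:
//   val recent = new RecentBuffer[String](1000)
//   statuses.foreachRDD { rdd => recent.addAll(rdd.take(100)) }
```

Using `rdd.take(n)` instead of `rdd.collect()` bounds how much of each batch is pulled to the driver, and the buffer bounds what is retained across batches.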

resueman
aaronman
  • If I remove statuses.print() and add statuses.foreachRDD{val arr = _.collect()} I get an error: missing parameter type for expanded function ((x$1) => x$1.collect()) – CodingIsAwesome Jul 16 '14 at 20:00
  • @CodingIsAwesome Scala's type system is a little weird sometimes. Your RDDs have strings, right? – aaronman Jul 16 '14 at 20:13
  • @CodingIsAwesome I think the update should work. If not, just add the type info like in a normal Scala lambda: `foreachRDD((x: RDD[String]) => arr ++= x.collect())` – aaronman Jul 16 '14 at 20:16
  • Using your help and a little bit of Google, I think the edit above works, but how do I "prove" it? How can I print out the array during execution? – CodingIsAwesome Jul 16 '14 at 21:09
  • @CodingIsAwesome After `collect()` is called the data is in the driver, so you can do whatever you want with it – aaronman Jul 16 '14 at 21:16
5

Turns out you were close, but what I ended up looking for is:

statuses.foreachRDD { rdd =>
    // collect() already returns an Array[String] in the driver
    for (item <- rdd.collect()) {
        println(item)
    }
}
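Since the original goal was JSON rather than console output, the same loop can build a JSON array string instead of printing. This is a rough sketch, not a recommendation over a real JSON library: `TweetJson`, `escape`, and `toJsonArray` are hypothetical names, and the escaping is hand-rolled to keep the example dependency-free.

```scala
// Hypothetical helper (not part of Spark): hand-rolled JSON encoding of strings.
object TweetJson {
  // Escape one string for use inside a JSON string literal.
  def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      // Remaining control characters become a backslash, 'u', and four hex digits.
      case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
      case c    => sb.append(c)
    }
    sb.toString
  }

  // Encode a collection of strings as a JSON array of string literals.
  def toJsonArray(items: Iterable[String]): String =
    items.map(s => "\"" + escape(s) + "\"").mkString("[", ",", "]")
}

// Assumed wiring inside the question's main method:
//   statuses.foreachRDD { rdd => println(TweetJson.toJsonArray(rdd.collect())) }
```

The resulting string is what an endpoint would return as its response body; how to serve it is outside what this thread covers.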
OneCricketeer
CodingIsAwesome
  • Not to belabor this, but my answer is nearly identical and definitely gets the RDD's contents to the driver. Perhaps you were using the ArrayBuffer wrong or printing the array wrong; keep in mind `println(arr)` does not do what you expect – aaronman Jul 19 '14 at 01:04
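One possible reading of that last comment (an assumption, since the comment does not spell it out) is that calling `println` on a plain `Array` prints a JVM identity string rather than the elements. A small sketch of the difference, where `PrintDemo` and `show` are hypothetical names:

```scala
import scala.collection.mutable.ArrayBuffer

object PrintDemo {
  // Render contents explicitly instead of relying on toString.
  def show(items: Iterable[String]): String = items.mkString("[", ", ", "]")

  def main(args: Array[String]): Unit = {
    val asArray = Array("first tweet", "second tweet")
    println(asArray)       // JVM identity string starting with [Ljava.lang.String;@ and not the contents
    println(show(asArray)) // [first tweet, second tweet]

    val asBuffer = ArrayBuffer("first tweet", "second tweet")
    println(asBuffer)      // ArrayBuffer(first tweet, second tweet)
  }
}
```

Another thing worth checking is where the `println` sits: the body of `foreachRDD` only runs once per batch after `ssc.start()`, so a print placed after the `foreachRDD` call but before `ssc.start()` would see a buffer that has not been filled yet.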