How can I convert RDD to DataFrame in Spark Streaming, not just Spark?

I saw this example, but it requires SparkContext.

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

In my case I have a StreamingContext. Should I then create a SparkContext inside foreach? That seems too crazy... So how should I deal with this issue? My final goal (if it helps) is to save the DataFrame to Amazon S3 using rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json"), which, as far as I know, is not possible for an RDD without first converting it to a DataFrame.

myDstream.foreachRDD { rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()
}
Lobsterrrr
  • check this link https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/08%20Write%20Output%20To%20S3.html – Shankar Oct 12 '16 at 10:55
  • @Shankar: where does he define AWS access key? – Lobsterrrr Oct 12 '16 at 11:24
  • Whatever is written inside `foreachRDD` is executed at the driver, so you can create the `sqlContext` there, convert the `rdd` to a `DF`, and then write it to `S3`. – Shankar Oct 12 '16 at 11:37
  • @Shankar: I still misunderstand: should I create both StreamingContext and SparkContext outside of `foreachRDD`? In the example that you posted I cannot find where `sqlContext` is defined. I try to reproduce this example and it gives me an error that `sqlContext` cannot be found. I do not want to overcomplicate things, that's why I ask about the simplest possible solution. – Lobsterrrr Oct 12 '16 at 11:42

2 Answers

Create the sqlContext outside foreachRDD. Once you convert the rdd to a DataFrame using sqlContext, you can write it to S3.

For example:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>

    val df = rdd.toDF()
    df.write.format("json").save("s3://iiiii/ttttt.json")
}

Update:

You can even create the sqlContext inside foreachRDD, since that code executes on the driver.
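A minimal sketch of that per-batch variant (assuming Spark 1.6 APIs, and reusing the S3 path from the question): rather than constructing a fresh SQLContext in every batch, `SQLContext.getOrCreate` returns a singleton tied to the RDD's own SparkContext.

```scala
import org.apache.spark.sql.SQLContext

// Inside foreachRDD we are on the driver, so this is safe.
// getOrCreate reuses one SQLContext instead of building a new
// one per micro-batch.
myDstream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF()
  df.write.format("json").save("s3://iiiii/ttttt.json")
}
```

This is the pattern the Spark Streaming guide recommends for SQL operations on DStreams; it also survives driver restarts from a checkpoint, where a context captured in an outer `val` might not.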

Shankar
  • I tested this example. It says `Cannot resolve symbol` refering to `saveAsTextFile`. I use scala 2.11 and spark 1.6.2. – Lobsterrrr Oct 12 '16 at 12:01
  • 1
    try `text` http://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter – Rockie Yang Oct 12 '16 at 12:06
  • Another issue is that I get an error about multiple SparkContext's. I assume that's because I have both SparkContext and StreamingContext: `val ssc = new StreamingContext(conf, Seconds(refreshingIntervalSeconds.toInt)) val sc = new SparkContext(conf) sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", Utils.getAWS_ACCESS_KEY()) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", Utils.getAWS_SECRET_KEY()) val sqlContext = new SQLContext(sc)` – Lobsterrrr Oct 12 '16 at 12:17
  • 2
    try `val ssc = new StreamingContext(sc, Seconds(refreshingIntervalSeconds.toInt))` – Rockie Yang Oct 12 '16 at 12:28
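The "multiple SparkContexts" error in the comments comes from creating both a SparkContext and a StreamingContext from the same conf. A sketch of the fix (Spark 1.6 APIs; `Utils.getAWS_ACCESS_KEY()` / `Utils.getAWS_SECRET_KEY()` are the asker's own helpers, and the batch interval is a placeholder): build one SparkContext and derive everything else from it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("My App")

// Exactly one SparkContext per JVM.
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("fs.s3n.impl",
  "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", Utils.getAWS_ACCESS_KEY())
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", Utils.getAWS_SECRET_KEY())

// Wrap the EXISTING SparkContext instead of passing the conf,
// which would try to create a second context.
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
```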
Look at the following answer, which uses a Scala magic cell inside a Python notebook: How to convert Spark Streaming data into Spark DataFrame

Mor Shemesh