JavaSparkContext sc = new JavaSparkContext(conf);  // conf is an existing SparkConf
SQLContext sqlContext = new SQLContext(sc);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));  // 1-second batch interval

My HDFS directory contains JSON files. How can I read them as a stream and convert each batch into a DataFrame?


1 Answer


You can use textFileStream to read the files as a text stream and convert them to DataFrames later.

val dstream = ssc.textFileStream("path to hdfs directory")

This gives you a DStream[String], which is a sequence of RDD[String]s, one RDD per batch interval.

Then you can process the RDD for each batch interval:

dstream.foreachRDD { rdd =>
  // convert this batch's RDD[String] of JSON lines into a DataFrame
  val df = spark.read.json(rdd)  // spark is an existing SparkSession
  // apply transformations or queries to df here
}

ssc.start()              // Start the computation
ssc.awaitTermination()   // Wait for the computation to terminate
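
Putting it all together, here is a minimal self-contained sketch, assuming Spark 2.x with a SparkSession; the app name and HDFS path are placeholders, and spark.read.json(rdd) is the RDD[String] overload (deprecated in newer Spark versions in favor of Dataset[String]):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object JsonDirStream {
  def main(args: Array[String]): Unit = {
    // placeholder app name; reuse your own SparkSession if you already have one
    val spark = SparkSession.builder().appName("hdfs-json-stream").getOrCreate()

    // 1-second batches, matching new Duration(1000) in the question
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    // placeholder path; point this at your HDFS directory
    val dstream = ssc.textFileStream("hdfs:///path/to/json/dir")

    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // each element is one line of JSON text
        val df = spark.read.json(rdd)
        df.show()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that textFileStream only picks up files created in, or atomically moved into, the directory after the streaming context starts; files already present are ignored.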

Hope this helps.
