
I am writing Spark Scala code that reads a continuous stream from an MQTT server. I am running my job in YARN cluster mode, and I want to save and append this stream to a single text file in HDFS.

I receive a batch of stream data every second, so I need that data appended to a single text file in HDFS.

Can anyone help?

Arpit
  • You cannot have multiple tasks writing to the same HDFS file at the same time. So why use Spark? – Samson Scharfrichter Feb 08 '17 at 13:32
  • possible duplicate of - http://stackoverflow.com/questions/6389594/is-it-possible-to-append-to-hdfs-file-from-multiple-clients-in-parallel may also be useful - http://stackoverflow.com/questions/22997137/append-data-to-existing-file-in-hdfs-java – pjames Feb 08 '17 at 15:59
  • For me, the data is being overwritten by the latest stream. I am saving the DStream like: val lines = MQTTUtils.createStream(ssc, brokeraddress, topic); lines.foreachRDD { rdd => rdd.saveAsTextFile("rddoutput") }. I am getting data every 0.5 seconds, so I need to save all of it, but "lines" only holds the latest DStream. – Arpit Feb 14 '17 at 11:56
  • @Arpit Hi, have you done it? I have a similar kind of architecture, can you help? – andani Jun 22 '18 at 06:27
  • @Arpit Did you get it solved? – Amrutha Jaya Raj Jun 29 '18 at 05:49
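The threads linked in the comments point at HDFS's own append API, which is the only way to grow a single file (one writer at a time). A minimal sketch, assuming a hypothetical file path and that all batches are written from a single task rather than in parallel:

```scala
// Hypothetical sketch of direct HDFS append (single-writer only):
// HDFS allows at most one writer per file, so every batch must be
// appended from one place, never from parallel tasks.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val fs   = FileSystem.get(conf)
val path = new Path("/data/mqtt/stream.txt") // hypothetical path

// Append if the file exists, otherwise create it.
val out = if (fs.exists(path)) fs.append(path) else fs.create(path)
out.write("one record\n".getBytes("UTF-8"))
out.close()
```

Note that append must be enabled on the cluster (dfs.support.append) for fs.append to succeed.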

2 Answers

Use a DataFrame with write mode Append. This will append the data every time a new record comes in.

import org.apache.spark.sql.SaveMode

val sqlContext = new org.apache.spark.sql.SQLContext(context)
import sqlContext.implicits._

stream.map(_.value).foreachRDD(rdd => {
    rdd.foreach(println)
    if (!rdd.isEmpty()) {
        // save() with no format writes Parquet by default;
        // use .text(...) instead if plain text files are required.
        rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).save("C:/data/spark/")
        // rdd.saveAsTextFile("C:/data/spark/")
    }
})
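One caveat with the snippet above: even with coalesce(1), SaveMode.Append adds a new part-file to the output directory on every batch; it does not grow a single HDFS file. A sketch of the text-output variant, assuming a hypothetical HDFS path and the same stream variable:

```scala
// Hypothetical sketch: appending each micro-batch as text files under
// one HDFS directory. Each batch still lands as its own part-file
// inside the directory; the directory as a whole is the "append" unit.
import org.apache.spark.sql.SaveMode

stream.map(_.value).foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // coalesce(1) -> one part-file per batch instead of one per partition
    rdd.toDF("value").coalesce(1)
      .write.mode(SaveMode.Append)
      .text("hdfs:///data/spark/mqtt-output") // hypothetical path
  }
}
```

To read everything back as one dataset, point spark.read.text at the directory.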
andani
VIJ

@Amrutha J Raj

rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).json("C:/data/spark/")

This means the RDD will be converted to a DataFrame. We use coalesce(1) so that only one file is written per batch; without it, Spark may generate multiple files. The write mode is Append, so the data is appended to the existing output, in JSON format.

ochs.tobi
Murtaza Zaveri