
I'm reading data from Kafka using Spark Streaming and passing it to a Python file for prediction. The script returns the predictions along with the original data. The original data and its predictions are saved to file, but a separate file is created for each RDD. I need a single file containing all the data collected until I stop the program.

I have tried writeStream, but it does not create even a single file. I have tried saving to Parquet with append mode, but that creates multiple files, one per RDD. Writing CSV with append mode gives the same result. The code below creates a folder named output.csv and puts all the part files into it.

    def main(args: Array[String]): Unit = {
      val ss = SparkSession.builder()
        .appName("consumer")
        .master("local[*]")
        .getOrCreate()

      val scc = new StreamingContext(ss.sparkContext, Seconds(2))

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "group.id" -> "group5" // clients can take
      )

      // mappedData is the DStream built from the Kafka direct stream (definition omitted)
      mappedData.foreachRDD(
        x =>
          x.map(y =>
            ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
              .write.format("csv").mode("append").option("truncate", "false")
              .save("output.csv")
          )
      )

      scc.start()
      scc.awaitTermination()
    }

I need to get just one file with all the records, collected one by one while streaming.

Any help will be appreciated; thank you in advance.

Prayas Pagade

2 Answers


You cannot modify a file in HDFS once it has been written. Writing to the file in real time (appending the blocks of data from the streaming job to the same file every 2 seconds) simply isn't possible, because HDFS files are immutable. I suggest you instead write read logic that handles multiple files, if possible.

However, if you must read from a single file, I suggest one of the two approaches below, after you have written the output to a single CSV/Parquet folder with the "Append" SaveMode (which will create part files for each block you write every 2 seconds).

  1. You can create a Hive table on top of this folder and read the data from that table (see the sketch after the example below).
  2. You can write a simple Spark job that reads this folder of multiple files and writes it to another HDFS location as a single file using repartition(1) or coalesce(1), then read the data from that location. See below:

    spark.read.csv("oldLocation").coalesce(1).write.csv("newLocation")
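
For the first option, here is a minimal sketch of putting a Hive table on top of the output folder, assuming Hive support is enabled in the SparkSession and the part files are plain CSV; the column names and HDFS location below are placeholders for illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("reader")
      .enableHiveSupport() // needs a Hive metastore to be configured
      .getOrCreate()

    // External table over the folder the streaming job appends to;
    // the schema and location are placeholders, adjust them to your data.
    spark.sql("""
      CREATE EXTERNAL TABLE IF NOT EXISTS predictions (
        original_data STRING,
        prediction    STRING
      )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      STORED AS TEXTFILE
      LOCATION 'hdfs:///user/hadoop/output.csv'
    """)

    // A query sees all the part files as one table, no matter how many there are.
    spark.sql("SELECT * FROM predictions").show(false)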
    
Ana
  • @Ana Okay, so you mean I can't append to a single file from a Spark Streaming app; I'll have to write another job that reads the files written by Spark Streaming at regular intervals and appends them to another file, is that it? – Prayas Pagade Aug 19 '19 at 11:16
  • Yes and no. Yes, you need to write another job. No, that job will not append data to any file, since append is not possible in HDFS; instead, it will create one big new file by combining the smaller files from your previous location. – Ana Aug 20 '19 at 07:17
  • Updating repartition to coalesce in the example as Raja suggested; this should perform a bit better when reducing the number of partitions. – Ana Aug 20 '19 at 07:25

repartition - it is recommended to use repartition when increasing the number of partitions, but note that it involves shuffling all of the data.

coalesce - it is recommended to use coalesce when reducing the number of partitions. For example, if you have 3 partitions and want to reduce to 2, coalesce will move the 3rd partition's data into partitions 1 and 2, and partitions 1 and 2 stay in the same containers. repartition, on the other hand, shuffles data across all partitions, so the network traffic between executors is high and performance suffers.

Performance-wise, coalesce performs better than repartition when reducing the number of partitions.

So when writing, call coalesce on the DataFrame before the write. For example: df.coalesce(1).write.csv("newLocation")
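
A minimal sketch of that compaction step, assuming an existing SparkSession named spark and the placeholder paths from the example above:

    // Read all the small part files produced by the streaming job, shrink them
    // to a single partition without a full shuffle, and write out one file.
    val combined = spark.read.csv("oldLocation")

    combined.coalesce(1)
      .write
      .mode("overwrite")
      .csv("newLocation")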

Raja Sabarish PV