
I am trying to write data to a single file using Spark Scala:

    while (loop > 0) {
      val getReq = new HttpGet("ww.url.com")

      val httpResponse = client.execute(getReq)
      val data = Source.fromInputStream(httpResponse.getEntity.getContent()).getLines.mkString

      val parser = JSON.parseFull(data)
      val globalMap = parser.get.asInstanceOf[Map[String, Any]]
      val reviewMap = globalMap.get("payload").get.asInstanceOf[Map[String, Any]]

      val df = context.sparkContext.parallelize(Seq(reviewMap.get("records").get.toString())).toDF()
      if (startIndex == 0) {
        df.coalesce(1).write.mode(SaveMode.Overwrite).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
      } else {
        df.coalesce(1).write.mode(SaveMode.Append).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
      }

      startIndex = startIndex + limit
      loop = loop - 1
      httpResponse.close()
    }

The number of files created is the number of loop iterations, but I want to create only one file. It is also creating CRC files, which I want to remove. I tried the config below, but it only stops the creation of _SUCCESS files:

.config("dfs.client.read.shortcircuit.skip.checksum", "true")
      .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .config("fs.file.impl.disable.cache", true)

Any ideas on how to create only one file, without the CRC and _SUCCESS files?

Kumar Harsh

  • Does this answer your question? [Write single CSV file using spark-csv](https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv) – Tomer Shetah Nov 02 '20 at 11:25

1 Answer


Re: "The number of file created is the number of loops"

Even though you call df.coalesce(1) inside the loop, the write itself runs once per iteration of the while loop, so every iteration produces its own output file.

Re: "I want to create only one file"

From your code it appears that you are issuing HTTP GET requests to some URL and saving the parsed content. If that understanding is correct, you should not be using a while loop for this; you can use a map transformation instead, in the following manner.

Please find the pseudo-code below for your reference.

    val urls = List("a.com", "b.com", "c.com")
    val sourceDF = sparkContext.parallelize(urls).toDF()
    // this could be map or flatMap, based on your requirement
    val yourProcessedDF = sourceDF.map(<< do your parsing here and emit the data >>)
    yourProcessedDF.repartition(1).write.<< whichever format you need >>(<< output path >>)
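
For reference, here is a fuller, self-contained sketch of that approach. It is only a sketch under assumptions: the paginated URLs are hypothetical placeholders, scala.io.Source.fromURL stands in for the Apache HttpClient used in the question, and the payload/records layout is copied from the question's parsing; adapt all of these to your actual API.

    import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
    import scala.io.Source
    import scala.util.parsing.json.JSON

    object SingleFileFetch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("single-file-fetch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // Hypothetical paginated URLs -- build them from your real endpoint,
        // startIndex and limit instead of hard-coding them here.
        val urls = Seq(
          "http://ww.url.com/reviews?start=0&limit=100",
          "http://ww.url.com/reviews?start=100&limit=100",
          "http://ww.url.com/reviews?start=200&limit=100"
        )

        // One element per URL: fetch the body and extract payload.records,
        // mirroring the parsing done in the question.
        val records: Dataset[String] = urls.toDS().map { url =>
          val body = Source.fromURL(url).mkString
          val globalMap = JSON.parseFull(body).get.asInstanceOf[Map[String, Any]]
          val reviewMap = globalMap("payload").asInstanceOf[Map[String, Any]]
          reviewMap("records").toString
        }

        // A single write over a single partition produces exactly one part file.
        records.repartition(1)
          .write
          .mode(SaveMode.Overwrite)
          .json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")

        spark.stop()
      }
    }

Because every fetch and parse happens inside one Spark job and the write is executed exactly once, repartition(1) (or coalesce(1)) yields a single part file instead of one file per loop iteration.
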
Amit