3

I am receiving streaming data myDStream (DStream[String]) that I want to save in S3 (for this question it doesn't really matter where exactly I want to save the output, but I am mentioning it just in case).

The following code works well, but it saves folders with names like jsonFile-19-45-46.json, and inside each folder it saves the files _SUCCESS and part-00000.

Is it possible to save each RDD[String] (these are JSON strings) as a JSON file rather than a folder? I thought that repartition(1) would do the trick, but it didn't.

    myDStream.foreachRDD { rdd => 
       // datetimeString = ....
       rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json")
    }
duckertito
  • See [how to make saveAsTextFile NOT split output into multiple file?](http://stackoverflow.com/questions/24371259/how-to-make-saveastextfile-not-split-output-into-multiple-file) for why it works this way and what to do about it. – hoyland Nov 13 '16 at 18:59
  • @hoyland: I checked this thread and the recommendation is to use `coalesce(1,true)`. I tried this solution, but I still get folders with `_SUCCESS` and `part-00000` instead of JSON files. In my case I know that the size of each RDD will be relatively small. – duckertito Nov 13 '16 at 19:05

3 Answers

2

AFAIK there is no option to save it as a single file. Spark is a distributed processing framework, and it is not good practice to write to a single file; instead, each partition writes its own file under the specified path.

We can only pass an output directory where we want to save the data. The output writer will create one or more files (depending on the number of partitions) inside the specified path, with the part- file name prefix.
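For illustration, a minimal sketch of what this looks like for the question's path (the output name is taken from the question; the directory contents are what the output committer writes):

    // Even with a single partition, saveAsTextFile creates a directory, not a file:
    rdd.coalesce(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-19-45-46.json")
    // jsonFile-19-45-46.json/
    //     _SUCCESS      <- marker file written by the output committer
    //     part-00000    <- the actual data, one part file per partition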

mrsrinivas
  • Yes, I understand. For my application I decided to use `rdd.collect.mkString("\n")` and then upload the strings to S3 as files using the Amazon SDK. It works for my needs. – duckertito Nov 13 '16 at 19:22
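For reference, a minimal sketch of the collect-and-upload approach mentioned in the comment above, assuming the AWS SDK for Java (aws-java-sdk-s3) is on the classpath; the bucket, key, and timestamp format are placeholders, and collecting is only reasonable because each RDD is known to be small:

    import java.time.LocalTime
    import java.time.format.DateTimeFormatter
    import com.amazonaws.services.s3.AmazonS3ClientBuilder

    val s3 = AmazonS3ClientBuilder.defaultClient()  // uses the default credential chain
    myDStream.foreachRDD { rdd =>
      // Placeholder timestamp; the question builds its own datetimeString.
      val datetimeString = LocalTime.now.format(DateTimeFormatter.ofPattern("HH-mm-ss"))
      // Collect the (small) RDD into one string and upload it as a single S3 object.
      val json = rdd.collect().mkString("\n")
      s3.putObject("mybucket", "keys/jsonFile-" + datetimeString + ".json", json)
    }

Note that foreachRDD runs its body on the driver, so the collected data and the S3 client both live there.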
1

As an alternative to rdd.collect.mkString("\n"), you can use the Hadoop FileSystem library to clean up the output by moving the part-00000 file into its place. The code below works perfectly on a local filesystem and HDFS, but I'm unable to test it with S3:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outputPath = "path/to/some/file.json"
    // Spark always writes a directory, so write to a temporary one first.
    rdd.saveAsTextFile(outputPath + "-tmp")

    // Move the single part file into place, then remove the temporary directory.
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
    fs.delete(new Path(outputPath + "-tmp"), true)
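As the answer notes, the S3 case is untested; a hedged variation, assuming the s3n connector is configured, would resolve the FileSystem from the bucket's URI instead of the default filesystem (bucket and key here are placeholders):

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical S3 variant (untested): get the FileSystem for the S3 URI.
    val outputPath = "s3n://mybucket/keys/jsonFile.json"
    rdd.saveAsTextFile(outputPath + "-tmp")

    val fs = FileSystem.get(new URI(outputPath), spark.sparkContext.hadoopConfiguration)
    fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
    fs.delete(new Path(outputPath + "-tmp"), true)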
Mariusz
0

For Java, I implemented it like this. Hope it helps:

    import java.io.File;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Find the .csv part file Spark wrote, move it to the target name, then delete the directory.
    FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
    File dir = new File(System.getProperty("user.dir") + "/my.csv/");
    File[] files = dir.listFiles((d, name) -> name.endsWith(".csv"));
    fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv"));
    fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true);
Anna Klein