
Source S3 location with hundreds of JSON files:

  1. All JSON files need to be combined into a single JSON file, i.e. a non part-0000... file.
  2. The single output JSON file needs to replace all these files at the source S3 location.
  3. The same JSON data needs to be converted to Parquet and saved to another S3 location.

Is there any better option apart from the approach below?

  1. Load the JSON files into a DataFrame.
  2. Save it on the local disk.
  3. Upload the combined JSON file to S3.
  4. Clean up the rest of the S3 files after the combined file is uploaded successfully, using the AWS SDK client API.
  5. In parallel with step 4, save the Parquet file to the Parquet S3 location via the DataFrame API (a rough sketch of this flow follows the list).
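A rough sketch of this flow, assuming placeholder bucket names and prefixes (the actual SDK upload/cleanup calls are omitted):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("combine-json").getOrCreate()

// Step 1: load all JSON files from the source location into a DataFrame
val df = spark.read.json("s3a://source-bucket/json-dir/")

// Step 2: save the combined JSON to the local disk as a single file
df.coalesce(1).write.mode(SaveMode.Overwrite).json("file:///tmp/combined-json")

// Steps 3 and 4: upload the combined file and delete the originals via the
// AWS SDK client API (calls omitted; this is the part I would like to avoid)

// Step 5: in parallel with step 4, write the same data as Parquet to the other location
df.write.mode(SaveMode.Overwrite).parquet("s3a://parquet-bucket/parquet-dir/")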

I have the below questions on the above design:

  • Is there a more robust way of doing this?
  • Can I read from and write to the same S3 location and skip step 2?
QuickSilver
  • Are the files all in a single directory, day partitions, something else? – Andrew May 01 '20 at 17:08
  • All JSON files are in a single directory. – QuickSilver May 01 '20 at 17:31
  • Check the answer, hope that helps. Saving the intermediate file to local disk is no longer needed unless it is an audit requirement. – Ram Ghadiyaram May 01 '20 at 17:54
  • You can, if you use the s3a file system access. https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html – Nikhil May 01 '20 at 18:03
  • Sure @RamGhadiyaram – QuickSilver May 01 '20 at 18:25
  • @Nikhil Already doing it, but how does it change any of the steps I have mentioned? – QuickSilver May 01 '20 at 18:26
  • @QuickSilver Not sure which set of steps you are talking about. I thought it was the second set, where you mention saving the consolidated JSON file to the local disk. Instead, you can read all JSON files > union > (save to JSON with coalescing into a single file & save to Parquet) (see the sketch after these comments). – Nikhil May 01 '20 at 19:19
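For reference, a minimal sketch of the flow suggested in the last comment; bucket names and target prefixes are placeholders, and the combined JSON is written to a separate prefix here rather than to the source path:

import org.apache.spark.sql.SaveMode

// Read every JSON file under the source directory into one DataFrame
val combined = spark.read.json("s3a://source-bucket/json-dir/")

// Coalesce into a single part file and write the combined JSON
combined.coalesce(1).write.mode(SaveMode.Overwrite).json("s3a://source-bucket/json-combined/")

// Write the Parquet copy to the other S3 location
combined.write.mode(SaveMode.Overwrite).parquet("s3a://parquet-bucket/parquet-dir/")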

2 Answers


Yes, it is possible to skip #2. Writing into the same location can be done with SaveMode.Overwrite on the same location you read from.

When you first read the JSON (i.e. #1) as a DataFrame, it will stay in memory if you cache it. After that you can do a cleanup and combine all the JSON into one with union, and store it as a Parquet file in a single step, something like the examples below.

Case 1: All JSONs are in different folders and you want to store the final DataFrame as Parquet in the same location where the JSONs are...

val dfpath1 = spark.read.json("path1")
val dfpath2 = spark.read.json("path2")
val dfpath3 = spark.read.json("path3")

val df1 = cleanup1(dfpath1) // cleanup1 is your cleanup function that returns a DataFrame
val df2 = cleanup2(dfpath2) // cleanup2 is your cleanup function that returns a DataFrame
val df3 = cleanup3(dfpath3) // cleanup3 is your cleanup function that returns a DataFrame

val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // you should have the same schema while doing the union

finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")

Case 2: All JSONs are in the same folder and you want to store the final DataFrame as Parquet in the same root location where the JSONs are...

In this case there is no need to read them as multiple DataFrames; you can give the root path where the JSONs (with the same schema) are located:

val dfpath1 = spark.read.json("rootpathofyourjsons") // all JSONs share the same schema
// or you can give multiple paths: spark.read.json("path1", "path2", "path3")
// since it is supported by the Spark DataFrame reader: def json(paths: String*)

val finaldf = cleanup1(dfpath1) // cleanup1 is your cleanup function that returns a DataFrame
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")

AFAIK, in either case the AWS S3 SDK API is no longer required.

UPDATE: Regarding the FileNotFoundException you are facing, see the code example below of how to do it. I used the same example you showed me here.

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDS / toDF on a Seq

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Quick silver want to cleanup")) // like you said you want to clean it

// BELOW 2 ARE IMPORTANT STEPS: `cache` and a light action like show(2, false),
// without which a FileNotFoundException will come.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // action

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("quick silver saved in same directory where he read it from final records he saved after clean up are  ")
df2.show(false)

Result :

+---+----+
|sex|date|
+---+----+
|1  |10  |
|2  |20  |
|3  |30  |
+---+----+

+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows

quick silver saved in same directory where he read it from final records he saved after clean up are  
+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
|3  |30  |Quick silver want to cleanup|
+---+----+----------------------------+


Screenshot: file saved, read back, cleaned up, and saved again.

Note: You need to implement Case 1 or Case 2 as suggested in the update above.

Ram Ghadiyaram
  • I have tried the above code in Scala and it fails, as it clears the destination directory before writing to it. Caused by: java.io.FileNotFoundException: No such file or directory: s3a:// – QuickSilver May 03 '20 at 14:20
  • aws s3 ls your dir name... it is clearly file not found – Ram Ghadiyaram May 03 '20 at 14:59
  • Please do not force me to accept a wrong answer. Please do some research and try the code yourself before suggesting it. You can start to learn Spark from this URL https://forums.databricks.com/questions/21830/spark-how-to-simultaneously-read-from-and-write-to.html – QuickSilver May 03 '20 at 16:26
  • First thing is you have not specified any example locations or DataFrames; it is completely plain text. [Try to improve how you ask questions by putting in good examples and code snippets. You asked a general case, I answered in the same way.](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples) – Ram Ghadiyaram May 03 '20 at 16:57
  • Due to Sunday activities I was not able to get back... can you have a quick look at the above? [I gave an answer on the Databricks forums also](https://forums.databricks.com/answers/38733/view.html) – Ram Ghadiyaram May 04 '20 at 04:42
  • Any other things? – Ram Ghadiyaram May 04 '20 at 14:48
import java.io.File
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// sc, bucketName, sourcePath, tempTarget1, sparkUuid, and logger are defined elsewhere in the job

// Write the combined single JSON file to a temporary location
spark.read
  .json(sourcePath)
  .coalesce(1)
  .write
  .mode(SaveMode.Overwrite)
  .json(tempTarget1)

// Delete the original source files, then rename the temporary output into place
val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)

val deleted = fs.delete(new Path(sourcePath + File.separator), true)
logger.info(s"S3 folder path deleted=${deleted} sparkUuid=$sparkUuid path=${sourcePath}")

val renamed = fs.rename(new Path(tempTarget1), new Path(sourcePath))

Tried and failed:

  1. DataFrame caching/persist did not work, as whenever I tried to write, cachedDf.write went back to check the S3 files which I had manually cleaned before the write.
  2. Writing the DataFrame directly to the same S3 directory does not work, as the DataFrame only overwrites the partitioned files, i.e. files starting with 'part-00...' (a consolidated sketch follows this list).
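For completeness, a hedged end-to-end sketch combining the temp-write/delete/rename pattern above with requirement 3 from the question (the Parquet copy to the other S3 location); bucket names, prefixes, and variable names here are placeholders:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val sourcePath  = "s3a://source-bucket/json-dir"      // placeholder source location
val tempTarget1 = "s3a://source-bucket/json-dir_tmp"  // placeholder temporary location
val parquetPath = "s3a://parquet-bucket/parquet-dir"  // placeholder Parquet location

// Read all source JSON files and write a single combined JSON file to a temporary prefix
val df = spark.read.json(sourcePath)
df.coalesce(1).write.mode(SaveMode.Overwrite).json(tempTarget1)

// Write the Parquet copy from the temporary output before touching the source directory
spark.read.json(tempTarget1).write.mode(SaveMode.Overwrite).parquet(parquetPath)

// Replace the original JSON files with the combined output
val fs = FileSystem.get(new URI("s3a://source-bucket"), spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(sourcePath), true)
fs.rename(new Path(tempTarget1), new Path(sourcePath))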
QuickSilver