15

I have a Spark Streaming application that writes Parquet data from a stream.

sqlContext.sql(
      """
        |select
        |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
        |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
        |*
        |from events
        |where at >= 1473667200
      """.stripMargin)
      .coalesce(1)
      .write
      .mode(SaveMode.Append)
      .partitionBy("event_date", "event_hour", "verb")
      .parquet(Config.eventsS3Path)

This piece of code runs every hour, but over time writing to Parquet has slowed down. When we started it took 15 minutes to write the data; now it takes 40 minutes. The time taken is proportional to the data already existing in that path. I tried running the same application against a new location, and that runs fast.

I have disabled schema merging and summary metadata:

sparkConf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema","false")
sparkConf.set("parquet.enable.summary-metadata","false")

Using Spark 2.0.

Batch execution with an empty directory: [screenshots]

Batch execution with a directory containing 350 folders: [screenshots]

Gaurav Shah
  • the problem is because Spark tries to list all leaf nodes and that part is very slow; it tries to do that twice and that adds up to 13 * 2 mins extra. Although I am unable to figure out why the file listing is slow – Gaurav Shah Sep 16 '16 at 16:25
  • did you manage to find any solution? I encountered the same issue. In spark-submit stderr I can see that Spark opens for reading and also seeks the old parquet files in S3. I just don't understand how to avoid this. – Niros Nov 28 '16 at 15:00
  • I could not manage to avoid reading the older partitions, but I did improve the partition read speed 10 fold, thereby reducing the overall time from 10 min to 1 min. http://stackoverflow.com/questions/39513505/spark-lists-all-leaf-node-even-in-partitioned-data/39946236#39946236 – Gaurav Shah Nov 30 '16 at 02:18
  • What do you think about writing the new data into local hdfs and then copy it to s3? That way, the listing will be fast – Niros Nov 30 '16 at 08:11
  • might as well copy to a new location on s3 each time then invoke mv – Gaurav Shah Nov 30 '16 at 09:20
  • move on s3 is not efficient (performs copy and delete) – Niros Nov 30 '16 at 11:09
  • nope, with newer hadoop library it performs native mv, which is not copy delete – Gaurav Shah Nov 30 '16 at 15:52

3 Answers

2

I've encountered this issue. The append mode is probably the culprit, in that finding the append location takes more and more time as the size of your parquet file grows.

One workaround I've found that solves this is to change the output path regularly. Merging and reordering the data from all the output dataframes is then usually not an issue.

// Hourly suffix: hours elapsed since a fixed origin timestamp (timeOrigin), both in milliseconds
def appendix: String = ((time.milliseconds - timeOrigin) / (3600 * 1000)).toString

// Appending to a fresh hourly path keeps each target directory small
df.write.mode(SaveMode.Append).format("parquet").save(s"${outputPath}-H$appendix")
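
A minimal sketch of reading the split output back (assuming the -H suffix scheme above and a Spark 2.x SparkSession named spark); the parquet reader accepts Hadoop glob patterns, so the hourly directories can still be loaded as a single DataFrame:

// Hypothetical read-back: a glob loads every hourly output path in one go
val allHours = spark.read.parquet(s"${outputPath}-H*")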
Francois G
  • 1
    I do not understand why append would cause such an issue? Why would append make any difference? It's not like it is writing to the same file – Gaurav Shah Sep 16 '16 at 07:30
  • 1
    When you use `Append`, data is first written to temporary files and then, if/when that succeeds, the temporary files are copied into your specified outputPath. This causes saving in `Append` mode to run a lot slower. – Glennie Helles Sindholt Sep 16 '16 at 10:19
  • but that does not explain the gradual growth; moving from the temporary to the correct directory is a fixed cost, so I should see the cost from the 2nd iteration itself. Instead I see it gradually growing over time. – Gaurav Shah Sep 16 '16 at 10:28
  • Thanks for the append answer and explanation, it makes logical sense, saved me a ton of time :) – Vikas Jul 28 '22 at 07:46
1

Try to write the dataframe to EMR HDFS (hdfs://...) and then use s3-dist-cp to upload the data from HDFS to S3. Worked for me.
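
A rough sketch of that approach (df stands for the hourly batch's DataFrame; the staging path and bucket name are made up, and s3-dist-cp would normally be run as a separate EMR step or shell command rather than from the driver):

import org.apache.spark.sql.SaveMode
import scala.sys.process._

// 1. Write this batch to a staging directory on EMR HDFS, where listing and
//    renaming are cheap; Overwrite so staging only ever holds the current batch
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("event_date", "event_hour", "verb")
  .parquet("hdfs:///staging/events")

// 2. Bulk-copy the staged files into the existing S3 prefix; part-file names
//    are unique per job, so this effectively appends to what is already there
Seq("s3-dist-cp", "--src", "hdfs:///staging/events", "--dest", "s3://my-bucket/events").!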

Niros
  • you didn't answer the question tho. – linehrr Dec 11 '17 at 17:45
  • There is no question, OP just described a problem and I suggested a solution. Netflix did the same IIRC (i.e. first writing to HDFS, then copying to S3). – Niros Dec 18 '17 at 15:44
0

It might be due to append mode. In this mode new files must be generated with names different from the files that already exist, so Spark lists the files in S3 (which is slow) every time.

We also set parquet.enable.summary-metadata a bit differently:

javaSparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
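
For a Scala application the equivalent would be roughly the following (assuming spark is the SparkSession):

// parquet.enable.summary-metadata is a Hadoop/Parquet property, so it goes on
// the Hadoop configuration rather than on the Spark conf
spark.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")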
Kishore
Igor Berman
  • added DAG for more details, but 10 mins (twice) for listing through 300 directories is still large – Gaurav Shah Sep 16 '16 at 10:27
  • @GauravShah, try to list the same directories with aws s3 --recursive from the same machine. The problem with the DAG is that it doesn't necessarily show everything Spark does (e.g. I know for 100% that when you have a lot of files that you append to S3, the commit stage takes a lot of time; it's single-threaded from the driver, but you don't see this in the Spark UI, where all tasks/jobs are done) – Igor Berman Sep 16 '16 at 10:40
  • @GauravShah, pay attention to my edit, parquet.enable.summary-metadata is a Hadoop property and not a Spark property – Igor Berman Sep 16 '16 at 10:50
  • it's disabled by default in Spark 2, will try that though – Gaurav Shah Sep 16 '16 at 11:08
  • also it could list files only from the directories that it is writing to instead of listing all of them, if it is only going to check for the existence of files – Gaurav Shah Sep 16 '16 at 11:30
  • tried listing with --recursive; the list took 1m, so it seems like the problem might not be in the listing, must be something else, like reading all the footers – Gaurav Shah Sep 16 '16 at 11:32
  • @GauravShah, Spark 2 disables schema merging by default; summary-metadata is a different thing, please try to set it in the Hadoop config. See here too: http://stackoverflow.com/questions/32160926/disable-parquet-metadata-summary-in-spark – Igor Berman Sep 16 '16 at 11:40
  • I did try as you suggested but it still didn't help; on further analysis it looks like `listLeafFiles` is the culprit. It takes 10 mins to complete just that – Gaurav Shah Sep 16 '16 at 14:06
  • @GauravShah, sorry, no idea how to fix it. I've searched a bit and found a few pointers on how this might be approached, but have not yet tried it myself: mapreduce.input.fileinputformat.list-status.num-threads & HADOOP-12810 – Igor Berman Sep 17 '16 at 22:26
  • thanks @igor-berman https://issues.apache.org/jira/browse/HADOOP-12810 definitely seems my problem – Gaurav Shah Sep 18 '16 at 06:38
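
Following the pointers in the last comments, a hedged sketch of raising the listing parallelism (the property name comes from the comment above; the thread count is arbitrary, and it is not confirmed here that this resolves the slowdown):

// mapreduce.input.fileinputformat.list-status.num-threads is a Hadoop property
// controlling how many threads list file status in parallel during input listing
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.list-status.num-threads", "32")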