I am using Spark SQL to transform 12 GB of data. The transformation applies the row_number function with a partition by on one of the fields, then divides the data into two sets: the first set contains the rows where the row number is 1, and the second set contains the rest of the data. The data is then written to the target location in 30 partitions.

My job currently takes approximately 1 hour. I want to run it in less than 10 minutes.

I am running this job on a 3-node cluster (16 cores and 32 GB RAM per node). Node 1 is the YARN master node. Node 2 runs two executors: one for the driver and one for processing. Node 3 runs two executors, both for processing. Each executor is assigned 5 cores and 10 GB of memory.

Is my hardware sufficient, or do I need more powerful hardware? Is the executor configuration right? If both the hardware and the configuration are fine, then I definitely need to improve my code.

My code is as follows.

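from pyspark.sql import SQLContext, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, lit, when

# sc (the SparkContext) and path (the input location) are defined elsewhere
sqlContext = SQLContext(sc)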

SalesDf = sqlContext.read.options(header='true').load(path, format='csv')
SalesDf.cache()

SalesDf_Version=SalesDf.withColumn('row_number',F.row_number().over(Window.partitionBy("id").orderBy(desc("recorddate"))))

initialLoad = SalesDf_Version.withColumn('partition',SalesDf_Version.year).withColumn('isActive', when(col('row_number') == 1, lit('Y')).when(col('row_number') != 1, lit('N')))
initialLoad = initialLoad.withColumn('version_flag',col ('isActive')).withColumn('partition',col('city'))
initialLoad = initialLoad.drop('row_number')


initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')
initialLoad.coalesce(1).write.partitionBy('isActive','partition').option("header", "true").mode('overwrite').csv(path +'Temp/target/')

sc.stop()

Thanks in advance for your help

Ammar

2 Answers

4

You have a coalesce(1) before writing; what is the reason for that? coalesce reduces the parallelism of that stage, which in your case will cause the windowing query to run on 1 core, so you're losing the benefit of the 16 cores per node.

Remove the coalesce and that should start improving things.
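
To make the effect concrete, here is a rough back-of-the-envelope model in plain Python (it is not Spark code). It assumes my reading of the question's layout, three executors of 5 cores each doing the processing, and the helper name `concurrent_tasks` is made up for illustration. The only point is that a stage cannot run more tasks at once than it has partitions.

```python
def concurrent_tasks(num_partitions: int, executors: int, cores_per_executor: int) -> int:
    """Upper bound on how many tasks of one stage can run at the same time."""
    task_slots = executors * cores_per_executor
    return min(num_partitions, task_slots)


# Assumption taken from the question: 3 processing executors, 5 cores each.
EXECUTORS, CORES_PER_EXECUTOR = 3, 5

for label, partitions in [("coalesce(1)", 1), ("200 shuffle partitions", 200)]:
    tasks = concurrent_tasks(partitions, EXECUTORS, CORES_PER_EXECUTOR)
    print(f"{label}: at most {tasks} concurrent task(s)")
```

Under these assumptions the single-partition stage uses one core, while the default shuffle partitioning could keep every available task slot busy.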

Silvio
  • When I tested on a small dataset, removing coalesce(1) degraded performance, but that was on a very small cluster with two cores. Should I remove it, or should I increase it to coalesce(12) or something similar? – Ammar Dec 10 '17 at 07:33
  • Rather than coalesce you can tweak `spark.sql.shuffle.partitions` which defaults to 200. On very small datasets, yes that value is probably too high. For your actual dataset, please try without `coalesce` and leaving the default `spark.sql.shuffle.partitions`. Then let us know how it ran. – Silvio Dec 10 '17 at 15:10
  • After removing coalesce, it took 1.5 hours. – Ammar Dec 11 '17 at 11:24
  • what's the cardinality of the `id` column? ```SalesDf.groupBy("id").count().show()``` Also, you have 2 save operations in your example code, is that intentional or just a copy-paste error? – Silvio Dec 11 '17 at 13:04
  • I tried coalesce(12) at write and it ran in 15 minutes. Regarding your question, I am writing it twice; it's not a copy-paste error. – Ammar Dec 11 '17 at 16:21
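
One possible explanation for these timings, which nobody in this thread has confirmed: with `partitionBy`, each write task can produce one file in every output directory it holds rows for, so the number of files grows with the number of write tasks. The plain-Python sketch below only computes that upper bound. The function name `max_output_files` is hypothetical, and the figure of 30 output directories is taken from the question.

```python
def max_output_files(write_tasks: int, partition_dirs: int) -> int:
    """Worst case: every write task holds rows for every output directory."""
    return write_tasks * partition_dirs


PARTITION_DIRS = 30  # "30 partitions" as stated in the question

for write_tasks in (1, 12, 200):
    files = max_output_files(write_tasks, PARTITION_DIRS)
    print(f"{write_tasks} write task(s): up to {files} files")
```

If something like this is happening, a moderate number of write tasks would be a trade-off between parallelism and the cost of creating many small files, but checking the actual file counts in the target directory is the way to find out.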
1

These are the changes we implemented to improve the performance of our code.

We removed coalesce and used repartition(50). We tried higher and lower values, but 50 worked best in our case. We were using S3 as our target, but it was costing us a lot of time because of the rename step when Spark commits its output, so we switched to HDFS and the job time dropped to half of what it was before. Overall, with the above changes our code ran in 12 minutes; previously it took 50 minutes. Thanks, Ammar

Ammar
  • Rather than the `repartition`, can you try setting `spark.sql.shuffle.partitions` to 50 and see how that runs? This would reduce your shuffles to only 1 for the window. Also, if you're writing to s3 you should set the file output committer version to 2, see here https://docs.databricks.com/spark/latest/faq/append-slow-with-spark-2.0.0.html – Silvio Dec 16 '17 at 14:21
  • Sure, I will try this change and share how it ran. – Ammar Dec 17 '17 at 17:43
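
For anyone wanting to try the two settings from the comment above, here is a small plain-Python sketch that turns them into `spark-submit` arguments. The two property names are existing Spark and Hadoop settings and the value 50 comes from the answer. The script name `job.py` and the helper `to_spark_submit_args` are placeholders invented for this example.

```python
# Settings suggested in the comment above; the values are illustrative.
conf = {
    "spark.sql.shuffle.partitions": "50",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
}


def to_spark_submit_args(settings: dict) -> list:
    """Expand a settings dict into repeated '--conf key=value' arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


# job.py is a hypothetical script name.
print(" ".join(["spark-submit"] + to_spark_submit_args(conf) + ["job.py"]))
```

The same keys could also be set when building the session instead of on the command line; whether they help depends on the Spark version and the storage being written to.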