
I am using Spark 1.6.2, Scala 2.10.5 and Java 1.7.

Our use case requires running dense_rank() over a data set of more than 200 million rows using only an orderBy clause, with no partitionBy clause. This currently runs in MSSQL and takes about 30 minutes to complete.

I have implemented the equivalent logic in Spark as shown below:

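import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.denseRank

// Load the whole table from MSSQL over JDBC
val df1 = hqlContext.read.format("jdbc").options(
  Map("url" -> url, "driver" -> driver,
  "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()

df1.cache()

// Global dense rank: orderBy only, no partitionBy
val df1_drnk = df1.withColumn("standardizationId",
  denseRank().over(Window.orderBy("ownerObjectId", "securityId", "periodId")))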

I am submitting the job in yarn-cluster mode as shown below. I have a two-node Hadoop 2.6 cluster; each node has 4 vCores and 32 GB of memory.

spark-submit --class com.spgmi.csd.OshpStdCarryOver \
  --master yarn --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=3072 \
  --num-executors 2 --executor-cores 3 \
  --driver-memory 7g --executor-memory 16g \
  --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar \
  --files $SPARK_HOME/conf/hive-site.xml \
  $SPARK_HOME/lib/spark-poc2-14.0.0.jar

In the logs, I can see that the table of about 200 million rows is imported from MSSQL and cached in Spark in 15 minutes. Up to this stage about 5 GB of memory is in use, with about 6.2 GB still free on one executor and 11 GB free on the other.

However, the dense_rank() step always fails with a "GC overhead limit exceeded" error after a few minutes. I have even set the driver memory as high as 7g, as you can see in the spark-submit command above, but to no avail. I understand that the missing partitionBy clause is what causes trouble in Spark, but unfortunately that is the use case we need to deal with.

Can you please shed some more light on this? Am I missing something? Is there an alternative to the dense_rank window function in Spark, for example the "zipWithIndex" function suggested by one of the experts elsewhere on this forum? Would it yield the same results as dense_rank? As I understand it, "zipWithIndex" replicates row_number() rather than dense_rank.

Any helpful advice is appreciated! Thanks a lot!

Prash

1 Answer


There are two different problems here:

  • You load data over a JDBC connection without providing a partitioning column or partition predicates. As a result, all data is loaded by a single executor thread.

    This problem is usually pretty easy to solve, either by partitioning on one of the existing columns or by providing an artificial key.

  • You use window functions without partitionBy. As a result, all data is shuffled to a single partition, sorted locally, and processed by a single thread.

    In general, there is no universal solution that addresses this using only the Dataset API, but there are some tricks you can use:

    • Create artificial partitions that reflect the required record ordering. I described this method in my answer to "Avoid performance impact of a single partition mode in Spark window functions".

      A similar method could be used in your case, but it would require a multi-step process equivalent to the one described below.

    • With associative operations, you can use two separate scans over a sorted RDD (it should be possible to do a similar thing without converting from a Dataset as well) plus an additional action:

      • Compute partial results for each partition (in your case, the rank within a given partition).
      • Collect the required summaries (in your case, the partition boundaries and the accumulated rank value for each partition).
      • Perform a second scan to correct for partial aggregates from the preceding partitions.

    One example of this approach, which can easily be adjusted to fit your case, can be found in "How to compute cumulative sum using Spark".
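
For the second problem, the two-scan idea can be sketched roughly as follows. This is not code from the linked answers; it is a minimal illustration in plain Scala, where each inner sequence stands in for one partition of an already sorted RDD. The names `DenseRankSketch`, `Summary`, `summarize`, `offsets`, `rankPartition` and `denseRankLocal` are hypothetical. The per-partition functions take an `Iterator`, so they could presumably be passed to `mapPartitions` and `mapPartitionsWithIndex`, but that wiring is an untested assumption.

```scala
object DenseRankSketch {
  // Summary of one sorted partition: first key, last key, number of distinct keys.
  final case class Summary[K](first: K, last: K, distinct: Long)

  // Scan 1: summarize one sorted partition; None for an empty partition.
  def summarize[K](sortedKeys: Iterator[K]): Option[Summary[K]] =
    if (!sortedKeys.hasNext) None
    else {
      val first = sortedKeys.next()
      var last = first
      var distinct = 1L
      sortedKeys.foreach { k =>
        if (k != last) { distinct += 1; last = k }
      }
      Some(Summary(first, last, distinct))
    }

  // Driver side: turn the collected summaries into one rank offset per partition.
  def offsets[K](summaries: Seq[Option[Summary[K]]]): Vector[Long] = {
    var total = 0L                   // distinct keys seen in preceding partitions
    var prevLast: Option[K] = None   // last key of the nearest non-empty predecessor
    val out = Vector.newBuilder[Long]
    summaries.foreach {
      case None => out += total      // empty partition: offset is never used
      case Some(s) =>
        // A key that continues across the boundary was already counted once.
        val shared = if (prevLast == Some(s.first)) 1L else 0L
        val offset = total - shared
        out += offset
        total = offset + s.distinct
        prevLast = Some(s.last)
    }
    out.result()
  }

  // Scan 2: local dense rank (1-based) shifted by the partition's offset.
  def rankPartition[K](sortedKeys: Iterator[K], offset: Long): Iterator[(K, Long)] = {
    var prev: Option[K] = None
    var rank = offset
    sortedKeys.map { k =>
      if (prev != Some(k)) { rank += 1; prev = Some(k) }
      (k, rank)
    }
  }

  // Local stand-in for the whole pipeline; partitions must be globally sorted.
  def denseRankLocal[K](partitions: Seq[Seq[K]]): Seq[(K, Long)] = {
    val offs = offsets(partitions.map(p => summarize(p.iterator)))
    partitions.zip(offs).flatMap { case (p, off) => rankPartition(p.iterator, off).toVector }
  }
}
```

In the question's case the key would be the tuple (ownerObjectId, securityId, periodId). Because both scans read the same sorted data, caching the sorted RDD between them would probably be worthwhile. Only the small per-partition summaries are collected to the driver, so no single partition has to hold all 200 million rows.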

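For the first problem, a partitioned JDBC read could look roughly like the sketch below. The option keys `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` belong to Spark's JDBC data source and have to be supplied together. The helper `JdbcPartitionedRead.options`, the column name and the bounds in the usage comment are hypothetical; the partition column is assumed to be numeric.

```scala
object JdbcPartitionedRead {
  // Builds JDBC data source options that split the read into
  // numPartitions range queries on partitionColumn.
  def options(url: String, driver: String, table: String,
              partitionColumn: String, lowerBound: Long, upperBound: Long,
              numPartitions: Int): Map[String, String] =
    Map(
      "url" -> url,
      "driver" -> driver,
      "dbtable" -> table,
      "partitionColumn" -> partitionColumn,   // assumed numeric column
      "lowerBound" -> lowerBound.toString,    // bounds only set the stride,
      "upperBound" -> upperBound.toString,    // they do not filter rows
      "numPartitions" -> numPartitions.toString
    )
}

// Usage with the question's hqlContext, url and driver
// (column name and bounds are made up for illustration):
// val df1 = hqlContext.read.format("jdbc").options(
//   JdbcPartitionedRead.options(url, driver,
//     "(select * from OwnershipStandardization_PositionSequence_tbl) as ps",
//     "someNumericId", 1L, 200000000L, 6)).load()
```

With two executors of three cores each, as in the question, a `numPartitions` value of at least 6 would let every core take part in the import. Bounds that roughly match the real minimum and maximum of the column keep the partitions evenly sized.
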
zero323
  • Thanks a lot for your suggestions! I was able to reduce the data import time from MSSQL using "partitionColumn" options in the JDBC data source. But, the recommendations for dense ranking without partitionBy will need more time for me to digest as I am quite new to Scala. But, thanks so much for guiding me! – Prash Jan 05 '17 at 15:38