I am using Spark 1.6.2, Scala 2.10.5 and Java 1.7.
Our use case requires us to perform dense_rank() on a data set of over 200 million rows without a partitionBy clause; only the orderBy clause is used. This currently runs in MSSQL and takes about 30 minutes to complete.
I have implemented the equivalent logic in Spark as shown below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.denseRank

val df1 = hqlContext.read.format("jdbc").options(
  Map("url" -> url, "driver" -> driver,
    "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",
  denseRank().over(Window.orderBy("ownerObjectId", "securityId", "periodId")))
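For context, this is how I am checking the plan of that step; with no partitionBy I expect the window to pull the entire data set into a single partition for the sort, which is presumably where the memory pressure comes from:

// Diagnostic sketch: inspect the physical plan of the window step.
// With an empty partitionBy, I expect the Window operator to sit over a
// single-partition exchange, i.e. all ~200M rows sorted in one task.
df1_drnk.explain(true)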
I am submitting the job in yarn-cluster mode as shown below. I have a 2-node Hadoop 2.6 cluster, each node with 4 vCores and 32 GB of memory.
spark-submit --class com.spgmi.csd.OshpStdCarryOver \
  --master yarn --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=3072 \
  --num-executors 2 --executor-cores 3 \
  --driver-memory 7g --executor-memory 16g \
  --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar \
  --files $SPARK_HOME/conf/hive-site.xml \
  $SPARK_HOME/lib/spark-poc2-14.0.0.jar
In the logs, I can see that the table of about 200 million rows is imported from MSSQL and cached in Spark in about 15 minutes. Up to this stage, roughly 5 GB of memory is used; about 6.2 GB is still free on one executor and about 11 GB is free on the other.
But the dense_rank() step always fails with a "GC overhead limit exceeded" error after a few minutes. I have even set the driver memory as high as 7g, as you can see in the spark-submit command above, but to no avail. Of course, I understand that the missing partitionBy clause is what causes trouble in Spark, but unfortunately that is the use case we need to deal with.
Can you please shed some more light here? Am I missing something? Is there an alternative to the dense_rank window function in Spark, for example the zipWithIndex approach suggested by one of the experts elsewhere on this forum? Would it yield the same results as dense_rank? My understanding is that zipWithIndex replicates row_number() rather than dense_rank(). A rough sketch of what I have in mind is shown below.
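For reference, this is the kind of zipWithIndex workaround I am considering, reusing the same hqlContext and column names as above (the names sortedRows, withRowNum, keyIds and df1_drnk_alt are just for illustration): zipping the sorted rows gives a row_number()-style index, while zipping the distinct sort keys and joining back should behave like dense_rank().

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Names below are illustrative only.
// row_number()-style: a unique, gap-free index per row (ties are NOT collapsed)
val sortedRows = df1.sort("ownerObjectId", "securityId", "periodId")
val withRowNum = hqlContext.createDataFrame(
  sortedRows.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ (idx + 1)) },
  StructType(sortedRows.schema.fields :+ StructField("rowNum", LongType, nullable = false)))

// dense_rank()-style: index only the distinct sort keys, then join back,
// so rows with identical keys share the same standardizationId
val distinctKeys = df1.select("ownerObjectId", "securityId", "periodId")
  .distinct()
  .sort("ownerObjectId", "securityId", "periodId")
val keyIds = hqlContext.createDataFrame(
  distinctKeys.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ (idx + 1)) },
  StructType(distinctKeys.schema.fields :+ StructField("standardizationId", LongType, nullable = false)))
val df1_drnk_alt = df1.join(keyIds, Seq("ownerObjectId", "securityId", "periodId"))

Is the dense_rank-style variant above a reasonable workaround at this scale, or does the global sort of the distinct keys run into the same kind of problem?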
Any helpful advice is appreciated! Thanks a lot!