We have a weird situation when processing the following steps with PySpark 1.6.2 in a Jupyter notebook on a Hortonworks cluster (a rough code sketch of the pipeline follows the list):
- Reading data from an ORC table into a PySpark dataframe
- Pivoting this table by `pivot_column` (result: `pivoted_df`)
- Adding some calculated columns on a specific selection of `pivoted_df`: `calced_df = pivoted_df.select(dependency_list).run_calculations()`
- Inner join of the big table `pivoted_df` (more than 1,600 columns) with the "small" table `calced_df` (only ~270 columns) to combine all the columns
- Saving to a Parquet table
(In step 3 the selection is necessary because adding the calculated fields with `withColumn` statements directly on the full table would take very long; select + join is faster than `withColumn` on a table with that many columns.)
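Roughly, the pipeline looks like this. This is a simplified sketch: `source_table`, `id_col`, `value_col`, and `target_table` are placeholders for our real names, `run_calculations` stands for our own helper from step 3, and `sqlContext` is the HiveContext created for the notebook.

```python
import pyspark.sql.functions as F

# Step 1: read the ORC table (sqlContext is a HiveContext)
source_df = sqlContext.table("source_table")

# Step 2: pivot by pivot_column -> one output column per distinct value
pivoted_df = (source_df
              .groupBy("id_col")
              .pivot("pivot_column")
              .agg(F.sum("value_col")))

# Step 3: run our calculation helper on a small selection of columns only
calced_df = run_calculations(pivoted_df.select(dependency_list))

# Step 4: inner join of the wide pivoted table with the small calculated table
joined_df = pivoted_df.join(calced_df, on="id_col", how="inner")

# Step 5: save the result as a Parquet table
joined_df.write.mode("overwrite").format("parquet").saveAsTable("target_table")
```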
For datasets where `pivot_column` has fewer than 2,500 distinct values, the job works fine. For example, we successfully processed a dataset with 75,430,000 rows and 1,600 columns. A job on another dataset with fewer rows (50,000) but more columns (2,433) also worked.
However, as soon as `pivot_column` has more than 2,500 distinct values (with only around 70,000 rows), the job crashes in the last step with a StackOverflowError. We debugged the individual steps with some `show()` actions to examine where exactly the job fails and found that everything works fine up to the join in step 4. So the join is causing the problem. Stripped down, the failing step looks roughly like this (placeholder names as in the sketch above):
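```python
# Both inputs can still be displayed without any problem:
pivoted_df.show(5)
calced_df.show(5)

# The join itself is lazy; the first action on the result
# (here: show()) crashes with the StackOverflowError.
joined_df = pivoted_df.join(calced_df, on="id_col", how="inner")
joined_df.show(5)
```

From this step on we get the following message: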
Py4JJavaError: An error occurred while calling o4445.showString.
: java.lang.StackOverflowError
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:22)
at scala.collection.TraversableLike$class.to(TraversableLike.scala:629)
at scala.collection.AbstractTraversable.to(Traversable.scala:105)
at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:267)
at scala.collection.AbstractTraversable.toSet(Traversable.scala:105)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:280)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
But what exactly is causing this error and how can we avoid it?
Our current Spark configuration:
.setMaster("yarn-client") \
.set("spark.executor.instances","10") \
.set("spark.executor.cores","4") \
.set("spark.executor.memory","10g") \
.set("spark.driver.memory","8g") \
.set("spark.yarn.executor.memoryOverhead","1200") \
.set("spark.sql.pivotMaxValues", "6000") \
.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
Many thanks in advance!